This is the simple pipeline code:
#include <iostream> #include <cstdlib> #include <ff/pipeline.hpp> using namespace std; using namespace ff; class Iota : public ff_node { private: int m; public: Iota(int m):m(m) {} void * svc(void * t) { for(int i=1; i<=m; i++) { int * n = new int(i); ff_send_out((void *) n); } return((void *) FF_EOS); } }; class Incr : public ff_node { void * svc(void * t) { int * r = new int( ++( *((int *) t)) ); free(t); usleep(500000); return ((void *) r); } }; class Square : public ff_node { void * svc(void * t) { int x = *((int *) t); free(t); usleep(500000); return ((void *) new int(x*x)); } }; class Summer : public ff_node { private: int sum; public: int svc_init() { sum = 0; return(0); } void * svc(void * t) { sum += *((int *) t); return (GO_ON); } void svc_end() { cout << "sum = " << sum << endl; return; } }; int main(int argc, char * argv[]) { ff_pipeline mainPipe; mainPipe.add_stage(new Iota(atoi(argv[1]))); // generator mainPipe.add_stage(new Incr); mainPipe.add_stage(new Square); mainPipe.add_stage(new Summer); mainPipe.run_and_wait_end(); return(0); }