#include <iostream> #include <cstdlib> #include <ff/pipeline.hpp> #include <ff/farm.hpp> #define DELAYSUM 100000 #define DELAYSQ 500000 #define MAX(a,b) (a>b ? a : b) using namespace std; using namespace ff; class Iota : public ff_node { private: int m; public: Iota(int m):m(m) {} void * svc(void * t) { for(int i=1; i<=m; i++) { int * n = new int(i); ff_send_out((void *) n); } return((void *) FF_EOS); } }; class Summer : public ff_node { private: int sum; public: int svc_init() { sum = 0; return(0); } void * svc(void * t) { sum += *((int *) t); return (GO_ON); } void svc_end() { cout << "sum = " << sum << endl; return; } }; class Incr : public ff_node { void * svc(void * t) { int * r = new int( ++( *((int *) t)) ); free(t); usleep(DELAYSUM); return ((void *) r); } }; class Square : public ff_node { void * svc(void * t) { int x = *((int *) t); free(t); usleep(DELAYSQ); return ((void *) new int(x*x)); } }; // a.out maxnum nw int main(int argc, char * argv[]) { int m = atoi(argv[1]); int nw1 = atoi(argv[2]); int nw2 = atoi(argv[3]); std::cout << "Computing " << m << " tasks" << std::endl;; std::cout << "DELAYSUM is " << DELAYSUM/1000 << " msecs" << endl;; std::cout << "DELAYSQ is " << DELAYSQ/1000 << " msecs" << endl;; // build main pipeline : four stages(generator, farm(inc) farm(sq), summer) ff_pipeline mainPipe; mainPipe.add_stage(new Iota(m)); // generator // second stage: farm(inc) // as much workers as the parameter from the command line ff_farm<> farmstage2; vector<ff_node*> w1; for(int i=0; i<nw1; i++) w1.push_back(new Incr); farmstage2.add_workers(w1); farmstage2.add_collector(NULL); // adds default collector // default emitter already here mainPipe.add_stage(&farmstage2); // third stage: farm(sq) // as much workers as the parameter from the command line ff_farm<> farmstage3; vector<ff_node*> w2; for(int i=0; i<nw2; i++) w2.push_back(new Square); farmstage3.add_workers(w2); farmstage3.add_collector(NULL); // adds default collector // default emitter already here mainPipe.add_stage(&farmstage3); mainPipe.add_stage(new Summer); mainPipe.run_and_wait_end(); std::cout << "Spent " << mainPipe.ffTime() << " msecs " << std::endl; return(0); }