pipe_basic.cpp File Reference

Detailed Description

Some basic usage examples of the pipeline (and farm) pattern.

using namespace ff;
// a stream counter
static int k=-1;
// a task
struct myTask {
myTask(int n,int* V):n(n),V(V) {}
int n;
int *V;
// f1 f2 and f3 are 3 functions which get in input a task
// pointer and return a task pointer.
// memory management is up to the user
myTask* f1(myTask* in, ff_node*const) {
if (++k == 10) { printf("f1 END\n"); return NULL;}
if (in==NULL) {
//int *V = new int[k+5]{}; brace-init does not work with Intel compiler
int *V = new int[k+5];
for(int i=0;i<(k+5);++i) V[i]=0;
return new myTask(k+5, V);
return in;
myTask* f2(myTask *in, ff_node*const) {
for(int i=0;i<in->n;++i) in->V[i]++;
return in;
myTask* f3(myTask *in, ff_node*const) {
printf("f3 received: ");
for(int i=0;i<in->n;++i) printf(" %d ", in->V[i]);
return in;
typedef std::function<myTask*(myTask*,ff_node*const)> func_t;
int main() {
/* ------------------------------------------- */
// Basic 3-stage pipeline f1;f2;f3
ff_pipe<myTask> pipe1(f1,f2,f3);
printf("done 1st\n\n");
/* ------------------------------------------- */
k=-1; // reset k
/* ------------------------------------------- */
// functions may also be lambda functions
auto lambda1 = [] (myTask *in,ff_node*const) -> myTask* {
return f1(in,nullptr);
ff_pipe<myTask> pipe1(lambda1,f2,f3);
printf("done 1st with lambda\n\n");
/* ------------------------------------------- */
k=-1; // reset k
/* ------------------------------------------- */
// ...with some more stages
ff_pipe<myTask> pipe2(f1,f2,f2,f2,f3);
printf("done 2nd\n\n");
/* ------------------------------------------- */
k=-1; // reset k
/* ------------------------------------------- */
// composing 2 pipelines
ff_pipe<myTask> pipe0(f2,f2);
ff_pipe<myTask> pipe3(f1, &pipe0,f3);
printf("done 3rd\n\n");
/* ------------------------------------------- */
k=-1; // reset k
/* ------------------------------------------- */
// farm introduction. The pipeline has also a feedback channel.
ff_pipe<myTask> pipe4(f1,new ff_farm<>((func_t)f2,3),new ff_farm<>((func_t)f3,2));
printf("done 4th\n\n");
/* ------------------------------------------- */
/* ------------------------------------------- */
// ... just a little bit more complicated.
// A pipeline with 3 farms, each farm without collector.
// The emitter of the first farm produces the stream.
auto lambda = []() -> void* {
static int k = 0;
if (k++ == 10) { printf("Emitter END\n"); return NULL;}
int *V = new int[k+5];
for(int i=0;i<(k+5);++i) V[i]=0;
return new myTask(k+5, V);
struct Emitter:public ff_node {
std::function<void*()> F;
Emitter(std::function<void*()> F):F(F) {}
void *svc(void*) { return F(); }
ff_farm<> farm1((func_t)f1,2);
ff_farm<> farm2((func_t)f2,3);
ff_farm<> farm3((func_t)f3,2);
ff_pipe<myTask> pipe5(&farm1,&farm2,&farm3);
farm1.add_emitter(new Emitter(lambda));
printf("done 5th\n\n");
/* ------------------------------------------- */
k=-1; // reset k
/* ------------------------------------------- */
// Pipeline of 3 stages: sequential, sequential
// and farm-with-feedback.
ff_farm<> farm0((func_t)f3, 3);
ff_pipe<myTask> pipe6(f1,f2,&farm0);
struct Scheduler:public ff_node {
Scheduler(ff_loadbalancer* lb):lb(lb) {}
void *svc(void *t) {
// do something smart here :)
if (lb->get_channel_id() == -1) {
return t;
return GO_ON;
void eosnotify(ssize_t id) {
if (id==-1) lb->broadcast_task(EOS);
farm0.add_emitter(new Scheduler(farm0.getlb()));
printf("done 6th\n\n");
/* ------------------------------------------- */
return 0;