using namespace ff;
static int k=-1;
struct myTask {
myTask(int n,int* V):n(n),V(V) {}
int n;
int *V;
};
myTask* f1(myTask* in,
ff_node*
const) {
if (++k == 10) { printf("f1 END\n"); return NULL;}
if (in==NULL) {
int *V = new int[k+5];
for(int i=0;i<(k+5);++i) V[i]=0;
return new myTask(k+5, V);
}
return in;
}
myTask* f2(myTask *in,
ff_node*
const) {
for(int i=0;i<in->n;++i) in->V[i]++;
return in;
}
myTask* f3(myTask *in,
ff_node*
const) {
printf("f3 received: ");
for(int i=0;i<in->n;++i) printf(" %d ", in->V[i]);
printf("\n");
return in;
}
typedef std::function<myTask*(myTask*,ff_node*const)> func_t;
int main() {
pipe1.run_and_wait_end();
printf("done 1st\n\n");
k=-1;
{
auto lambda1 = [] (myTask *in,
ff_node*
const) -> myTask* {
return f1(in,nullptr);
};
pipe1.run_and_wait_end();
printf("done 1st with lambda\n\n");
}
k=-1;
pipe2.run_and_wait_end();
printf("done 2nd\n\n");
k=-1;
pipe3.run_and_wait_end();
printf("done 3rd\n\n");
k=-1;
pipe4.add_feedback();
pipe4.run_and_wait_end();
printf("done 4th\n\n");
auto lambda = []() -> void* {
static int k = 0;
if (k++ == 10) { printf("Emitter END\n"); return NULL;}
int *V = new int[k+5];
for(int i=0;i<(k+5);++i) V[i]=0;
return new myTask(k+5, V);
};
std::function<void*()> F;
Emitter(std::function<void*()> F):F(F) {}
void *svc(void*) { return F(); }
};
farm1.add_emitter(new Emitter(lambda));
farm1.remove_collector();
farm2.setMultiInput();
farm2.remove_collector();
farm3.setMultiInput();
farm3.remove_collector();
pipe5.run_and_wait_end();
printf("done 5th\n\n");
k=-1;
void *svc(void *t) {
if (lb->get_channel_id() == -1) {
return t;
}
return GO_ON;
}
void eosnotify(ssize_t id) {
if (id==-1) lb->broadcast_task(EOS);
}
};
farm0.add_emitter(new Scheduler(farm0.getlb()));
farm0.remove_collector();
farm0.wrap_around();
pipe6.run_and_wait_end();
printf("done 6th\n\n");
return 0;
}