Ex 1

Sample solution code:

pipe3.cpp
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/* ***************************************************************************
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License version 2 as
 *  published by the Free Software Foundation.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program; if not, write to the Free Software
 *  Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 *
 *  As a special exception, you may use this file as part of a free software
 *  library without restriction.  Specifically, if other files instantiate
 *  templates or use macros or inline functions from this file, or you compile
 *  this file and link it with other files to produce an executable, this
 *  file does not by itself cause the resulting executable to be covered by
 *  the GNU General Public License.  This exception does not however
 *  invalidate any other reasons why the executable file might be covered by
 *  the GNU General Public License.
 *
 ****************************************************************************
 */
/*
 * FastFlow pipeline basic: 3-stage pipeline
 * Exercise 1
 *
 */
 
 
#include <cstdlib>
#include <cstdio>
#include <vector>
#include <ff/pipeline.hpp>
 
using namespace ff;
 
const size_t MAX_LENGTH = 8192;
 
struct task_t {
    task_t() {}
    task_t(size_t l):A(l),B(l) {}
 
    std::vector<size_t> A;
    std::vector<size_t> B;
};
 
 
// generic stage
class Stage1: public ff_node {
public:
    Stage1(unsigned streamlen):streamlen(streamlen), seed(111) {}
 
    int svc_init() {
        srandom(seed);
        return 0;
    }
 
    void * svc(void * t) {
        //The simplest (and the most efficient) way to produce a stream in outpu
t is to use ff_send_out
        /*
        for(size_t i=0;i<streamlen;++i) {
            size_t l = (random() % MAX_LENGTH) + 1;
            task_t *task = new task_t(l);
            for(size_t j=0;j<l;++j) {
                task->A[j] = random() % 100000;
                task->B[j] = random() % 100000;
            }
 
            ff_send_out(task);
        }
        return NULL;
        */
 
        // this is another way. It works only for the first stage!!
        if (streamlen-- == 0) return NULL;
        size_t l = (random() % MAX_LENGTH) + 1;
        task_t *task = new task_t(l);
        for(size_t j=0;j<l;++j) {
            task->A[j] = random() % 100000;
            task->B[j] = random() % 100000;
        }
        return task;
    }
private:
    unsigned streamlen;
    unsigned seed;
};
 
class Stage2: public ff_node {
public:
    void *svc(void *task) {
        // g
        task_t *t = static_cast<task_t*>(task);
        size_t sum = 0;
        for(size_t j=0;j<t->A.size();++j)
            sum += t->A[j] + t->B[j];
        delete t;
 
        return (void*)sum;
    }
};
 
class Stage3: public ff_node {
public:
    int svc_init() { sum=0; return 0;}
    void *svc(void *task) {
        // h
        size_t t = reinterpret_cast<size_t>(task);
        std::cout << "Stage " << get_my_id() << " has received " << t << "\n";
        sum += t;
        return GO_ON;
    }
    void svc_end() {
        std::cout << "Final sum= " << sum << "\n";
    }
private:
    size_t sum;
};
 
int main(int argc, char * argv[]) {
    if (argc!=2) {
        std::cerr << "use: "  << argv[0] << " streamlen\n";
        return -1;
    }
 
    // bild a 2-stage pipeline
    ff_pipeline pipe;
    pipe.add_stage(new Stage1(atoi(argv[1])));
    pipe.add_stage(new Stage2);
    pipe.add_stage(new Stage3);
 
    if (pipe.run_and_wait_end()<0) {
        error("running pipeline\n");
        return -1;
    }
    std::cout << "DONE, pipe  time= " << pipe.ffTime() << " (ms)\n";
    pipe.ffStats(std::cerr);
    return 0;
}