FastFlow  SVN-r182-Aug-14-2014
A high-level, lock-less, parallel programming (shared-memory) and distributed programming (distributed-memory) framework for multi-core and many-core systems
Core patterns

Core patterns (basic streaming patterns) More...

Files

file  allocator.hpp
 Implementation of FastFlow's lock-free allocator.
 
file  farm.hpp
 Farm pattern.
 
file  pipeline.hpp
 This file implements the pipeline skeleton, both in the high-level pattern syntax (ff::ff_pipe) and low-level syntax (ff::ff_pipeline)
 

Classes

class  ff::FFAllocator
 A user-space parallel allocator (process-wide) More...
 
class  ff::ff_farm< lb_t, gt_t >
 The Farm skeleton, with Emitter (lb_t) and Collector (gt_t). More...
 
class  ff::ff_ofarm
 The ordered Farm skeleton. More...
 
class  ff::ff_pipeline
 The Pipeline skeleton (low-level syntax) More...
 

Functions

 ff::ff_farm< lb_t, gt_t >::ff_farm (std::vector< ff_node * > &W, ff_node *const Emitter=NULL, ff_node *const Collector=NULL, bool input_ch=false)
 Core patterns constructor 2. More...
 
 ff::ff_farm< lb_t, gt_t >::ff_farm (bool input_ch=false, int in_buffer_entries=DEF_IN_BUFF_ENTRIES, int out_buffer_entries=DEF_OUT_BUFF_ENTRIES, bool worker_cleanup=false, int max_num_workers=DEF_MAX_NUM_WORKERS, bool fixedsize=false)
 Core patterns constructor 1. More...
 

Detailed Description

Core patterns (basic streaming patterns)

Basic streaming patterns: pipeline, farm and loopback qualifier


Class Documentation

class ff::FFAllocator

A user-space parallel allocator (process-wide)

Built on ff_allocator, the FFAllocator can be used by any number of ff::ff_node instances (i.e. threads) to dynamically allocate and deallocate memory. It consists of a network of ff_allocator objects, one per ff_node. It uses ff::uSWSR_Ptr_Buffer to avoid deadlocks.

FFAllocator is created as a static object on the first call to FFAllocator::instance().

Usage example:

#define INIT() // no init needed
#define MALLOC(size) (FFAllocator::instance()->malloc(size))
#define FREE(ptr,unused) (FFAllocator::instance()->free(ptr))
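
The lazy, process-wide creation described above can be pictured with a function-local static object. The following is a standalone sketch, not FastFlow's code: DemoAllocator, its constructions() counter, the DEMO_* macros and demo_roundtrip() are hypothetical names, and the class simply forwards to std::malloc and std::free so that the example stays self-contained.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdlib>

// Hypothetical stand-in for a process-wide allocator object.
class DemoAllocator {
public:
    // The object is built on the first call and reused afterwards.
    static DemoAllocator* instance() {
        static DemoAllocator the_instance;   // initialised once (thread-safe since C++11)
        return &the_instance;
    }

    void* malloc(std::size_t size) { return std::malloc(size); }
    void  free(void* ptr)          { std::free(ptr); }

    // Number of DemoAllocator objects constructed so far.
    static int constructions() { return constructions_; }

private:
    DemoAllocator() { ++constructions_; }
    static int constructions_;
};

int DemoAllocator::constructions_ = 0;

// Same shape as the macros in the usage example, bound to the stand-in class.
#define DEMO_MALLOC(size) (DemoAllocator::instance()->malloc(size))
#define DEMO_FREE(ptr)    (DemoAllocator::instance()->free(ptr))

// Allocates and releases one buffer through the macros, then reports
// how many allocator objects exist.
int demo_roundtrip() {
    void* p = DEMO_MALLOC(64);
    assert(p != nullptr);
    DEMO_FREE(p);
    return DemoAllocator::constructions();
}
```

However many times demo_roundtrip() is called, the counter stays at one, because every call goes through the same instance.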

Public Member Functions

 FFAllocator (int delayedReclaim=0)
 Constructor. More...
 
 ~FFAllocator ()
 Destructor. More...
 
void * malloc (size_t size)
 malloc More...
 
int posix_memalign (void **memptr, size_t alignment, size_t size)
 ff posix_memalign More...
 
void free (void *ptr)
 free More...
 
void * realloc (void *ptr, size_t newsize)
 realloc. More...
 

Static Public Member Functions

static FFAllocator * instance ()
 Returns an instance of the FFAllocator object.
 

Constructor & Destructor Documentation

ff::FFAllocator::FFAllocator ( int  delayedReclaim = 0)
inline

Constructor.

Parameters
delayedReclaim  Deferred reclamation configuration

ff::FFAllocator::~FFAllocator ( )
inline

Destructor.

Deletes the allocator and returns pre-allocated memory to the OS.

Member Function Documentation

void ff::FFAllocator::free ( void *  ptr)
inline

free

Frees the given memory chunk.

Parameters
ptr  a pointer to the buffer.

void* ff::FFAllocator::malloc ( size_t  size)
inline

malloc

Requests the allocation of size bytes. If the size is too large, the OS malloc is used to get a new chunk of memory; otherwise the ff_allocator of the calling ff_node is used.

Parameters
size  the size of memory requested
Returns
pointer to the allocated memory chunk

int ff::FFAllocator::posix_memalign ( void **  memptr,
size_t  alignment,
size_t  size 
)
inline

ff posix_memalign

Parameters
[out] *memptr  pointer to a chunk of memory where the aligned memory will be returned
alignment  the base address of the allocation is an exact multiple of alignment, which must be a power of 2 at least as large as sizeof(void *)
size  the size of memory requested.
Returns
0 if successful; otherwise it returns an error value.

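
The constraint on alignment stated above (a power of 2, at least sizeof(void *)) can be checked before calling the allocator. The helpers below are a minimal sketch with hypothetical names (is_valid_alignment, is_aligned); they are not part of FFAllocator and only express the documented rule in code.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// True when `alignment` is a power of two and at least sizeof(void*).
bool is_valid_alignment(std::size_t alignment) {
    const bool power_of_two =
        alignment != 0 && (alignment & (alignment - 1)) == 0;
    return power_of_two && alignment >= sizeof(void*);
}

// True when the address of `ptr` is an exact multiple of `alignment`.
bool is_aligned(const void* ptr, std::size_t alignment) {
    assert(is_valid_alignment(alignment));
    return reinterpret_cast<std::uintptr_t>(ptr) % alignment == 0;
}
```

A caller could reject an invalid alignment with is_valid_alignment() up front and, after a successful allocation, use is_aligned() to confirm that the returned address honours the request.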
void* ff::FFAllocator::realloc ( void *  ptr,
size_t  newsize 
)
inline

realloc.

It changes the size of the memory block pointed to by ptr to newsize bytes.

Parameters
ptr  pointer to the buffer
newsize  the new size.

class ff::ff_farm

template<typename lb_t = ff_loadbalancer, typename gt_t = ff_gatherer>
class ff::ff_farm< lb_t, gt_t >

The Farm skeleton, with Emitter (lb_t) and Collector (gt_t).

The Farm skeleton can be seen as a three-stage pipeline. The first stage is the Emitter (lb_t), which acts as a load balancer; the middle stage is the set of Workers, which are ff_nodes; the last (optional) stage is the Collector (gt_t), which gathers the results computed by the Workers.

This class is defined in farm.hpp

Examples:
perf_test_alloc1.cpp, perf_test_alloc2.cpp, and pipe_basic.cpp.

Public Member Functions

template<typename T >
 ff_farm (const std::function< T *(T *, ff_node *const)> &F, int nw, bool input_ch=false)
 High-level pattern constructor.
 
 ff_farm (std::vector< ff_node * > &W, ff_node *const Emitter=NULL, ff_node *const Collector=NULL, bool input_ch=false)
 Core patterns constructor 2. More...
 
 ff_farm (bool input_ch=false, int in_buffer_entries=DEF_IN_BUFF_ENTRIES, int out_buffer_entries=DEF_OUT_BUFF_ENTRIES, bool worker_cleanup=false, int max_num_workers=DEF_MAX_NUM_WORKERS, bool fixedsize=false)
 Core patterns constructor 1. More...
 
 ~ff_farm ()
 Destructor. More...
 
int add_emitter (ff_node *e)
 Adds the emitter. More...
 
void set_scheduling_ondemand (const int inbufferentries=1)
 Sets scheduling with the on-demand policy. More...
 
int add_workers (std::vector< ff_node * > &w)
 Adds workers to the farm. More...
 
int add_collector (ff_node *c, bool outpresent=false)
 Adds the collector. More...
 
int wrap_around (bool multi_input=false)
 Sets the feedback channel from the collector to the emitter. More...
 
int remove_collector ()
 Removes the collector. More...
 
int run (bool skip_init=false)
 Execute the Farm. More...
 
virtual int run_and_wait_end ()
 Executes the farm and waits for the workers to complete. More...
 
virtual int run_then_freeze (ssize_t nw=-1)
 Executes the farm and then freezes it. More...
 
int wait ()
 Puts the thread in waiting state. More...
 
int wait_freezing ()
 Waits for freezing. More...
 
bool offload (void *task, unsigned int retry=((unsigned int)-1), unsigned int ticks=ff_loadbalancer::TICKS2WAIT)
 
bool load_result (void **task, unsigned int retry=((unsigned int)-1), unsigned int ticks=ff_gatherer::TICKS2WAIT)
 Loads results from the gatherer. More...
 
bool load_result_nb (void **task)
 Loads a result without blocking. More...
 
double ffTime ()
 Measures ff::ff_node execution time. More...
 
- Public Member Functions inherited from ff::ff_node
virtual bool put (void *ptr)
 Nonblocking put onto output channel. More...
 
virtual bool get (void **ptr)
 Non-blocking pop from input channel. More...
 
virtual FFBUFFER * get_in_buffer () const
 Gets input channel. More...
 
virtual FFBUFFER * get_out_buffer () const
 Gets pointer to the output channel. More...
 
virtual bool ff_send_out (void *task, unsigned int retry=((unsigned int)-1), unsigned int ticks=(TICKS2WAIT))
 Sends out the task. More...
 

Protected Member Functions

void skipfirstpop (bool sk)
 Set the ff_node to start with no input task. More...
 
void * svc (void *task)
 svc method
 
int svc_init ()
 The svc_init method.
 
void svc_end ()
 The svc_end method.
 
int create_input_buffer (int nentries, bool fixedsize)
 Creates the input buffer for the emitter node. More...
 
int create_output_buffer (int nentries, bool fixedsize=false)
 Creates the output channel. More...
 
int set_output_buffer (FFBUFFER *const o)
 Sets the output buffer of the collector. More...
 
ff_node * getEmitter ()
 Gets Emitter. More...
 
ff_node * getCollector ()
 Gets Collector. More...
 
- Protected Member Functions inherited from ff::ff_node
bool skipfirstpop () const
 Gets the status of spontaneous start. More...
 
virtual int set_input_buffer (FFBUFFER *const i)
 Assign the input channelname to a channel. More...
 
virtual double wffTime ()
 Measures ff_node::svc execution time. More...
 
virtual ~ff_node ()
 Destructor.
 

Constructor & Destructor Documentation

template<typename lb_t = ff_loadbalancer, typename gt_t = ff_gatherer>
ff::ff_farm< lb_t, gt_t >::~ff_farm ( )
inline

Destructor.

Destroys the load balancer, the gatherer, and all the workers.

Member Function Documentation

template<typename lb_t = ff_loadbalancer, typename gt_t = ff_gatherer>
int ff::ff_farm< lb_t, gt_t >::add_collector ( ff_node *  c,
bool  outpresent = false 
)
inline

Adds the collector.

It adds the Collector filter to the farm skeleton. If no object is passed as a collector, then a default collector will be added (i.e. ff_gatherer). Note that it is not possible to add more than one collector.

Parameters
c  Collector object
outpresent  outstream?
Returns
The status of set_filter(x) if successful, otherwise -1 is returned.

template<typename lb_t = ff_loadbalancer, typename gt_t = ff_gatherer>
int ff::ff_farm< lb_t, gt_t >::add_emitter ( ff_node *  e )
inline

Adds the emitter.

It adds an Emitter to the Farm. The Emitter is of type ff_node and there can be only one Emitter in a Farm skeleton.

Parameters
e  the ff_node acting as an Emitter
Returns
0 if successful, -1 otherwise

template<typename lb_t = ff_loadbalancer, typename gt_t = ff_gatherer>
int ff::ff_farm< lb_t, gt_t >::add_workers ( std::vector< ff_node * > &  w)
inline

Adds workers to the farm.

Adds workers to the Farm. The number of workers that can be added to a Farm is limited; the default limit is 64. The limit can be raised by passing the desired value as the fifth parameter (max_num_workers) of the ff_farm constructor.

Parameters
w  a vector of ff_nodes which are Workers to be attached to the Farm.
Returns
0 if successful, otherwise -1 is returned.

template<typename lb_t = ff_loadbalancer, typename gt_t = ff_gatherer>
int ff::ff_farm< lb_t, gt_t >::create_input_buffer ( int  nentries,
bool  fixedsize 
)
inline protected virtual

Creates the input buffer for the emitter node.

This function redefines the ff_node's virtual method of the same name. It creates an input buffer for the Emitter node.

Parameters
nentries  the size of the buffer
fixedsize  flag to decide whether the buffer is resizable.
Returns
0 if successful, otherwise a negative value.

Reimplemented from ff::ff_node.

template<typename lb_t = ff_loadbalancer, typename gt_t = ff_gatherer>
int ff::ff_farm< lb_t, gt_t >::create_output_buffer ( int  nentries,
bool  fixedsize = false 
)
inline protected virtual

Creates the output channel.

Parameters
nentries  the number of elements of the buffer
fixedsize  flag to decide whether the buffer is bounded or unbounded. Default is false.
Returns
0 if successful, -1 otherwise

Reimplemented from ff::ff_node.

template<typename lb_t = ff_loadbalancer, typename gt_t = ff_gatherer>
double ff::ff_farm< lb_t, gt_t >::ffTime ( )
inline virtual

Measures ff::ff_node execution time.

Returns
time (ms)

Reimplemented from ff::ff_node.

template<typename lb_t = ff_loadbalancer, typename gt_t = ff_gatherer>
ff_node* ff::ff_farm< lb_t, gt_t >::getCollector ( )
inline protected

Gets Collector.

It returns a pointer to the collector.

Returns
A pointer to the collector node if it exists, otherwise NULL

template<typename lb_t = ff_loadbalancer, typename gt_t = ff_gatherer>
ff_node* ff::ff_farm< lb_t, gt_t >::getEmitter ( )
inline protected

Gets Emitter.

It returns a pointer to the emitter.

Returns
A pointer to the FastFlow node that acts as the emitter.

template<typename lb_t = ff_loadbalancer, typename gt_t = ff_gatherer>
bool ff::ff_farm< lb_t, gt_t >::load_result ( void **  task,
unsigned int  retry = ((unsigned int)-1),
unsigned int  ticks = ff_gatherer::TICKS2WAIT 
)
inline

Loads results from the gatherer.

It loads the results from the gatherer (if any).

Parameters
task  is a void pointer
retry  is the number of tries to load the results
ticks  is the number of ticks to wait
Returns
false if EOS arrived or too many retries, true if there is a new value

template<typename lb_t = ff_loadbalancer, typename gt_t = ff_gatherer>
bool ff::ff_farm< lb_t, gt_t >::load_result_nb ( void **  task)
inline

Loads a result without blocking.

It loads a result, if one is available, without blocking.

Parameters
task  is a void pointer
Returns
false if no task is present, otherwise true if there is a new value. The caller should check whether the task is FF_EOS.

template<typename lb_t = ff_loadbalancer, typename gt_t = ff_gatherer>
bool ff::ff_farm< lb_t, gt_t >::offload ( void *  task,
unsigned int  retry = ((unsigned int)-1),
unsigned int  ticks = ff_loadbalancer::TICKS2WAIT 
)
inline

Offloads the task to the farm.

It offloads the given task to the farm.

Parameters
task  is a void pointer
retry  is the number of tries to offload
ticks  is the number of ticks to wait
Returns
true if successful, otherwise false

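
The retry and ticks parameters suggest a bounded retry loop around a non-blocking operation. The sketch below is a standalone model under that assumption; try_with_retry is a hypothetical helper, not a FastFlow function, and the busy-wait merely stands in for whatever back-off the library performs between attempts.

```cpp
#include <cassert>
#include <functional>

// Hypothetical helper: calls `attempt` until it succeeds or `retry` attempts
// have been made, spinning for `ticks` iterations after each failure.
bool try_with_retry(const std::function<bool()>& attempt,
                    unsigned int retry, unsigned int ticks) {
    assert(attempt);
    for (unsigned int i = 0; i < retry; ++i) {
        if (attempt()) return true;          // e.g. the push found a free slot
        for (volatile unsigned int t = 0; t < ticks; t = t + 1) {
            // back off before the next attempt
        }
    }
    return false;                            // gave up after `retry` failures
}
```

With the default retry value shown in the signature, ((unsigned int)-1), such a loop would behave like a blocking call for practical purposes; a small retry value turns it into a bounded attempt that can report failure.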
template<typename lb_t = ff_loadbalancer, typename gt_t = ff_gatherer>
int ff::ff_farm< lb_t, gt_t >::remove_collector ( )
inline

Removes the collector.

It prevents the collector thread from being started, while each worker's output buffer is still created as if the collector were present.

Returns
0 is always returned.

template<typename lb_t = ff_loadbalancer, typename gt_t = ff_gatherer>
int ff::ff_farm< lb_t, gt_t >::run ( bool  skip_init = false)
inline virtual

Executes the Farm.

It executes the farm.

Parameters
skip_init  A boolean value showing if the initialization should be skipped
Returns
If successful 0, otherwise a negative value is returned.

Reimplemented from ff::ff_node.

template<typename lb_t = ff_loadbalancer, typename gt_t = ff_gatherer>
virtual int ff::ff_farm< lb_t, gt_t >::run_and_wait_end ( )
inline virtual

Executes the farm and waits for the workers to complete.

It executes the farm and waits for all workers to complete their tasks.

Returns
If successful 0, otherwise a negative value is returned.

template<typename lb_t = ff_loadbalancer, typename gt_t = ff_gatherer>
virtual int ff::ff_farm< lb_t, gt_t >::run_then_freeze ( ssize_t  nw = -1)
inline virtual

Executes the farm and then freezes it.

It executes the farm and then freezes it. If workers are frozen, it is possible to wake up just a subset of them.

Returns
If successful 0, otherwise a negative value

template<typename lb_t = ff_loadbalancer, typename gt_t = ff_gatherer>
int ff::ff_farm< lb_t, gt_t >::set_output_buffer ( FFBUFFER *const  o)
inline protected virtual

Sets the output buffer of the collector.

This function redefines the ff_node's virtual method of the same name. It sets the output buffer for the Collector.

Parameters
o  a buffer object, which can be of type SWSR_Ptr_Buffer or uSWSR_Ptr_Buffer
Returns
0 if successful, otherwise -1 is returned.

Reimplemented from ff::ff_node.

template<typename lb_t = ff_loadbalancer, typename gt_t = ff_gatherer>
void ff::ff_farm< lb_t, gt_t >::set_scheduling_ondemand ( const int  inbufferentries = 1)
inline

Sets scheduling with the on-demand policy.

The default scheduling policy is round-robin. When the computational cost varies greatly among tasks, round-robin scheduling can lead to load imbalance among the workers (especially with short streams). The on-demand scheduling policy can guarantee near-optimal load balancing in many cases. Alternatively, it is always possible to define a complete application-level scheduling by redefining the ff_loadbalancer class.

Parameters
inbufferentries  sets the number of queue slots for each worker thread.

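
The load-imbalance argument can be made concrete with a small simulation. The code below is an idealised standalone model, not FastFlow's scheduler: both function names are hypothetical, task costs are known in advance, and "on demand" is approximated by giving each task to the worker that becomes free first.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Completion time when task i is statically assigned to worker i % nworkers.
long round_robin_makespan(const std::vector<long>& cost, std::size_t nworkers) {
    assert(nworkers > 0);
    std::vector<long> busy(nworkers, 0);
    for (std::size_t i = 0; i < cost.size(); ++i) busy[i % nworkers] += cost[i];
    return *std::max_element(busy.begin(), busy.end());
}

// Completion time when each task goes to the worker that becomes free first
// (ties are resolved in favour of the lowest worker index).
long on_demand_makespan(const std::vector<long>& cost, std::size_t nworkers) {
    assert(nworkers > 0);
    std::vector<long> busy(nworkers, 0);
    for (long c : cost) *std::min_element(busy.begin(), busy.end()) += c;
    return *std::max_element(busy.begin(), busy.end());
}
```

For the task costs {10, 1, 1, 1, 10, 1, 1, 1} on two workers, the round-robin model finishes at time 22 because one worker receives both expensive tasks, while the on-demand model finishes at time 13.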
template<typename lb_t = ff_loadbalancer, typename gt_t = ff_gatherer>
void ff::ff_farm< lb_t, gt_t >::skipfirstpop ( bool  sk)
inline protected virtual

Sets the ff_node to start with no input task.

Setting it to true lets the ff_node execute the svc method spontaneously before receiving a task on the input channel. skipfirstpop makes it possible to define a "producer" node that starts the network.

Parameters
sk  true: start spontaneously (*task will be NULL)

Reimplemented from ff::ff_node.

template<typename lb_t = ff_loadbalancer, typename gt_t = ff_gatherer>
int ff::ff_farm< lb_t, gt_t >::wait ( )
inline virtual

Puts the thread in waiting state.

It puts the thread in waiting state.

Returns
0 if successful, otherwise -1 is returned.

Reimplemented from ff::ff_node.

template<typename lb_t = ff_loadbalancer, typename gt_t = ff_gatherer>
int ff::ff_farm< lb_t, gt_t >::wait_freezing ( )
inline virtual

Waits for freezing.

It waits for the thread to freeze.

Returns
0 if successful, otherwise -1 is returned.

Reimplemented from ff::ff_node.

template<typename lb_t = ff_loadbalancer, typename gt_t = ff_gatherer>
int ff::ff_farm< lb_t, gt_t >::wrap_around ( bool  multi_input = false)
inline

Sets the feedback channel from the collector to the emitter.

This method establishes a feedback channel from the Collector to the Emitter. If the collector is present, then the collector output queue will be connected to the emitter input queue (feedback channel).

Returns
0 if successful, otherwise -1 is returned.

class ff::ff_ofarm

The ordered Farm skeleton.

This class is defined in farm.hpp


Public Member Functions

 ff_ofarm (bool input_ch=false, int in_buffer_entries=DEF_IN_BUFF_ENTRIES, int out_buffer_entries=DEF_OUT_BUFF_ENTRIES, bool worker_cleanup=false, int max_num_workers=DEF_MAX_NUM_WORKERS, bool fixedsize=false)
 Constructor. More...
 
 ~ff_ofarm ()
 Destructor. More...
 
int run (bool skip_init=false)
 run
 
- Public Member Functions inherited from ff::ff_farm< ofarm_lb, ofarm_gt >
 ff_farm (const std::function< T *(T *, ff_node *const)> &F, int nw, bool input_ch=false)
 High-level pattern constructor.
 
 ff_farm (std::vector< ff_node * > &W, ff_node *const Emitter=NULL, ff_node *const Collector=NULL, bool input_ch=false)
 Core patterns constructor 2. More...
 
 ff_farm (bool input_ch=false, int in_buffer_entries=DEF_IN_BUFF_ENTRIES, int out_buffer_entries=DEF_OUT_BUFF_ENTRIES, bool worker_cleanup=false, int max_num_workers=DEF_MAX_NUM_WORKERS, bool fixedsize=false)
 Core patterns constructor 1. More...
 
 ~ff_farm ()
 Destructor. More...
 
int add_emitter (ff_node *e)
 Adds the emitter. More...
 
void set_scheduling_ondemand (const int inbufferentries=1)
 Sets scheduling with the on-demand policy. More...
 
int add_workers (std::vector< ff_node * > &w)
 Adds workers to the farm. More...
 
int add_collector (ff_node *c, bool outpresent=false)
 Adds the collector. More...
 
int wrap_around (bool multi_input=false)
 Sets the feedback channel from the collector to the emitter. More...
 
int remove_collector ()
 Removes the collector. More...
 
int run (bool skip_init=false)
 Execute the Farm. More...
 
virtual int run_and_wait_end ()
 Executes the farm and waits for the workers to complete. More...
 
virtual int run_then_freeze (ssize_t nw=-1)
 Executes the farm and then freezes it. More...
 
int wait ()
 Puts the thread in waiting state. More...
 
int wait_freezing ()
 Waits for freezing. More...
 
bool offload (void *task, unsigned int retry=((unsigned int)-1), unsigned int ticks=ff_loadbalancer::TICKS2WAIT)
 
bool load_result (void **task, unsigned int retry=((unsigned int)-1), unsigned int ticks=ff_gatherer::TICKS2WAIT)
 Loads results from the gatherer. More...
 
bool load_result_nb (void **task)
 Loads a result without blocking. More...
 
- Public Member Functions inherited from ff::ff_node
virtual bool put (void *ptr)
 Nonblocking put onto output channel. More...
 
virtual bool get (void **ptr)
 Non-blocking pop from input channel. More...
 
virtual FFBUFFER * get_in_buffer () const
 Gets input channel. More...
 
virtual FFBUFFER * get_out_buffer () const
 Gets pointer to the output channel. More...
 
virtual bool ff_send_out (void *task, unsigned int retry=((unsigned int)-1), unsigned int ticks=(TICKS2WAIT))
 Sends out the task. More...
 

Additional Inherited Members

- Protected Member Functions inherited from ff::ff_farm< ofarm_lb, ofarm_gt >
void * svc (void *task)
 svc method
 
int svc_init ()
 The svc_init method.
 
void svc_end ()
 The svc_end method.
 
int create_input_buffer (int nentries, bool fixedsize)
 Creates the input buffer for the emitter node. More...
 
int set_output_buffer (FFBUFFER *const o)
 Sets the output buffer of the collector. More...
 
ff_node * getEmitter ()
 Gets Emitter. More...
 
ff_node * getCollector ()
 Gets Collector. More...
 
- Protected Member Functions inherited from ff::ff_node
bool skipfirstpop () const
 Gets the status of spontaneous start. More...
 
virtual int set_input_buffer (FFBUFFER *const i)
 Assign the input channelname to a channel. More...
 
virtual double wffTime ()
 Measures ff_node::svc execution time. More...
 
virtual ~ff_node ()
 Destructor.
 

Constructor & Destructor Documentation

ff::ff_ofarm::ff_ofarm ( bool  input_ch = false,
int  in_buffer_entries = DEF_IN_BUFF_ENTRIES,
int  out_buffer_entries = DEF_OUT_BUFF_ENTRIES,
bool  worker_cleanup = false,
int  max_num_workers = DEF_MAX_NUM_WORKERS,
bool  fixedsize = false 
)
inline

Constructor.

Parameters
input_ch  states the presence of the input channel
in_buffer_entries  defines the number of input entries
worker_cleanup  states whether workers are cleaned up
max_num_workers  defines the maximum number of workers in the ordered farm
fixedsize  states whether the buffers have a fixed size

References ff::ff_farm< ofarm_lb, ofarm_gt >::add_collector(), and ff::ff_farm< ofarm_lb, ofarm_gt >::add_emitter().

ff::ff_ofarm::~ff_ofarm ( )
inline

Destructor.

It cleans up the emitter and the collector.

class ff::ff_pipeline

The Pipeline skeleton (low-level syntax)


Public Member Functions

 ff_pipeline (bool input_ch=false, int in_buffer_entries=DEF_IN_BUFF_ENTRIES, int out_buffer_entries=DEF_OUT_BUFF_ENTRIES, bool fixedsize=true)
 Constructor. More...
 
 ~ff_pipeline ()
 Destructor.
 
int add_stage (ff_node *s)
 It adds a stage to the pipeline. More...
 
int wrap_around (bool multi_input=false)
 Feedback channel (pattern modifier) More...
 
int run (bool skip_init=false)
 Run the pipeline skeleton asynchronously. More...
 
int wait ()
 wait for pipeline termination (all stages received EOS)
 
int wait_freezing ()
 wait for pipeline to complete and suspend (all stages received EOS) More...
 
bool offload (void *task, unsigned int retry=((unsigned int)-1), unsigned int ticks=ff_node::TICKS2WAIT)
 offload a task to the pipeline from the offloading thread (accelerator mode) More...
 
bool load_result (void **task, unsigned int retry=((unsigned int)-1), unsigned int ticks=ff_node::TICKS2WAIT)
 gets a result from the pipeline in the main thread (accelerator mode) More...
 
bool load_result_nb (void **task)
 tries to get a result from the pipeline in the main thread (accelerator mode) More...
 
double ffTime ()
 Measures ff::ff_node execution time. More...
 
- Public Member Functions inherited from ff::ff_node
virtual bool put (void *ptr)
 Nonblocking put onto output channel. More...
 
virtual bool get (void **ptr)
 Non-blocking pop from input channel. More...
 
virtual FFBUFFER * get_in_buffer () const
 Gets input channel. More...
 
virtual FFBUFFER * get_out_buffer () const
 Gets pointer to the output channel. More...
 
virtual bool ff_send_out (void *task, unsigned int retry=((unsigned int)-1), unsigned int ticks=(TICKS2WAIT))
 Sends out the task. More...
 

Protected Member Functions

void * svc (void *task)
 The service callback (should be filled by user with parallel activity business code) More...
 
int svc_init ()
 Service initialisation. More...
 
void svc_end ()
 Service finalisation. More...
 
int create_input_buffer (int nentries, bool fixedsize)
 Creates the input channel. More...
 
int create_output_buffer (int nentries, bool fixedsize=false)
 Creates the output channel. More...
 
int set_output_buffer (FFBUFFER *const o)
 Assign the output channelname to a channel. More...
 
- Protected Member Functions inherited from ff::ff_node
virtual void skipfirstpop (bool sk)
 Set the ff_node to start with no input task. More...
 
bool skipfirstpop () const
 Gets the status of spontaneous start. More...
 
virtual int set_input_buffer (FFBUFFER *const i)
 Assign the input channelname to a channel. More...
 
virtual double wffTime ()
 Measures ff_node::svc execution time. More...
 
virtual ~ff_node ()
 Destructor.
 

Related Functions

(Note that these are not member functions.)

int run_and_wait_end ()
 runs the pipeline, waits until all stages have received the End-Of-Stream (EOS), and destroys the pipeline run-time More...
 
virtual int run_then_freeze (bool skip_init=false)
 runs the pipeline, waits until all stages have received the End-Of-Stream (EOS), and suspends the pipeline run-time More...
 

Constructor & Destructor Documentation

ff::ff_pipeline::ff_pipeline ( bool  input_ch = false,
int  in_buffer_entries = DEF_IN_BUFF_ENTRIES,
int  out_buffer_entries = DEF_OUT_BUFF_ENTRIES,
bool  fixedsize = true 
)
inline

Constructor.

Parameters
input_ch  true sets accelerator mode
in_buffer_entries  input queue length
out_buffer_entries  output queue length
fixedsize  true uses bounded channels (SPSC queue)
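
To illustrate what a bounded SPSC (single-producer/single-consumer) channel with non-blocking operations looks like, here is a minimal sketch. BoundedSpscQueue is a hypothetical class written for this page; it is a generic ring buffer built on std::atomic and does not reproduce FastFlow's own buffer implementation.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical bounded single-producer/single-consumer queue of pointers.
class BoundedSpscQueue {
public:
    // One extra slot distinguishes "full" from "empty".
    explicit BoundedSpscQueue(std::size_t capacity)
        : slots_(capacity + 1, nullptr), head_(0), tail_(0) {
        assert(capacity > 0);
    }

    // Non-blocking push (producer side): returns false when the queue is full.
    bool push(void* item) {
        const std::size_t tail = tail_.load(std::memory_order_relaxed);
        const std::size_t next = (tail + 1) % slots_.size();
        if (next == head_.load(std::memory_order_acquire)) return false;
        slots_[tail] = item;
        tail_.store(next, std::memory_order_release);
        return true;
    }

    // Non-blocking pop (consumer side): returns false when the queue is empty.
    bool pop(void** item) {
        const std::size_t head = head_.load(std::memory_order_relaxed);
        if (head == tail_.load(std::memory_order_acquire)) return false;
        *item = slots_[head];
        head_.store((head + 1) % slots_.size(), std::memory_order_release);
        return true;
    }

private:
    std::vector<void*> slots_;
    std::atomic<std::size_t> head_;   // next slot to read
    std::atomic<std::size_t> tail_;   // next slot to write
};
```

A full queue makes push() fail instead of blocking, which is the situation the retry and ticks parameters of the offload and load_result methods are designed to handle.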

Member Function Documentation

int ff::ff_pipeline::add_stage ( ff_node *  s )
inline

It adds a stage to the pipeline.

Parameters
s  a ff_node (or derived, e.g. farm) object that is the stage to be added to the pipeline

References ff::svector< T >::push_back(), and ff::svector< T >::size().

int ff::ff_pipeline::create_input_buffer ( int  nentries,
bool  fixedsize 
)
inline protected virtual

Creates the input channel.

Parameters
nentries  the number of elements of the buffer
fixedsize  flag to decide whether the buffer is bounded or unbounded.
Returns
0 if successful, -1 otherwise

Reimplemented from ff::ff_node.

References ff::ff_node::get_in_buffer(), and ff::ff_node::set_input_buffer().

int ff::ff_pipeline::create_output_buffer ( int  nentries,
bool  fixedsize = false 
)
inline protected virtual

Creates the output channel.

Parameters
nentries  the number of elements of the buffer
fixedsize  flag to decide whether the buffer is bounded or unbounded. Default is false.
Returns
0 if successful, -1 otherwise

Reimplemented from ff::ff_node.

References ff::ff_node::get_out_buffer(), ff::ff_node::set_output_buffer(), and ff::svector< T >::size().

double ff::ff_pipeline::ffTime ( )
inline virtual

Measures ff::ff_node execution time.

Returns
time (ms)

Reimplemented from ff::ff_node.

References ff::svector< T >::size().

bool ff::ff_pipeline::load_result ( void **  task,
unsigned int  retry = ((unsigned int)-1),
unsigned int  ticks = ff_node::TICKS2WAIT 
)
inline

Gets a result from the pipeline in the main thread (accelerator mode).

Total call: it returns only when a result is available. To be used in accelerator mode only.

Parameters
[out] task
retry  number of attempts to get a result before failing (related to nonblocking get from channel - expert use only)
ticks  number of clock cycles between successive attempts (related to nonblocking get from channel - expert use only)
Returns
true if a task is returned, false if End-Of-Stream (EOS)

References ff::ff_node::get_out_buffer().

bool ff::ff_pipeline::load_result_nb ( void **  task)
inline

Tries to get a result from the pipeline in the main thread (accelerator mode).

Partial call: it can return no result. To be used in accelerator mode only.

Parameters
[out] task
Returns
true if a task is returned (including EOS), false if no task is returned

References ff::ff_node::get_out_buffer().
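
Because a successful non-blocking load may deliver the EOS mark rather than a real result, the caller has to test for it explicitly. The polling loop below is a standalone sketch of that pattern: kEos, try_load and poll_results are hypothetical stand-ins (a std::deque plays the role of the output channel), not FastFlow identifiers.

```cpp
#include <cassert>
#include <deque>
#include <vector>

static int eos_marker = 0;                 // hypothetical sentinel object
static void* const kEos = &eos_marker;     // stands in for the EOS mark

// Stand-in for a non-blocking load: true if an item (possibly EOS) was returned.
bool try_load(std::deque<void*>& channel, void** task) {
    assert(task != nullptr);
    if (channel.empty()) return false;
    *task = channel.front();
    channel.pop_front();
    return true;
}

// Polls at most `max_polls` times and stops early at EOS.
// `saw_eos` reports whether the end of the stream was reached.
std::vector<void*> poll_results(std::deque<void*>& channel, int max_polls,
                                bool& saw_eos) {
    std::vector<void*> results;
    saw_eos = false;
    for (int i = 0; i < max_polls && !saw_eos; ++i) {
        void* task = nullptr;
        if (!try_load(channel, &task)) continue;  // nothing yet: free to do other work
        if (task == kEos) saw_eos = true;         // a returned task may be EOS
        else results.push_back(task);
    }
    return results;
}
```

The bound on the number of polls exists only to keep the sketch terminating in a single-threaded setting; a real accelerator client would interleave polling with its own work.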

bool ff::ff_pipeline::offload ( void *  task,
unsigned int  retry = ((unsigned int)-1),
unsigned int  ticks = ff_node::TICKS2WAIT 
)
inline

offload a task to the pipeline from the offloading thread (accelerator mode)

Offloads a task onto a pipeline accelerator. Typically the offloading entity is the main thread (although it can be used from any ff_node::svc method).

Note
to be used in accelerator mode only

References ff::ff_node::get_in_buffer().

int ff::ff_pipeline::run ( bool  skip_init = false)
inline virtual

Run the pipeline skeleton asynchronously.

Runs the pipeline; the method call returns immediately. To be coupled with ff_pipeline::wait().

Reimplemented from ff::ff_node.

References ff::svector< T >::size().

int ff::ff_pipeline::set_output_buffer ( FFBUFFER *const  o)
inline protected virtual

Assign the output channelname to a channel.

Attach the output of a ff_node to an existing channel, typically the input channel of another ff_node

Parameters
o  reference to a channel of type FFBUFFER
Returns
0 if successful, -1 otherwise

Reimplemented from ff::ff_node.

References ff::svector< T >::size().

void* ff::ff_pipeline::svc ( void *  task)
inline protected virtual

The service callback (should be filled by user with parallel activity business code)

Parameters
task  is the input data stream item pointer (task)
Returns
output data stream item pointer

Implements ff::ff_node.

void ff::ff_pipeline::svc_end ( )
inline protected virtual

Service finalisation.

Called after EOS has arrived (logical termination) but before shutting down the runtime support (can be useful for housekeeping).

Reimplemented from ff::ff_node.

int ff::ff_pipeline::svc_init ( )
inline protected virtual

Service initialisation.

Called after run-time initialisation (e.g. thread spawning) but before starting to get items from the input stream (can be useful for initialisation of parallel activities, e.g. manual thread pinning that cannot be done in the constructor because threads do not exist yet).

Returns
0

Reimplemented from ff::ff_node.
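
The order in which the three service callbacks are invoked (svc_init once, svc once per stream item, svc_end once after the end of the stream) can be sketched with a sequential driver. Everything below is a hypothetical standalone model: DemoNode, drive and Doubler are invented for illustration, the driver runs in the calling thread, and treating a negative svc_init result as failure is an assumption of the sketch.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical minimal node interface mirroring the three callbacks.
struct DemoNode {
    virtual ~DemoNode() {}
    virtual int   svc_init() { return 0; }   // once, before the first item
    virtual void* svc(void* task) = 0;       // once per stream item
    virtual void  svc_end() {}               // once, after end-of-stream
};

// Sequential driver: returns the outputs, or an empty vector if svc_init
// reports failure (assumed here to be a negative value).
std::vector<void*> drive(DemoNode& node, const std::vector<void*>& stream) {
    std::vector<void*> out;
    if (node.svc_init() < 0) return out;
    for (void* item : stream) out.push_back(node.svc(item));
    node.svc_end();                          // stream exhausted: logical termination
    return out;
}

// Example stage: doubles each int in place and counts the callbacks it received.
struct Doubler : DemoNode {
    int inits = 0;
    int ends = 0;
    int svc_init() override { ++inits; return 0; }
    void* svc(void* task) override {
        int* v = static_cast<int*>(task);
        assert(v != nullptr);
        *v *= 2;
        return v;
    }
    void svc_end() override { ++ends; }
};
```

Driving a Doubler over a stream of three integers calls svc three times and each of svc_init and svc_end exactly once, which is the place for per-thread setup and housekeeping respectively.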

int ff::ff_pipeline::wait_freezing ( )
inline virtual

wait for pipeline to complete and suspend (all stages received EOS)

Should be coupled with ???

Reimplemented from ff::ff_node.

References ff::svector< T >::size().

int ff::ff_pipeline::wrap_around ( bool  multi_input = false)
inline

Feedback channel (pattern modifier)

The last stage output stream will be connected to the first stage input stream in a cycle (feedback channel)

References create_input_buffer(), create_output_buffer(), ff::ff_node::get_in_buffer(), ff::svector< T >::push_back(), set_output_buffer(), and ff::svector< T >::size().

Friends And Related Function Documentation

int run_and_wait_end ( )
related

Runs the pipeline, waits until all stages have received the End-Of-Stream (EOS), and destroys the pipeline run-time.

Blocking behaviour w.r.t. main thread to be clarified

References run(), and wait().

virtual int run_then_freeze ( bool  skip_init = false)
related

Runs the pipeline, waits until all stages have received the End-Of-Stream (EOS), and suspends the pipeline run-time.

Run-time threads are suspended by way of a distributed protocol. The same pipeline can be restarted by calling run_then_freeze again.

Function Documentation

template<typename lb_t = ff_loadbalancer, typename gt_t = ff_gatherer>
ff::ff_farm< lb_t, gt_t >::ff_farm ( std::vector< ff_node * > &  W,
ff_node *const  Emitter = NULL,
ff_node *const  Collector = NULL,
bool  input_ch = false 
)
inline

Core patterns constructor 2.

This is a constructor at core patterns level.

Parameters
W  vector of workers
Emitter  pointer to Emitter object (mandatory)
Collector  pointer to Collector object (optional)
input_ch  true for enabling the input stream

template<typename lb_t = ff_loadbalancer, typename gt_t = ff_gatherer>
ff::ff_farm< lb_t, gt_t >::ff_farm ( bool  input_ch = false,
int  in_buffer_entries = DEF_IN_BUFF_ENTRIES,
int  out_buffer_entries = DEF_OUT_BUFF_ENTRIES,
bool  worker_cleanup = false,
int  max_num_workers = DEF_MAX_NUM_WORKERS,
bool  fixedsize = false 
)
inline

Core patterns constructor 1.

This is a constructor at core patterns level. To be coupled with add_workers, add_emitter, and add_collector.

Parameters
input_ch  true to set accelerator mode
in_buffer_entries  input queue length
out_buffer_entries  output queue length
max_num_workers  maximum number of farm workers
worker_cleanup  true to deallocate worker objects at exit
fixedsize  true to use only fixed-size queues