FastFlow
SVN-r182-Aug-14-2014
A high-level, lock-less, parallel programming (shared-memory) and distributed programming (distributed-memory) framework for multi-core and many-core systems
|
Core patterns (basic streaming patterns) More...
Files | |
file | allocator.hpp |
Implementation of FastFlow's lock-free allocator. | |
file | farm.hpp |
Farm pattern. | |
file | pipeline.hpp |
This file implements the pipeline skeleton, both in the high-level pattern syntax (ff::ff_pipe) and low-level syntax (ff::ff_pipeline) | |
Classes | |
class | ff::FFAllocator |
A user-space parallel allocator (process-wide) More... | |
class | ff::ff_farm< lb_t, gt_t > |
The Farm skeleton, with Emitter (lb_t ) and Collector (gt_t ). More... | |
class | ff::ff_ofarm |
The ordered Farm skeleton. More... | |
class | ff::ff_pipeline |
The Pipeline skeleton (low-level syntax) More... | |
Functions | |
ff::ff_farm< lb_t, gt_t >::ff_farm (std::vector< ff_node * > &W, ff_node *const Emitter=NULL, ff_node *const Collector=NULL, bool input_ch=false) | |
Core patterns constructor 2. More... | |
ff::ff_farm< lb_t, gt_t >::ff_farm (bool input_ch=false, int in_buffer_entries=DEF_IN_BUFF_ENTRIES, int out_buffer_entries=DEF_OUT_BUFF_ENTRIES, bool worker_cleanup=false, int max_num_workers=DEF_MAX_NUM_WORKERS, bool fixedsize=false) | |
Core patterns constructor 1. More... | |
Core patterns (basic streaming patterns)
Basic streaming patterns: pipeline, farm and loopback qualifier
class ff::FFAllocator |
A user-space parallel allocator (process-wide)
Based on the ff_allocator, the FFAllocator can be used by any number of ff::ff_node instances (i.e. threads) to dynamically allocate and deallocate memory. It consists of a network of ff_allocator objects, one per ff_node. It uses ff::uSWSR_Ptr_Buffer to avoid deadlocks.
FFAllocator will be created as a static object on the first call to FFAllocator::instance()
Usage example:
Public Member Functions | |
FFAllocator (int delayedReclaim=0) | |
Constructor. More... | |
~FFAllocator () | |
Destructor. More... | |
void * | malloc (size_t size) |
malloc More... | |
int | posix_memalign (void **memptr, size_t alignment, size_t size) |
ff posix_memalign More... | |
void | free (void *ptr) |
free More... | |
void * | realloc (void *ptr, size_t newsize) |
realloc. More... | |
Static Public Member Functions | |
static FFAllocator * | instance () |
Returns an instance of the FFAllocator object. | |
|
inline |
Constructor.
delayedReclaim | Deferred reclamation configuration |
|
inline |
Destructor.
Delete the allocator and return pre-allocated memory to the OS
|
inline |
free
free the requested memory chunk
ptr | a pointer to the buffer. |
|
inline |
malloc
Requests the allocation of size bytes. If the size is too large, the OS malloc is used to get a new chunk of memory. Otherwise the ff_allocator of the calling ff_node is used.
size | the size of memory requested |
|
inline |
ff posix_memalign
[out] | *memptr | pointer to a chunk of memory where the aligned memory will be returned |
alignment | allocation's base address is an exact multiple of alignment , which must be a power of 2 at least as large as sizeof(void *) | |
size | the size of memory requested. |
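The alignment constraint above (a power of 2, at least as large as sizeof(void *)) can be checked before calling the allocator. The following is a minimal standalone sketch; the helper names valid_alignment and is_aligned are hypothetical and are not part of FastFlow.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical helper: true if `alignment` is a power of two that is
// at least as large as sizeof(void*), as the parameter description requires.
bool valid_alignment(std::size_t alignment) {
    bool power_of_two = alignment != 0 && (alignment & (alignment - 1)) == 0;
    return power_of_two && alignment >= sizeof(void*);
}

// Hypothetical helper: true if the address `p` is an exact multiple of `alignment`.
bool is_aligned(const void* p, std::size_t alignment) {
    assert(alignment != 0);  // a zero alignment is meaningless
    return reinterpret_cast<std::uintptr_t>(p) % alignment == 0;
}
```

A caller could test valid_alignment(alignment) first and then verify the returned pointer with is_aligned(*memptr, alignment).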
|
inline |
realloc.
It changes the size of the memory block pointed to by ptr to newsize bytes.
ptr | pointer to the buffer |
newsize | the new size. |
class ff::ff_farm |
The Farm skeleton, with Emitter (lb_t) and Collector (gt_t).
The Farm skeleton can be seen as a three-stage pipeline. The first stage is the Emitter (lb_t), which acts as a load balancer; the middle stage is the set of Workers, which are ff_nodes; the last (optional) stage is the Collector (gt_t), which gathers the results computed by the Workers.
This class is defined in farm.hpp
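The emitter/workers/collector structure described above can be illustrated with a small standalone sketch. It does not use FastFlow: BlockingQueue and run_farm_sketch are hypothetical names, the queues are mutex-based rather than lock-free, the emitter dispatches round-robin, and an empty std::optional stands in for the End-Of-Stream marker. Each worker squares its input and the collector sums the results.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical blocking channel; an empty optional marks End-Of-Stream.
template <typename T>
class BlockingQueue {
    std::queue<std::optional<T>> q_;
    std::mutex m_;
    std::condition_variable cv_;
public:
    void push(std::optional<T> v) {
        { std::lock_guard<std::mutex> lk(m_); q_.push(std::move(v)); }
        cv_.notify_one();
    }
    std::optional<T> pop() {
        std::unique_lock<std::mutex> lk(m_);
        cv_.wait(lk, [this] { return !q_.empty(); });
        std::optional<T> v = std::move(q_.front());
        q_.pop();
        return v;
    }
};

// Emitter (round-robin) -> workers (square each task) -> collector (sum).
long run_farm_sketch(const std::vector<int>& tasks, std::size_t nworkers) {
    assert(nworkers > 0);
    std::vector<BlockingQueue<int>> to_worker(nworkers);
    BlockingQueue<long> to_collector;
    std::vector<std::thread> workers;
    for (std::size_t w = 0; w < nworkers; ++w) {
        workers.emplace_back([&to_worker, &to_collector, w] {
            while (std::optional<int> t = to_worker[w].pop())
                to_collector.push(static_cast<long>(*t) * *t);
            to_collector.push(std::nullopt);  // forward End-Of-Stream
        });
    }
    // Emitter: distribute the tasks, then send End-Of-Stream to every worker.
    for (std::size_t i = 0; i < tasks.size(); ++i)
        to_worker[i % nworkers].push(tasks[i]);
    for (auto& q : to_worker) q.push(std::nullopt);
    // Collector: gather results until every worker has signalled End-Of-Stream.
    long sum = 0;
    std::size_t eos_seen = 0;
    while (eos_seen < nworkers) {
        std::optional<long> r = to_collector.pop();
        if (r) sum += *r; else ++eos_seen;
    }
    for (auto& t : workers) t.join();
    return sum;
}
```

For example, run_farm_sketch({1, 2, 3, 4}, 3) sums the squares 1 + 4 + 9 + 16 regardless of which worker finishes first.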
Public Member Functions | |
template<typename T > | |
ff_farm (const std::function< T *(T *, ff_node *const)> &F, int nw, bool input_ch=false) | |
High-level pattern constructor. | |
ff_farm (std::vector< ff_node * > &W, ff_node *const Emitter=NULL, ff_node *const Collector=NULL, bool input_ch=false) | |
Core patterns constructor 2. More... | |
ff_farm (bool input_ch=false, int in_buffer_entries=DEF_IN_BUFF_ENTRIES, int out_buffer_entries=DEF_OUT_BUFF_ENTRIES, bool worker_cleanup=false, int max_num_workers=DEF_MAX_NUM_WORKERS, bool fixedsize=false) | |
Core patterns constructor 1. More... | |
~ff_farm () | |
Destructor. More... | |
int | add_emitter (ff_node *e) |
Adds the emitter. More... | |
void | set_scheduling_ondemand (const int inbufferentries=1) |
Set scheduling with on-demand policy. More... | |
int | add_workers (std::vector< ff_node * > &w) |
Adds workers to the farm. More... | |
int | add_collector (ff_node *c, bool outpresent=false) |
Adds the collector. More... | |
int | wrap_around (bool multi_input=false) |
Sets the feedback channel from the collector to the emitter. More... | |
int | remove_collector () |
Removes the collector. More... | |
int | run (bool skip_init=false) |
Execute the Farm. More... | |
virtual int | run_and_wait_end () |
Executes the farm and waits for workers to complete. More... | |
virtual int | run_then_freeze (ssize_t nw=-1) |
Executes the farm and then freezes. More... | |
int | wait () |
Puts the thread in waiting state. More... | |
int | wait_freezing () |
Waits for freezing. More... | |
bool | offload (void *task, unsigned int retry=((unsigned int)-1), unsigned int ticks=ff_loadbalancer::TICKS2WAIT) |
bool | load_result (void **task, unsigned int retry=((unsigned int)-1), unsigned int ticks=ff_gatherer::TICKS2WAIT) |
Loads results from the gatherer. More... | |
bool | load_result_nb (void **task) |
Loads a result without blocking. More... | |
double | ffTime () |
Measure ff::ff_node execution time. More... | |
Public Member Functions inherited from ff::ff_node | |
virtual bool | put (void *ptr) |
Nonblocking put onto output channel. More... | |
virtual bool | get (void **ptr) |
Nonblocking pop from input channel. More... | |
virtual FFBUFFER * | get_in_buffer () const |
Gets input channel. More... | |
virtual FFBUFFER * | get_out_buffer () const |
Gets pointer to the output channel. More... | |
virtual bool | ff_send_out (void *task, unsigned int retry=((unsigned int)-1), unsigned int ticks=(TICKS2WAIT)) |
Sends out the task. More... | |
Protected Member Functions | |
void | skipfirstpop (bool sk) |
Set the ff_node to start with no input task. More... | |
void * | svc (void *task) |
svc method | |
int | svc_init () |
The svc_init method. | |
void | svc_end () |
The svc_end method. | |
int | create_input_buffer (int nentries, bool fixedsize) |
Creates the input buffer for the emitter node. More... | |
int | create_output_buffer (int nentries, bool fixedsize=false) |
Creates the output channel. More... | |
int | set_output_buffer (FFBUFFER *const o) |
Sets the output buffer of the collector. More... | |
ff_node * | getEmitter () |
Gets Emitter. More... | |
ff_node * | getCollector () |
Gets Collector. More... | |
Protected Member Functions inherited from ff::ff_node | |
bool | skipfirstpop () const |
Gets the status of spontaneous start. More... | |
virtual int | set_input_buffer (FFBUFFER *const i) |
Assign the input channelname to a channel. More... | |
virtual double | wffTime () |
Measure ff_node::svc execution time. More... | |
virtual | ~ff_node () |
Destructor. | |
|
inline |
Destructor.
Destroys the load balancer, the gatherer, and all the workers.
|
inline |
Adds the collector.
It adds the Collector filter to the farm skeleton. If no object is passed as a collector, then a default collector will be added (i.e. ff_gatherer). Note that it is not possible to add more than one collector.
c | Collector object |
outpresent | outstream? |
Returns the result of set_filter(x) if successful, otherwise -1 is returned.
|
inline |
|
inline |
Adds workers to the farm.
Add workers to the Farm. There is a limit to the number of workers that can be added to a Farm. This limit is 64 by default and can be raised by passing the desired limit as the fifth parameter of the ff_farm constructor.
w | a vector of ff_nodes which are Workers to be attached to the Farm. |
|
inlineprotectedvirtual |
Creates the input buffer for the emitter node.
This function redefines the ff_node's virtual method of the same name. It creates an input buffer for the Emitter node.
nentries | the size of the buffer |
fixedsize | flag to decide whether the buffer is resizable. |
Reimplemented from ff::ff_node.
|
inlineprotectedvirtual |
Creates the output channel.
nentries | the number of elements of the buffer |
fixedsize | flag to decide whether the buffer is bounded or unbounded. Default is false. |
Reimplemented from ff::ff_node.
|
inlinevirtual |
|
inlineprotected |
Gets Collector.
It returns a pointer to the collector.
NULL
|
inlineprotected |
Gets Emitter.
It returns a pointer to the emitter.
|
inline |
Loads results from the gatherer.
It loads the results from the gatherer (if any).
task | is a void pointer |
retry | is the number of tries to load the results |
ticks | is the number of ticks to wait |
Returns false if EOS arrived or there were too many retries, true if there is a new value.
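The retry/ticks semantics described above can be pictured as a bounded polling loop over a nonblocking pop. This is a standalone sketch under stated assumptions, not FastFlow code: Channel, EOS_MARK and load_with_retry are hypothetical names, the channel is a plain single-threaded deque, and the pause of "ticks" is modelled with thread yields rather than clock cycles.

```cpp
#include <cassert>
#include <deque>
#include <thread>

// Hypothetical End-Of-Stream sentinel (a unique address, never dereferenced).
static char eos_tag;
void* const EOS_MARK = &eos_tag;

// Hypothetical stand-in for a nonblocking channel.
struct Channel {
    std::deque<void*> items;
    bool try_pop(void** out) {
        if (items.empty()) return false;
        *out = items.front();
        items.pop_front();
        return true;
    }
};

// Returns true if a new value was stored in *task; returns false if the
// End-Of-Stream sentinel arrived or all `retry` attempts found the channel empty.
bool load_with_retry(Channel& ch, void** task, unsigned retry, unsigned ticks) {
    assert(task != nullptr);
    for (unsigned attempt = 0; attempt < retry; ++attempt) {
        if (ch.try_pop(task)) return *task != EOS_MARK;
        // Back off for a while before the next attempt (assumption: yields, not cycles).
        for (unsigned t = 0; t < ticks; ++t) std::this_thread::yield();
    }
    return false;
}
```

With the default retry value of ((unsigned int)-1) shown in the signature, such a loop would in practice keep polling until a value or EOS arrives.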
|
inline |
Loads a result without blocking.
It loads the result without blocking.
task | is a void pointer |
FF_EOS
|
inline |
Offloads the task to the farm.
It offloads the given task to the farm.
task | is a void pointer |
retry | showing the number of tries to offload |
ticks | is the number of ticks to wait |
Returns true if successful, otherwise false.
|
inline |
Removes the collector.
It prevents the collector thread from being started, while all workers' output buffers are still created as if the collector were present.
|
inlinevirtual |
Execute the Farm.
It executes the farm.
skip_init | A Boolean value showing whether the initialization should be skipped |
Reimplemented from ff::ff_node.
|
inlinevirtual |
Executes the farm and waits for workers to complete.
It executes the farm and waits for all workers to complete their tasks.
|
inlinevirtual |
Executes the farm and then freezes.
It executes the farm and then freezes it. If workers are frozen, it is possible to wake up just a subset of them.
|
inlineprotectedvirtual |
Sets the output buffer of the collector.
This function redefines the ff_node's virtual method of the same name. Set the output buffer for the Collector.
o | a buffer object, which can be of type SWSR_Ptr_Buffer or uSWSR_Ptr_Buffer |
Reimplemented from ff::ff_node.
|
inline |
Set scheduling with on-demand policy.
The default scheduling policy is round-robin. When the computational cost varies greatly among tasks, round-robin scheduling can lead to load imbalance among workers (especially with short streams). The on-demand scheduling policy can achieve near-optimal load balancing in many cases. Alternatively, it is always possible to define a fully application-level scheduling policy by redefining the ff_loadbalancer class.
inbufferentries | sets the number of queue slots for each worker thread. |
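The load-imbalance argument above can be made concrete with a small model. The sketch below is standalone and does not reflect how ff_loadbalancer is implemented: it assumes each task has a known cost, models round-robin as a static assignment of task i to worker i % nw, and models on-demand as giving each task to the worker that becomes free first. The function names are hypothetical.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Completion time when task i is statically assigned to worker i % nw.
long makespan_round_robin(const std::vector<long>& cost, std::size_t nw) {
    assert(nw > 0);
    std::vector<long> busy(nw, 0);
    for (std::size_t i = 0; i < cost.size(); ++i) busy[i % nw] += cost[i];
    return *std::max_element(busy.begin(), busy.end());
}

// Completion time when each task goes to the worker that becomes free first
// (ties are resolved in favour of the lowest worker index).
long makespan_on_demand(const std::vector<long>& cost, std::size_t nw) {
    assert(nw > 0);
    std::vector<long> busy(nw, 0);
    for (long c : cost) *std::min_element(busy.begin(), busy.end()) += c;
    return *std::max_element(busy.begin(), busy.end());
}
```

With the costs {8, 1, 1, 1, 8, 1, 1, 1} and two workers, the round-robin model sends both expensive tasks to the same worker and finishes at time 18, while the on-demand model finishes at time 11.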
|
inlineprotectedvirtual |
Set the ff_node to start with no input task.
Setting it to true lets the ff_node execute the svc method spontaneously, before receiving a task on the input channel. skipfirstpop makes it possible to define a "producer" node that starts the network.
sk | true start spontaneously (*task will be NULL) |
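The effect of the flag can be modelled with a single-threaded service loop. This is only an illustrative sketch, not the FastFlow runtime: run_node_sketch and double_or_seed are hypothetical names, and the input channel is a plain deque. When the flag is set, the callback is invoked once with a null task before anything is popped from the input.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <vector>

// Hypothetical service callback: doubles a task, or returns -1 when started
// spontaneously (null task).
int double_or_seed(const int* task) {
    return task ? *task * 2 : -1;
}

// Hypothetical single-threaded model of a node's service loop.
std::vector<int> run_node_sketch(const std::function<int(const int*)>& svc,
                                 std::deque<int> input, bool skip_first_pop) {
    assert(svc);  // a callable must be provided
    std::vector<int> out;
    // Spontaneous start: call svc once with no task before the first pop.
    if (skip_first_pop) out.push_back(svc(nullptr));
    while (!input.empty()) {
        int task = input.front();
        input.pop_front();
        out.push_back(svc(&task));
    }
    return out;
}
```

Calling run_node_sketch(double_or_seed, {1, 2}, true) yields one extra leading result produced from the null task, which is how a producer node can start a network that has no upstream input.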
Reimplemented from ff::ff_node.
|
inlinevirtual |
Puts the thread in waiting state.
It puts the thread in waiting state.
Reimplemented from ff::ff_node.
|
inlinevirtual |
Waits for freezing.
It waits for the thread to freeze.
Reimplemented from ff::ff_node.
|
inline |
Sets the feedback channel from the collector to the emitter.
This method establishes a feedback channel from the Collector to the Emitter. If the collector is present, then the collector output queue will be connected to the emitter input queue (feedback channel).
class ff::ff_ofarm |
The ordered Farm skeleton.
This class is defined in farm.hpp
Public Member Functions | |
ff_ofarm (bool input_ch=false, int in_buffer_entries=DEF_IN_BUFF_ENTRIES, int out_buffer_entries=DEF_OUT_BUFF_ENTRIES, bool worker_cleanup=false, int max_num_workers=DEF_MAX_NUM_WORKERS, bool fixedsize=false) | |
Constructor. More... | |
~ff_ofarm () | |
Destructor. More... | |
int | run (bool skip_init=false) |
run | |
Public Member Functions inherited from ff::ff_farm< ofarm_lb, ofarm_gt > | |
ff_farm (const std::function< T *(T *, ff_node *const)> &F, int nw, bool input_ch=false) | |
High-level pattern constructor. | |
ff_farm (std::vector< ff_node * > &W, ff_node *const Emitter=NULL, ff_node *const Collector=NULL, bool input_ch=false) | |
Core patterns constructor 2. More... | |
ff_farm (bool input_ch=false, int in_buffer_entries=DEF_IN_BUFF_ENTRIES, int out_buffer_entries=DEF_OUT_BUFF_ENTRIES, bool worker_cleanup=false, int max_num_workers=DEF_MAX_NUM_WORKERS, bool fixedsize=false) | |
Core patterns constructor 1. More... | |
~ff_farm () | |
Destructor. More... | |
int | add_emitter (ff_node *e) |
Adds the emitter. More... | |
void | set_scheduling_ondemand (const int inbufferentries=1) |
Set scheduling with on-demand policy. More... | |
int | add_workers (std::vector< ff_node * > &w) |
Adds workers to the farm. More... | |
int | add_collector (ff_node *c, bool outpresent=false) |
Adds the collector. More... | |
int | wrap_around (bool multi_input=false) |
Sets the feedback channel from the collector to the emitter. More... | |
int | remove_collector () |
Removes the collector. More... | |
int | run (bool skip_init=false) |
Execute the Farm. More... | |
virtual int | run_and_wait_end () |
Executes the farm and waits for workers to complete. More... | |
virtual int | run_then_freeze (ssize_t nw=-1) |
Executes the farm and then freezes. More... | |
int | wait () |
Puts the thread in waiting state. More... | |
int | wait_freezing () |
Waits for freezing. More... | |
bool | offload (void *task, unsigned int retry=((unsigned int)-1), unsigned int ticks=ff_loadbalancer::TICKS2WAIT) |
bool | load_result (void **task, unsigned int retry=((unsigned int)-1), unsigned int ticks=ff_gatherer::TICKS2WAIT) |
Loads results from the gatherer. More... | |
bool | load_result_nb (void **task) |
Loads a result without blocking. More... | |
Public Member Functions inherited from ff::ff_node | |
virtual bool | put (void *ptr) |
Nonblocking put onto output channel. More... | |
virtual bool | get (void **ptr) |
Nonblocking pop from input channel. More... | |
virtual FFBUFFER * | get_in_buffer () const |
Gets input channel. More... | |
virtual FFBUFFER * | get_out_buffer () const |
Gets pointer to the output channel. More... | |
virtual bool | ff_send_out (void *task, unsigned int retry=((unsigned int)-1), unsigned int ticks=(TICKS2WAIT)) |
Sends out the task. More... | |
Additional Inherited Members | |
Protected Member Functions inherited from ff::ff_farm< ofarm_lb, ofarm_gt > | |
void * | svc (void *task) |
svc method | |
int | svc_init () |
The svc_init method. | |
void | svc_end () |
The svc_end method. | |
int | create_input_buffer (int nentries, bool fixedsize) |
Creates the input buffer for the emitter node. More... | |
int | set_output_buffer (FFBUFFER *const o) |
Sets the output buffer of the collector. More... | |
ff_node * | getEmitter () |
Gets Emitter. More... | |
ff_node * | getCollector () |
Gets Collector. More... | |
Protected Member Functions inherited from ff::ff_node | |
bool | skipfirstpop () const |
Gets the status of spontaneous start. More... | |
virtual int | set_input_buffer (FFBUFFER *const i) |
Assign the input channelname to a channel. More... | |
virtual double | wffTime () |
Measure ff_node::svc execution time. More... | |
virtual | ~ff_node () |
Destructor. | |
|
inline |
Constructor.
input_ch | states the presence of input channel |
in_buffer_entries | defines the number of input entries |
worker_cleanup | states the cleaning of workers |
max_num_workers | defines the maximum number of workers in the ordered farm |
fixedsize | states the status of fixed size buffer |
References ff::ff_farm< ofarm_lb, ofarm_gt >::add_collector(), and ff::ff_farm< ofarm_lb, ofarm_gt >::add_emitter().
|
inline |
Destructor.
It cleans up the emitter and the collector.
class ff::ff_pipeline |
The Pipeline skeleton (low-level syntax)
Public Member Functions | |
ff_pipeline (bool input_ch=false, int in_buffer_entries=DEF_IN_BUFF_ENTRIES, int out_buffer_entries=DEF_OUT_BUFF_ENTRIES, bool fixedsize=true) | |
Constructor. More... | |
~ff_pipeline () | |
Destructor. | |
int | add_stage (ff_node *s) |
It adds a stage to the pipeline. More... | |
int | wrap_around (bool multi_input=false) |
Feedback channel (pattern modifier) More... | |
int | run (bool skip_init=false) |
Run the pipeline skeleton asynchronously. More... | |
int | wait () |
wait for pipeline termination (all stages received EOS) | |
int | wait_freezing () |
wait for pipeline to complete and suspend (all stages received EOS) More... | |
bool | offload (void *task, unsigned int retry=((unsigned int)-1), unsigned int ticks=ff_node::TICKS2WAIT) |
offload a task to the pipeline from the offloading thread (accelerator mode) More... | |
bool | load_result (void **task, unsigned int retry=((unsigned int)-1), unsigned int ticks=ff_node::TICKS2WAIT) |
gets a result from a task to the pipeline from the main thread (accelerator mode) More... | |
bool | load_result_nb (void **task) |
try to get a result from a task to the pipeline from the main thread (accelerator mode) More... | |
double | ffTime () |
Measure ff::ff_node execution time. More... | |
Public Member Functions inherited from ff::ff_node | |
virtual bool | put (void *ptr) |
Nonblocking put onto output channel. More... | |
virtual bool | get (void **ptr) |
Nonblocking pop from input channel. More... | |
virtual FFBUFFER * | get_in_buffer () const |
Gets input channel. More... | |
virtual FFBUFFER * | get_out_buffer () const |
Gets pointer to the output channel. More... | |
virtual bool | ff_send_out (void *task, unsigned int retry=((unsigned int)-1), unsigned int ticks=(TICKS2WAIT)) |
Sends out the task. More... | |
Protected Member Functions | |
void * | svc (void *task) |
The service callback (should be filled by user with parallel activity business code) More... | |
int | svc_init () |
Service initialisation. More... | |
void | svc_end () |
Service finalisation. More... | |
int | create_input_buffer (int nentries, bool fixedsize) |
Creates the input channel. More... | |
int | create_output_buffer (int nentries, bool fixedsize=false) |
Creates the output channel. More... | |
int | set_output_buffer (FFBUFFER *const o) |
Assign the output channelname to a channel. More... | |
Protected Member Functions inherited from ff::ff_node | |
virtual void | skipfirstpop (bool sk) |
Set the ff_node to start with no input task. More... | |
bool | skipfirstpop () const |
Gets the status of spontaneous start. More... | |
virtual int | set_input_buffer (FFBUFFER *const i) |
Assign the input channelname to a channel. More... | |
virtual double | wffTime () |
Measure ff_node::svc execution time. More... | |
virtual | ~ff_node () |
Destructor. | |
Related Functions | |
(Note that these are not member functions.) | |
int | run_and_wait_end () |
run the pipeline, wait until all stages have received the End-Of-Stream (EOS), and destroy the pipeline run-time More... | |
virtual int | run_then_freeze (bool skip_init=false) |
run the pipeline, wait until all stages have received the End-Of-Stream (EOS), and suspend the pipeline run-time More... | |
|
inline |
Constructor.
input_ch | true set accelerator mode |
in_buffer_entries | input queue length |
out_buffer_entries | output queue length |
fixedsize | true uses bound channels (SPSC queue) |
|
inline |
It adds a stage to the pipeline.
s | a ff_node (or derived, e.g. farm) object that is the stage to be added to the pipeline |
References ff::svector< T >::push_back(), and ff::svector< T >::size().
|
inlineprotectedvirtual |
Creates the input channel.
nentries | the number of elements of the buffer |
fixedsize | flag to decide whether the buffer is bounded or unbounded. Default is true. |
Reimplemented from ff::ff_node.
References ff::ff_node::get_in_buffer(), and ff::ff_node::set_input_buffer().
|
inlineprotectedvirtual |
Creates the output channel.
nentries | the number of elements of the buffer |
fixedsize | flag to decide whether the buffer is bounded or unbounded. Default is false. |
Reimplemented from ff::ff_node.
References ff::ff_node::get_out_buffer(), ff::ff_node::set_output_buffer(), and ff::svector< T >::size().
|
inlinevirtual |
Measure ff::ff_node execution time.
Reimplemented from ff::ff_node.
References ff::svector< T >::size().
|
inline |
gets a result from a task to the pipeline from the main thread (accelerator mode)
Total call: return when a result is available. To be used in accelerator mode only
[out] | task | |
retry | number of attempts to get a result before failing (related to nonblocking get from channel - expert use only) | |
ticks | number of clock cycles between successive attempts (related to nonblocking get from channel - expert use only) |
Returns true if a task is returned, false if End-Of-Stream (EOS).
References ff::ff_node::get_out_buffer().
|
inline |
try to get a result from a task to the pipeline from the main thread (accelerator mode)
Partial call: can return no result. To be used in accelerator mode only
[out] | task |
Returns true if a task is returned (including EOS), false if no task is returned.
References ff::ff_node::get_out_buffer().
|
inline |
offload a task to the pipeline from the offloading thread (accelerator mode)
Offload a task onto a pipeline accelerator. Typically the offloading entity is the main thread (although it can be used from any ff_node::svc method).
References ff::ff_node::get_in_buffer().
|
inlinevirtual |
Run the pipeline skeleton asynchronously.
Run the pipeline; the method call returns immediately. To be coupled with ff_pipeline::wait()
Reimplemented from ff::ff_node.
References ff::svector< T >::size().
|
inlineprotectedvirtual |
Assign the output channelname to a channel.
Attach the output of a ff_node to an existing channel, typically the input channel of another ff_node
o | reference to a channel of type FFBUFFER |
Reimplemented from ff::ff_node.
References ff::svector< T >::size().
|
inlineprotectedvirtual |
The service callback (should be filled by user with parallel activity business code)
task | is the input data stream item pointer (task) |
Implements ff::ff_node.
|
inlineprotectedvirtual |
Service finalisation.
Called after EOS arrived (logical termination) but before shutting down the run-time support (can be useful for housekeeping)
Reimplemented from ff::ff_node.
|
inlineprotectedvirtual |
Service initialisation.
Called after run-time initialisation (e.g. thread spawning) but before starting to get items from the input stream (can be useful for initialisation of parallel activities, e.g. manual thread pinning that cannot be done in the constructor because threads do not exist yet).
Reimplemented from ff::ff_node.
|
inlinevirtual |
wait for pipeline to complete and suspend (all stages received EOS)
Should be coupled with ???
Reimplemented from ff::ff_node.
References ff::svector< T >::size().
|
inline |
Feedback channel (pattern modifier)
The last stage output stream will be connected to the first stage input stream in a cycle (feedback channel)
References create_input_buffer(), create_output_buffer(), ff::ff_node::get_in_buffer(), ff::svector< T >::push_back(), set_output_buffer(), and ff::svector< T >::size().
|
related |
|
related |
run the pipeline, wait until all stages have received the End-Of-Stream (EOS), and suspend the pipeline run-time
Run-time threads are suspended by way of a distributed protocol. The same pipeline can be restarted by calling run_then_freeze again.
|
inline |
Core patterns constructor 2.
This is a constructor at core patterns level
W | vector of workers |
Emitter | pointer to Emitter object (mandatory) |
Collector | pointer to Collector object (optional) |
input_ch | true for enabling the input stream |
|
inline |
Core patterns constructor 1.
This is a constructor at core patterns level. To be coupled with add_workers, add_emitter, and add_collector.
input_ch | = true to set accelerator mode |
in_buffer_entries | = input queue length |
out_buffer_entries | = output queue length |
max_num_workers | = maximum number of farm workers |
worker_cleanup | = true deallocate worker object at exit |
fixedsize | = true uses only fixed size queue |