FastFlow
SVN-r182-Aug-14-2014
A high-level, lock-less, parallel programming (shared-memory) and distributed programming (distributed-memory) framework for multi-core and many-core systems
|
Run-time support building blocks. More...
Files | |
file | allocator.hpp |
Implementation of FastFlow's lock-free allocator. | |
file | buffer.hpp |
This file contains the definition of the bounded SPSC channel buffer used in FastFlow. | |
file | dnode.hpp |
Contains the definition of the ff_dnode class, which is an extension of the base class ff_node , with features oriented to distributed systems. | |
file | farm.hpp |
Farm pattern. | |
file | gt.hpp |
Farm Collector (it is not a ff_node) | |
file | lb.hpp |
Farm Emitter (not a ff_node) | |
file | node.hpp |
FastFlow ff_node. | |
file | oclnode.hpp |
FastFlow OpenCL interface node. | |
file | spin-lock.hpp |
This file contains several alternative spin-lock implementations that can be used as the FastFlow spin-lock. | |
file | ubuffer.hpp |
This file contains the definition of the unbounded SWSR circular buffer used in FastFlow. | |
Classes | |
class | ff::ofarm_lb |
Ordered farm emitter. More... | |
class | ff::ofarm_gt |
Ordered farm Collector. More... | |
class | ff::ff_minode |
Multiple input ff_node (the MPSC mediator) More... | |
class | ff::ff_monode |
Multiple output ff_node (the SPMC mediator) More... | |
class | ff::ff_allocator |
The ff_allocator, based on the idea of the Slab allocator More... | |
class | ff::SWSR_Ptr_Buffer |
SPSC bounded channel (Single-Writer/Single-Reader) More... | |
class | ff::ff_dinout< CommImplIn, CommImplOut > |
A ff::ff_dnode serving both Input and Output to the network. More... | |
class | ff::ff_dnode< CommImpl > |
Defines ff::ff_dnode. More... | |
class | ff::ff_gatherer |
A class representing the Collector node in a Farm skeleton. More... | |
class | ff::ff_loadbalancer |
A class representing the Emitter node in a typical Farm skeleton. More... | |
class | ff::Barrier |
Blocking barrier - Used only to start all nodes synchronously. More... | |
class | ff::spinBarrier |
Nonblocking barrier - Used only to start all nodes synchronously. More... | |
class | ff::ff_node |
The FastFlow abstract container for a parallel activity (actor). More... | |
class | ff::ff_oclNode |
OpenCL implementation of FastFlow node. More... | |
class | ff::uSWSR_Ptr_Buffer |
Unbounded Single-Writer/Single-Reader buffer (FastFlow unbounded channel) More... | |
Run-time support building blocks.
Set up of Parallel activities, synchronisations, communications, ...
class ff::ofarm_lb |
Ordered farm emitter.
This class is defined in farm.hpp
Public Member Functions | |
ofarm_lb (int max_num_workers) | |
Constructor. More... | |
Public Member Functions inherited from ff::ff_loadbalancer | |
ff_loadbalancer (size_t max_num_workers) | |
Default constructor. More... | |
virtual | ~ff_loadbalancer () |
Destructor. More... | |
int | set_filter (ff_node *f) |
Sets filter node. More... | |
void | set_in_buffer (FFBUFFER *const buff) |
Sets input buffer. More... | |
FFBUFFER * | get_in_buffer () const |
Gets input buffer. More... | |
ssize_t | get_channel_id () const |
Gets channel id. More... | |
void | reset_channel_id () |
Resets the channel id. More... | |
void | reset () |
Resets input buffer. More... | |
size_t | getnworkers () const |
Get the number of workers. More... | |
size_t | getNWorkers () const |
Get the number of workers. More... | |
void | skipfirstpop () |
Skips first pop. More... | |
int | set_masterworker () |
Decides master-worker schema. More... | |
int | set_input (svector< ff_node * > &mi) |
Sets multiple input buffers. More... | |
void | broadcast_task (void *task) |
Send the same task to all workers. More... | |
bool | masterworker () const |
Gets master worker. More... | |
int | register_worker (ff_node *w) |
Registers the given node into the workers' list. More... | |
int | deregister_worker () |
Deregister worker. More... | |
virtual void * | svc (void *) |
Load balances task. More... | |
virtual int | svc_init () |
Initializes the load balancer task. More... | |
virtual void | svc_end () |
Finalizes the loadbalancer task. More... | |
int | runlb (bool=false, ssize_t nw=-1) |
Runs the loadbalancer. More... | |
virtual int | run (bool=false) |
Spawns workers threads. More... | |
int | waitlb () |
Waits for load balancer. More... | |
virtual int | wait () |
Waits for workers to finish their task. More... | |
virtual int | wait_freezing () |
Waits for freezing. More... | |
int | wait_freezing (const size_t n) |
Waits for freezing for one single worker thread. | |
void | stop () |
Stops the thread. More... | |
void | freeze () |
Freezes all threads registered with the lb and the lb itself. More... | |
void | freeze (const size_t n) |
Freezes one worker thread. | |
virtual void | thaw (bool _freeze=false, ssize_t nw=-1) |
Thaws all threads registered with the lb and the lb itself. | |
void | thaw (const size_t n, bool _freeze=false) |
Thaws one single worker thread. | |
virtual double | ffTime () |
FastFlow start timing. More... | |
virtual double | wffTime () |
FastFlow finish timing. More... | |
Protected Member Functions | |
size_t | selectworker () |
Get selected worker. More... | |
Protected Member Functions inherited from ff::ff_loadbalancer | |
void | push_eos (bool nofreeze=false) |
Pushes EOS to the worker. More... | |
virtual size_t | ntentative () |
Gets the number of attempts made before wasting some time. More... | |
virtual void | losetime_out () |
Loses some time before sending the message to output buffer. More... | |
virtual void | losetime_in () |
Loses time before retrying to get a message from the input buffer. More... | |
virtual bool | schedule_task (void *task, unsigned int retry=(unsigned)-1, unsigned int ticks=0) |
Scheduling of tasks. More... | |
virtual std::deque< ff_node * > ::iterator | collect_task (void **task, std::deque< ff_node * > &availworkers, std::deque< ff_node * >::iterator &start) |
Collects tasks. More... | |
bool | pop (void **task) |
Pop a task from buffer. More... | |
Additional Inherited Members | |
Static Protected Member Functions inherited from ff::ff_loadbalancer | |
static bool | ff_send_out_emitter (void *task, unsigned int retry, unsigned int ticks, void *obj) |
Task scheduler. More... | |
|
inline |
Constructor.
max_num_workers | defines the maximum number of workers. |
|
inlineprotectedvirtual |
Get selected worker.
Inspect which worker has been selected
Reimplemented from ff::ff_loadbalancer.
class ff::ofarm_gt |
Public Member Functions | |
ofarm_gt (int max_num_workers) | |
Constructor. More... | |
Public Member Functions inherited from ff::ff_gatherer | |
ff_gatherer (int max_num_workers) | |
Constructor. More... | |
int | set_filter (ff_node *f) |
Sets the filter. More... | |
void | set_out_buffer (FFBUFFER *const buff) |
Sets output buffer. More... | |
ssize_t | get_channel_id () const |
Gets the channel id. More... | |
size_t | getnworkers () const |
Gets the number of worker threads currently running. More... | |
size_t | getNWorkers () const |
Get the number of workers. More... | |
void | skipfirstpop () |
Skips the first pop. More... | |
FFBUFFER * | get_out_buffer () const |
Gets the output buffer. More... | |
int | register_worker (ff_node *w) |
Register the given worker to the list of workers. More... | |
virtual int | svc_init () |
Initializes the gatherer task. More... | |
virtual void * | svc (void *) |
The gatherer task. More... | |
virtual void | svc_end () |
Finalizes the gatherer. More... | |
int | run (bool=false) |
Execute the gatherer task. More... | |
virtual int | all_gather (void *task, void **V) |
It gathers all tasks. More... | |
void | thaw (bool _freeze=false, ssize_t nw=-1) |
Thaws all threads registered with the gt and the gt itself. | |
void | reset () |
Resets output buffer. More... | |
virtual double | ffTime () |
Start counting time. More... | |
virtual double | wffTime () |
Complete counting time. More... | |
Protected Member Functions | |
ssize_t | selectworker () |
Selects a worker. More... | |
Protected Member Functions inherited from ff::ff_gatherer | |
virtual void | notifyeos (int id) |
Notifies the EOS. More... | |
virtual size_t | ntentative () |
Gets the number of attempts. More... | |
virtual void | losetime_out () |
Loses the time out. More... | |
virtual void | losetime_in () |
Loses the time in. More... | |
virtual ssize_t | gather_task (void **task) |
It gathers the tasks. More... | |
void | push (void *task) |
Pushes the task in the tasks queue. More... | |
bool | pop (void **task) |
Pop a task out of the queue. More... | |
bool | pop_nb (void **task) |
Pop a task from an unbounded queue. More... | |
|
inline |
Constructor.
max_num_workers | defines the maximum number of workers. |
References ff::svector< T >::resize().
|
inlineprotectedvirtual |
Selects a worker.
It gets the next worker using the Round Robin policy. The selected worker has to be alive (and kicking).
Reimplemented from ff::ff_gatherer.
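The round-robin policy described above can be sketched in isolation. The following is a minimal, self-contained model and not the FastFlow implementation: the function name `next_alive_worker` and the `alive` vector are assumptions introduced for illustration, standing in for whatever liveness bookkeeping the collector keeps internally.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical helper, not part of FastFlow.
// Returns the index of the next alive worker after `last` in round-robin
// order, or -1 when no worker is alive.
long next_alive_worker(const std::vector<bool>& alive, std::size_t last) {
    const std::size_t n = alive.size();
    assert(n == 0 || last < n);  // `last` must name an existing worker
    for (std::size_t step = 1; step <= n; ++step) {
        const std::size_t candidate = (last + step) % n;
        if (alive[candidate]) return static_cast<long>(candidate);
    }
    return -1;  // every worker has terminated
}
```

Starting the scan at `last + 1` and wrapping around means the worker selected last time is considered again only after all the others have been checked.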
class ff::ff_minode |
Multiple input ff_node (the MPSC mediator)
The ff_node with many input channels.
This class is defined in farm.hpp
Public Member Functions | |
ff_minode (int max_num_workers=ff_farm<>::DEF_MAX_NUM_WORKERS) | |
Constructor. | |
virtual | ~ff_minode () |
Destructor. | |
virtual int | set_input (svector< ff_node * > &w) |
Assembles input channels. More... | |
virtual int | set_input (ff_node *node) |
Assembles an input channel. More... | |
void | skipfirstpop (bool sk) |
Skip first pop. More... | |
int | run (bool skip_init=false) |
run More... | |
int | wait_freezing () |
Waits for the freezing state. More... | |
ssize_t | get_channel_id () const |
Gets the channel id from which the data has just been received. | |
Public Member Functions inherited from ff::ff_node | |
virtual void * | svc (void *task)=0 |
The service callback (should be filled by user with parallel activity business code) More... | |
virtual int | svc_init () |
Service initialisation. More... | |
virtual void | svc_end () |
Service finalisation. More... | |
virtual bool | put (void *ptr) |
Nonblocking put onto output channel. More... | |
virtual bool | get (void **ptr) |
Nonblocking pop from input channel. More... | |
virtual FFBUFFER * | get_in_buffer () const |
Gets input channel. More... | |
virtual FFBUFFER * | get_out_buffer () const |
Gets pointer to the output channel. More... | |
virtual bool | ff_send_out (void *task, unsigned int retry=((unsigned int)-1), unsigned int ticks=(TICKS2WAIT)) |
Sends out the task. More... | |
Protected Member Functions | |
int | cardinality (BARRIER_T *const barrier) |
Gets the number of input channels. | |
int | create_input_buffer (int nentries, bool fixedsize=true) |
Creates the input channels. More... | |
int | wait () |
Waits for ff_node termination. More... | |
Protected Member Functions inherited from ff::ff_node | |
bool | skipfirstpop () const |
Gets the status of spontaneous start. More... | |
virtual int | create_output_buffer (int nentries, bool fixedsize=false) |
Creates the output channel. More... | |
virtual int | set_output_buffer (FFBUFFER *const o) |
Assign the output channelname to a channel. More... | |
virtual int | set_input_buffer (FFBUFFER *const i) |
Assign the input channelname to a channel. More... | |
virtual double | ffTime () |
Measure ff::ff_node execution time. More... | |
virtual double | wffTime () |
Measure ff_node::svc execution time. More... | |
virtual | ~ff_node () |
Destructor. | |
|
inlineprotectedvirtual |
Creates the input channels.
Reimplemented from ff::ff_node.
References ff::ff_node::create_input_buffer(), and ff::svector< T >::size().
|
inlinevirtual |
run
Reimplemented from ff::ff_node.
References ff::ff_gatherer::register_worker(), ff::ff_gatherer::run(), ff::ff_gatherer::set_filter(), ff::svector< T >::size(), ff::ff_gatherer::skipfirstpop(), and ff::ff_node::skipfirstpop().
Assembles input channels.
Assembles input channel names into ff_node channels
Reimplemented from ff::ff_node.
|
inlinevirtual |
Assembles an input channel.
Assembles an input channel name into a ff_node channel
Reimplemented from ff::ff_node.
References ff::svector< T >::push_back().
|
inlinevirtual |
Skip first pop.
Set up spontaneous start
Reimplemented from ff::ff_node.
References ff::ff_node::skipfirstpop().
|
inlineprotectedvirtual |
|
inlinevirtual |
Waits for the freezing state.
It will happen on EOS arrival on the input channel
Reimplemented from ff::ff_node.
class ff::ff_monode |
Multiple output ff_node (the SPMC mediator)
The ff_node with many output channels.
This class is defined in farm.hpp
Public Member Functions | |
ff_monode (int max_num_workers=ff_farm<>::DEF_MAX_NUM_WORKERS) | |
Constructor. More... | |
virtual | ~ff_monode () |
Destructor. | |
virtual int | set_output (svector< ff_node * > &w) |
Assembles the output channels. More... | |
virtual int | set_output (ff_node *node) |
Assembles an output channel. More... | |
void | skipfirstpop (bool sk) |
Skips the first pop. More... | |
bool | ff_send_out_to (void *task, int id) |
Sends one task to a specific node id. | |
int | run (bool skip_init=false) |
run More... | |
Public Member Functions inherited from ff::ff_node | |
virtual void * | svc (void *task)=0 |
The service callback (should be filled by user with parallel activity business code) More... | |
virtual int | svc_init () |
Service initialisation. More... | |
virtual void | svc_end () |
Service finalisation. More... | |
virtual bool | put (void *ptr) |
Nonblocking put onto output channel. More... | |
virtual bool | get (void **ptr) |
Nonblocking pop from input channel. More... | |
virtual FFBUFFER * | get_in_buffer () const |
Gets input channel. More... | |
virtual FFBUFFER * | get_out_buffer () const |
Gets pointer to the output channel. More... | |
virtual bool | ff_send_out (void *task, unsigned int retry=((unsigned int)-1), unsigned int ticks=(TICKS2WAIT)) |
Sends out the task. More... | |
Protected Member Functions | |
int | cardinality (BARRIER_T *const barrier) |
Cardinality. More... | |
int | wait () |
Waits for ff_node termination. More... | |
Protected Member Functions inherited from ff::ff_node | |
bool | skipfirstpop () const |
Gets the status of spontaneous start. More... | |
virtual int | create_input_buffer (int nentries, bool fixedsize=true) |
Creates the input channel. More... | |
virtual int | create_output_buffer (int nentries, bool fixedsize=false) |
Creates the output channel. More... | |
virtual int | set_output_buffer (FFBUFFER *const o) |
Assign the output channelname to a channel. More... | |
virtual int | set_input_buffer (FFBUFFER *const i) |
Assign the input channelname to a channel. More... | |
virtual int | wait_freezing () |
Waits for the freezing state. More... | |
virtual double | ffTime () |
Measure ff::ff_node execution time. More... | |
virtual double | wffTime () |
Measure ff_node::svc execution time. More... | |
virtual | ~ff_node () |
Destructor. | |
|
inline |
Constructor.
max_num_workers | defines the maximum number of workers |
|
inlineprotectedvirtual |
Cardinality.
Defines the cardinality of the FastFlow node.
barrier | defines the barrier |
Reimplemented from ff::ff_node.
|
inlinevirtual |
run
skip_init | defines whether the initialization should be skipped |
Reimplemented from ff::ff_node.
References ff::ff_loadbalancer::register_worker(), ff::ff_loadbalancer::runlb(), ff::ff_loadbalancer::set_filter(), ff::svector< T >::size(), ff::ff_loadbalancer::skipfirstpop(), and ff::ff_node::skipfirstpop().
Assembles the output channels.
Attaches output channel names to ff_node channels
Reimplemented from ff::ff_node.
References ff::svector< T >::push_back(), and ff::svector< T >::size().
|
inlinevirtual |
Assembles an output channel.
Attaches an output channel name to a ff_node channel
Reimplemented from ff::ff_node.
References ff::svector< T >::push_back().
|
inlinevirtual |
Skips the first pop.
Set up spontaneous start
Reimplemented from ff::ff_node.
References ff::ff_loadbalancer::skipfirstpop().
|
inlineprotectedvirtual |
Waits for ff_node termination.
Reimplemented from ff::ff_node.
References ff::ff_loadbalancer::waitlb().
class ff::ff_allocator |
The ff_allocator, based on the idea of the Slab allocator
A ff_allocator is owned by a single ff_node, namely the ff_node that registered it via registerAllocator. The owner can malloc/realloc/posix_memalign. Other ff_nodes can free, provided they have called register4free. A ff_node can init more than one ff_allocator. Every violation causes unexpected results (e.g. a segfault).
The ff_allocator works over the SlabCache and is tuned to outperform standard allocators in a multi-threaded environment. When it is initialised, it creates a (predefined) number of SlabCaches, each one containing a (predefined) number of buffers of different sizes, from 32 to 8192 bytes. The thread that first calls the allocator object to allocate memory has to register itself to the shared leak queue. Only one thread can perform this operation. The allocator obtains memory from the pre-allocated SlabCache: if there is a slab (i.e. a buffer) big enough to contain an object of the requested size, the pointer to that buffer is passed to the thread that requested the memory. This latter thread has to register as well to the same shared leak queue, so that when it has finished its operations, it can return memory to the allocator thread.
Inherited by ff::ffa_wrapper.
Public Member Functions | |
ff_allocator (size_t max_size=0, const int delayedReclaim=0) | |
Default Constructor. | |
int | init (const int _nslabs[N_SLABBUFFER]=0, bool prealloc=true) |
init the allocator More... | |
int | registerAllocator () |
register ff_allocator (get ownership) More... | |
void | deregisterAllocator (bool reclaimMemory=true) |
de-register the ff_allocator (release ownership) More... | |
int | register4free () |
register for the free operation More... | |
void * | malloc (size_t size) |
malloc More... | |
int | posix_memalign (void **memptr, size_t alignment, size_t size) |
ff posix_memalign. More... | |
void | free (void *ptr) |
free More... | |
void * | realloc (void *ptr, size_t newsize) |
realloc More... | |
|
inline |
de-register the ff_allocator (release ownership)
Deregisters the ff_allocator (i.e. releases ownership) and reclaims allocated memory back to the allocator. Every ff_node can perform this action. malloc/realloc/posix_memalign require that one ff_node owns the ff_allocator.
reclaimMemory | true if reclaim; false if deregister only |
|
inline |
free
Frees the requested memory chunk on a given ff::ff_allocator object.
The memory must have been allocated using malloc on the same object (otherwise a segfault occurs), and the calling ff::ff_node must have called register4free() beforehand.
Incorrect registration causes a segfault or unexpected behaviour.
ptr | a pointer to the buffer. |
|
inline |
init the allocator
Initialises the allocator. This method is called by one ff_node for each data path (e.g. the Emitter in a Farm skeleton). It creates a number of SlabCache objects, as specified by the N_SLABBUFFER constant. The size of each created segment goes from 32 (the first one created) to 8192 (the last). Typically the number of buffers in a slab segment decreases as the size of the slab increases.
Default:
If needed, the number of nslabs is increased dynamically by reclaiming more memory from the OS. This is a relatively slow and not lock-free path of the code.
_nslabs | an array specifying the allowed numbers of buffers in a SlabCache (overrides nslabs_default) |
prealloc | whether to use preallocated segments |
References ff::svector< T >::push_back().
|
inline |
malloc
Requests the allocation of size bytes. If the size is too large, the OS malloc is used to get a new chunk of memory.
To be called on a ff::ff_allocator initialised via ff::ff_allocator::init() on the calling ff::ff_node (owner). Each ff::ff_allocator object is owned by the ff::ff_node that called registerAllocator on it. Only the owner can malloc on it, whereas other ff_nodes can free. More than one ff::ff_allocator per ff_node can be defined.
Violating ownership causes a segfault or unexpected behaviour.
size | the size of memory requested |
use standard allocator if the size is too big or we don't want to use the ff_allocator for that size
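The choice between a slab buffer and the standard allocator can be illustrated with a small size-class lookup. This is a sketch under a stated assumption: it supposes power-of-two size classes between the 32-byte and 8192-byte bounds mentioned in this page, and the names `kMinSlab`, `kMaxSlab` and `slab_size_for` are hypothetical. The actual class table is defined in allocator.hpp and may differ.

```cpp
#include <cassert>
#include <cstddef>

// Assumed for illustration: power-of-two size classes from 32 to 8192 bytes.
constexpr std::size_t kMinSlab = 32;
constexpr std::size_t kMaxSlab = 8192;

// Hypothetical helper, not part of FastFlow.
// Returns the buffer size of the smallest class that fits `size`, or 0 when
// the request is too large and should go to the standard allocator instead.
std::size_t slab_size_for(std::size_t size) {
    if (size > kMaxSlab) return 0;
    std::size_t cls = kMinSlab;
    while (cls < size) cls <<= 1;  // round up to the next power of two
    assert(cls <= kMaxSlab);
    return cls;
}
```

A request for 33 bytes would be served from the 64-byte class in this model, while a request for 8193 bytes would bypass the slabs entirely.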
|
inline |
ff posix_memalign.
[out] | *memptr | pointer to a chunk of memory where the aligned memory will be returned |
alignment | allocation's base address is an exact multiple of alignment , which must be a power of 2 at least as large as sizeof(void *) | |
size | the size of memory requested. |
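The constraint on `alignment` (a power of 2 at least as large as `sizeof(void *)`) can be checked before calling the method. The helpers below are a minimal sketch; `is_valid_alignment` and `is_aligned` are hypothetical names introduced here and are not part of the FastFlow API.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical helper: true when `alignment` is a power of two and is at
// least as large as sizeof(void*).
bool is_valid_alignment(std::size_t alignment) {
    const bool power_of_two =
        alignment != 0 && (alignment & (alignment - 1)) == 0;
    return power_of_two && alignment >= sizeof(void*);
}

// Hypothetical helper: true when the address of `p` is an exact multiple of
// `alignment`.
bool is_aligned(const void* p, std::size_t alignment) {
    assert(is_valid_alignment(alignment));
    return reinterpret_cast<std::uintptr_t>(p) % alignment == 0;
}
```

The expression `alignment & (alignment - 1)` clears the lowest set bit, so it is zero only when exactly one bit is set.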
|
inline |
realloc
It changes the size of the memory block pointed to by ptr to newsize bytes. If the size is too large, the OS malloc is used to get a new chunk of memory.
To be called on a ff::ff_allocator initialised via registerAllocator() on the calling ff::ff_node (owner). Each ff::ff_allocator object is owned by the ff::ff_node that called registerAllocator on it. Only the owner can malloc/realloc on it, whereas other ff_nodes can free. More than one ff::ff_allocator per ff_node can be defined.
Violating ownership causes a segfault or unexpected behaviour.
newsize | the new size of the memory block requested |
|
inline |
register for the free operation
Threads other than the allocator thread (i.e. those threads that do not allocate memory) have to register themselves to the shared buffer by calling this method. In this way they are provided with a chunk of memory. Since they are registered, once their task terminates they free the memory assigned and that memory returns to the allocator thread's buffer, so that it can be reused.
|
inline |
register ff_allocator (get ownership)
The ff_node that allocates memory has to call this method in order to register itself to the shared buffer. With this method, a ff_node is allowed to allocate memory. Only one ff_node can allocate memory.
class ff::SWSR_Ptr_Buffer |
SPSC bounded channel (Single-Writer/Single-Reader)
This class describes the SWSR circular buffer, used in FastFlow to implement a lock-free (wait-free) bounded FIFO queue. No lock is needed around pop and push methods.
A single NULL value is used to indicate buffer full and buffer empty conditions.
This class is defined in buffer.hpp
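The idea of using a NULL slot value to signal both "full" and "empty" can be sketched as follows. This is an illustrative model only, not the code in buffer.hpp: the class name `SpscPtrRing` is hypothetical, and it relies on `std::atomic` with acquire/release ordering, whereas FastFlow uses its own memory barriers and cache-aligned storage.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <vector>

// Illustrative single-producer/single-consumer ring of pointers.
// A slot holding nullptr is free; any other value is a queued item.
class SpscPtrRing {
public:
    explicit SpscPtrRing(std::size_t n) : slots_(n) {
        assert(n > 0);
        for (auto& s : slots_) s.store(nullptr, std::memory_order_relaxed);
    }

    // Producer side. nullptr cannot be queued because it marks a free slot.
    bool push(void* data) {
        if (data == nullptr) return false;
        if (slots_[write_].load(std::memory_order_acquire) != nullptr)
            return false;  // next slot still occupied: buffer full
        slots_[write_].store(data, std::memory_order_release);
        write_ = (write_ + 1) % slots_.size();
        return true;
    }

    // Consumer side.
    bool pop(void** data) {
        void* item = slots_[read_].load(std::memory_order_acquire);
        if (item == nullptr) return false;  // next slot free: buffer empty
        *data = item;
        slots_[read_].store(nullptr, std::memory_order_release);
        read_ = (read_ + 1) % slots_.size();
        return true;
    }

private:
    std::vector<std::atomic<void*>> slots_;
    std::size_t write_ = 0;  // touched only by the producer thread
    std::size_t read_ = 0;   // touched only by the consumer thread
};
```

Because each index is private to one thread and the slot content itself carries the full/empty state, the two sides never need to read each other's index, which is what removes the need for a lock in this model.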
Public Member Functions | |
SWSR_Ptr_Buffer (unsigned long n, const bool=true) | |
~SWSR_Ptr_Buffer () | |
bool | init (const bool startatlineend=false) |
bool | empty () |
bool | available () |
unsigned long | buffersize () const |
bool | push (void *const data) |
bool | multipush (void *const data[], int len) |
bool | pop (void **data) |
bool | inc () |
void * | top () const |
void | reset (const bool startatlineend=false) |
unsigned long | length () const |
|
inline |
Constructor.
n | the size of the buffer |
|
inline |
Default destructor
|
inline |
It returns true if there is room for at least one element in the buffer.
|
inline |
It returns the size of the buffer.
|
inline |
It returns true if the buffer is empty.
|
inline |
It is like pop but doesn't copy any data. true is always returned.
|
inline |
It initialises the buffer: it allocates space (size) of possibly aligned memory and resets the pointers (read pointer and write pointer) by placing them at the beginning of the buffer.
References reset().
|
inline |
It returns the length of the buffer (i.e. the actual number of elements it contains)
|
inline |
The multipush method, which pushes a batch of elements (array) in the queue. NOTE: len should be a multiple of longxCacheLine/sizeof(void*)
|
inline |
|
inline |
Push method: pushes the input value into the queue. A Write Memory Barrier (WMB) ensures that all previous memory writes are visible to the other processors before any later write is executed. This is an "expensive" memory fence operation needed in all the architectures with a weak-ordering memory model, where stores can be executed out-of-order (e.g. PowerPC). This is a no-op on Intel x86/x86-64 CPUs.
data | Element to be pushed in the buffer |
References available().
|
inline |
Resets the buffer and moves the read and write pointers to the beginning of the buffer (i.e. position 0). Also, the entire buffer is cleaned and set to 0.
This is a good starting point if the multipush method will be used in order to reduce cache thrashing.
|
inline |
It returns the "head" of the buffer, i.e. the element pointed to by the read pointer (it is a FIFO queue, so push on the tail and pop from the head).
class ff::ff_dinout |
A ff::ff_dnode serving both Input and Output to the network.
A ff_dnode with both input and output channels.
A ff_dinout is actually a ff_dnode with an extra communication channel (external channel), so that the dinout node is connected with the "external world" both with input and output channels.
It is implemented as a template class: the template type CommImplIn refers to the input communication pattern, the type CommImplOut refers to the output communication pattern.
This class is defined in dnode.hpp
Public Member Functions | |
virtual void | prepare (svector< iovec > &v, void *ptr, const int sender=-1) |
Prepares output message. More... | |
int | run (bool=false) |
Run the ff_node. More... | |
int | wait () |
Waits for the thread to finish. | |
void | skipfirstpop (bool sk) |
Set the ff_node to start with no input task. More... | |
int | initIn (const std::string &name, const std::string &address, const int peers, typename CommImplIn::TransportImpl *const transp, const int nodeId=-1) |
Initializes input communication channels. More... | |
int | initOut (const std::string &name, const std::string &address, const int peers, typename CommImplOut::TransportImpl *const transp, const int nodeId=-1, dnode_cbk_t cbk=0) |
Initializes the output communication channels. More... | |
Public Member Functions inherited from ff::ff_dnode< CommImplIn > | |
int | init (const std::string &name, const std::string &address, const int peers, typename CommImplIn::TransportImpl *const transp, const bool p, const int nodeId=-1, dnode_cbk_t cbk=0) |
Initializes distributed communication channel. More... | |
virtual void | prepare (svector< msg_t * > *&v, size_t len, const int sender=-1) |
Prepare a pool of messages. More... | |
virtual void | unmarshalling (svector< msg_t * > *const v[], const int vlen, void *&task) |
Unmarshalling. More... | |
void | setCallbackArg (void *arg) |
Sets callback arguments. More... | |
int | run (bool=false) |
Executes the FastFlow dnode. More... | |
int | wait () |
Waits for the FastFlow node. More... | |
void | skipfirstpop (bool sk) |
Skips first pop. More... | |
Public Member Functions inherited from ff::ff_node | |
virtual void * | svc (void *task)=0 |
The service callback (should be filled by user with parallel activity business code) More... | |
virtual int | svc_init () |
Service initialisation. More... | |
virtual void | svc_end () |
Service finalisation. More... | |
virtual bool | put (void *ptr) |
Nonblocking put onto output channel. More... | |
virtual bool | get (void **ptr) |
Nonblocking pop from input channel. More... | |
virtual FFBUFFER * | get_in_buffer () const |
Gets input channel. More... | |
virtual FFBUFFER * | get_out_buffer () const |
Gets pointer to the output channel. More... | |
virtual bool | ff_send_out (void *task, unsigned int retry=((unsigned int)-1), unsigned int ticks=(TICKS2WAIT)) |
Sends out the task. More... | |
Protected Types | |
typedef CommImplOut::TransportImpl::msg_t | msg_t |
Protected Member Functions | |
ff_dnode () | |
Default constructor. | |
virtual | ~ff_dnode () |
Destructor: closes all connections. | |
virtual bool | push (void *ptr) |
push ff::ff_dnode -> ff::ff_node (shared-memory channel) More... | |
virtual bool | pop (void **ptr) |
pop ff::ff_node -> ff::ff_dnode (shared-memory channel) More... | |
virtual | ~ff_dinout () |
Destructor. More... | |
virtual bool | push (void *ptr) |
Pushes the task. More... | |
Protected Member Functions inherited from ff::ff_dnode< CommImplIn > | |
bool | isEos (const char data[msg_t::HEADER_LENGTH]) const |
Checks EOS. More... | |
ff_dnode () | |
Constructor. | |
Protected Member Functions inherited from ff::ff_node | |
bool | skipfirstpop () const |
Gets the status of spontaneous start. More... | |
virtual int | create_input_buffer (int nentries, bool fixedsize=true) |
Creates the input channel. More... | |
virtual int | create_output_buffer (int nentries, bool fixedsize=false) |
Creates the output channel. More... | |
virtual int | set_output_buffer (FFBUFFER *const o) |
Assign the output channelname to a channel. More... | |
virtual int | set_input_buffer (FFBUFFER *const i) |
Assign the input channelname to a channel. More... | |
virtual int | wait_freezing () |
Waits for the freezing state. More... | |
virtual double | ffTime () |
Measure ff::ff_node execution time. More... | |
virtual double | wffTime () |
Measure ff_node::svc execution time. More... | |
virtual | ~ff_node () |
Destructor. | |
Additional Inherited Members | |
Public Types inherited from ff::ff_dnode< CommImplIn > | |
enum | |
Defines sender and receiver. | |
typedef CommImplIn::TransportImpl::msg_t | msg_t |
Static Protected Member Functions inherited from ff::ff_dnode< CommImplIn > | |
static void | freeMsg (void *data, void *arg) |
Callback to free the sent message. More... | |
static void | freeHdr (void *data, void *) |
Frees header. More... | |
|
protected |
Defines the output communication
|
inlineprotectedvirtual |
Destructor.
It closes all connections.
|
inline |
Initializes input communication channels.
It initializes all the input communication channels.
name | is the string to be passed through the communication channel |
address | is the address of the destination |
peers | is the number of peers involved in the communication |
transp | is the transport for the communication |
nodeId | is the identifier of the node |
com.init()
method.
|
inline |
Initializes the output communication channels.
It initializes all the output communication channels.
name | is the string to be sent |
address | is the address of the destination |
peers | is the number of peers involved in the communication |
transp | is the transport address |
nodeId | is the identifier of the node |
cbk | is the callback |
comOut.init()
method.
|
inlineprotectedvirtual |
pop ff::ff_node -> ff::ff_dnode (shared-memory channel)
It overrides ff_node's pop method.
ptr | points to the data |
true is returned, otherwise false.
Reimplemented from ff::ff_dnode< CommImplIn >.
|
inlinevirtual |
Prepares output message.
It is used to prepare (non contiguous) output messages
v | is a vector containing the pool of messages |
ptr | is pointer to the data |
sender | is the sender of the message |
Reimplemented from ff::ff_dnode< CommImplIn >.
|
inlineprotectedvirtual |
push ff::ff_dnode -> ff::ff_node (shared-memory channel)
It overrides ff_node's push method.
ptr | points to the data |
Reimplemented from ff::ff_dnode< CommImplIn >.
|
inlineprotectedvirtual |
Pushes the task.
It overrides ff_dnode's push method.
ptr | points to the task |
internal_push() method.
Reimplemented from ff::ff_dnode< CommImplIn >.
|
inlinevirtual |
|
inlinevirtual |
Set the ff_node to start with no input task.
Setting it to true lets the ff_node execute the svc method spontaneously before receiving a task on the input channel. skipfirstpop makes it possible to define a "producer" node that starts the network.
sk | if true, start spontaneously (*task will be NULL) |
Reimplemented from ff::ff_node.
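The effect of a spontaneous start on the order of svc invocations can be modelled with a toy driver loop. This is a sketch only: `run_node` is a hypothetical function written for this example, it is not FastFlow's run-time, and it ignores threads, channels and end-of-stream handling. It shows just that, with the flag set, svc is first invoked with a NULL task before any input is popped.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <vector>

// Hypothetical model of a node's service loop, not part of FastFlow.
// `input` stands in for the node's input channel.
void run_node(bool skipfirstpop,
              std::deque<void*> input,
              const std::function<void(void*)>& svc) {
    if (skipfirstpop) {
        svc(nullptr);  // spontaneous start: no task has been received yet
    }
    while (!input.empty()) {
        void* task = input.front();
        input.pop_front();
        svc(task);     // regular invocation with a task from the channel
    }
}
```

A "producer" in this model is a node whose svc treats the NULL task as the cue to start generating work rather than as an error.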
class ff::ff_dnode |
Defines ff::ff_dnode.
This class represents the distributed version of ff_node. A ff_dnode is actually a ff_node with an extra communication channel (external channel), which connects the edge-node of the graph with one or more edge-nodes of other FastFlow application graphs running on the same host or on different host(s).
It is implemented as a template: the template type CommImpl refers to the communication pattern that the programmer wishes to use to connect different ff_dnodes (i.e. unicast, broadcast, scatter, ondemand, fromAll, fromAny).
Public Types | |
enum | |
Defines sender and receiver. | |
typedef CommImpl::TransportImpl::msg_t | msg_t |
Public Member Functions | |
int | init (const std::string &name, const std::string &address, const int peers, typename CommImpl::TransportImpl *const transp, const bool p, const int nodeId=-1, dnode_cbk_t cbk=0) |
Initializes distributed communication channel. More... | |
virtual void | prepare (svector< iovec > &v, void *ptr, const int sender=-1) |
Prepares output message. More... | |
virtual void | prepare (svector< msg_t * > *&v, size_t len, const int sender=-1) |
Prepare a pool of messages. More... | |
virtual void | unmarshalling (svector< msg_t * > *const v[], const int vlen, void *&task) |
Unmarshalling. More... | |
void | setCallbackArg (void *arg) |
Sets the callback argument. More... | |
int | run (bool=false) |
Executes the FastFlow dnode. More... | |
int | wait () |
Waits for the FastFlow node. More... | |
void | skipfirstpop (bool sk) |
Skips first pop. More... | |
Public Member Functions inherited from ff::ff_node | |
virtual void * | svc (void *task)=0 |
The service callback (should be filled by user with parallel activity business code) More... | |
virtual int | svc_init () |
Service initialisation. More... | |
virtual void | svc_end () |
Service finalisation. More... | |
virtual bool | put (void *ptr) |
Nonblocking put onto output channel. More... | |
virtual bool | get (void **ptr) |
Nonblocking pop from input channel. More... | |
virtual FFBUFFER * | get_in_buffer () const |
Gets input channel. More... | |
virtual FFBUFFER * | get_out_buffer () const |
Gets pointer to the output channel. More... | |
virtual bool | ff_send_out (void *task, unsigned int retry=((unsigned int)-1), unsigned int ticks=(TICKS2WAIT)) |
Sends out the task. More... | |
Protected Member Functions | |
bool | isEos (const char data[msg_t::HEADER_LENGTH]) const |
Checks EOS. More... | |
ff_dnode () | |
Constructor. | |
virtual | ~ff_dnode () |
Destructor. | |
virtual bool | push (void *ptr) |
push ff::ff_dnode -> ff::ff_node (shared-memory channel) More... | |
virtual bool | pop (void **ptr) |
pop ff::ff_node -> ff::ff_dnode (shared-memory channel) More... | |
Protected Member Functions inherited from ff::ff_node | |
bool | skipfirstpop () const |
Gets the status of spontaneous start. More... | |
virtual int | create_input_buffer (int nentries, bool fixedsize=true) |
Creates the input channel. More... | |
virtual int | create_output_buffer (int nentries, bool fixedsize=false) |
Creates the output channel. More... | |
virtual int | set_output_buffer (FFBUFFER *const o) |
Assign the output channelname to a channel. More... | |
virtual int | set_input_buffer (FFBUFFER *const i) |
Assign the input channelname to a channel. More... | |
virtual int | wait_freezing () |
Waits for the freezing state. More... | |
virtual double | ffTime () |
Measures ff::ff_node execution time. More... | |
virtual double | wffTime () |
Measures ff_node::svc execution time. More... | |
virtual | ~ff_node () |
Destructor. | |
Static Protected Member Functions | |
static void | freeMsg (void *data, void *arg) |
Callback to free the sent message. More... | |
static void | freeHdr (void *data, void *) |
Frees header. More... | |
typedef CommImpl::TransportImpl::msg_t ff::ff_dnode< CommImpl >::msg_t |
Implements communication transport layer
|
inlinestaticprotected |
Frees header.
It frees the header of the message
data | is a pointer to the header |
|
inlinestaticprotected |
Callback to free the sent message.
data | pointer to the data to be freed |
arg | arguments to be passed to the callback function |
|
inline |
Initializes distributed communication channel.
name | is the name of the channel |
address | is the address where to listen or connect to |
peers | is the number of peers |
transp | is the transport layer to be used |
p | is a flag saying whether the current dnode is the producer (p = true ) or the consumer (p = false ) w.r.t. the communication pattern used |
nodeId | is the ID of the node |
cbk | is the callback function that will be called once when a message just sent is no longer in use by the run-time |
Returns the result of com.init() as an integer value.
|
inlineprotected |
Checks EOS.
It checks whether the message is an end-of-stream (EOS) message.
data | is the message header to check |
|
inlineprotectedvirtual |
pop ff::ff_node -> ff::ff_dnode (shared-memory channel)
It overrides ff_node's pop method.
ptr | points to the data |
true is returned on success, otherwise false.
Reimplemented from ff::ff_node.
Reimplemented in ff::ff_dinout< CommImplIn, CommImplOut >, and ff::ff_dinout< COMM1, COMM2 >.
|
inlinevirtual |
Prepares output message.
It is used to prepare (non-contiguous) output messages.
v | is a vector containing the pool of messages |
ptr | is a pointer to the data |
sender | is the sender of the message |
Reimplemented in ff::ff_dinout< CommImplIn, CommImplOut >, and ff::ff_dinout< COMM1, COMM2 >.
|
inlinevirtual |
Prepare a pool of messages.
It is used to give the run-time a pool of messages on which input message frames will be received.
len | is the number of input messages expected |
sender | is the sender of the message |
v | is the vector containing the pool of messages |
|
inlineprotectedvirtual |
push ff::ff_dnode -> ff::ff_node (shared-memory channel)
It overrides ff_node's push method.
ptr | points to the data |
Reimplemented from ff::ff_node.
Reimplemented in ff::ff_dinout< CommImplIn, CommImplOut >, and ff::ff_dinout< COMM1, COMM2 >.
|
inlinevirtual |
Executes the FastFlow dnode.
It runs the dnode as a stand-alone thread. Typically, it should not be called by application code unless you want to have just a sequential dnode.
Returns the result of the run() method.
Reimplemented from ff::ff_node.
|
inline |
Sets the callback argument.
It is used to pass an additional parameter (the 2nd one) to the callback function. Typically it is called in the prepare method of the producer.
arg | points to the argument to be passed to the callback |
|
inlinevirtual |
Skips first pop.
It skips the first pop from the input queue or from the input external channel. This is typically used in the first stage of a cyclic graph (e.g. the first stage of a torus pipeline).
sk | is a boolean value indicating whether the first pop should be skipped or not |
Reimplemented from ff::ff_node.
|
inlinevirtual |
Unmarshalling.
It is called once, when all frames composing the message have been received by the run-time. Within that method, it is possible to convert or re-arrange all the frames back to their original data or object layout.
v | is vector of messages |
vlen | is the length of the vector |
task | pointer to the task |
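The round trip between prepare (which exposes a task as non-contiguous frames) and unmarshalling (which rebuilds it once every frame has arrived) can be sketched as follows. This is a minimal, self-contained illustration and not FastFlow code: Frame, Task, to_frames and from_frames are hypothetical names, and a real ff_dnode would work with iovec and msg_t instead.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <string>
#include <vector>

// Hypothetical stand-in for one transport frame: a pointer plus a length.
struct Frame {
    const void* data;
    std::size_t len;
};

// Hypothetical task: a fixed-size field followed by a variable-size payload.
struct Task {
    int id;
    std::string payload;
};

// Producer side: describe the task as two non-contiguous frames (no copy).
std::vector<Frame> to_frames(const Task& t) {
    return { Frame{&t.id, sizeof t.id},
             Frame{t.payload.data(), t.payload.size()} };
}

// Consumer side: called once all frames are available; rebuilds the task.
Task from_frames(const std::vector<Frame>& frames) {
    assert(frames.size() == 2 && frames[0].len == sizeof(int));
    Task t;
    std::memcpy(&t.id, frames[0].data, sizeof t.id);
    t.payload.assign(static_cast<const char*>(frames[1].data), frames[1].len);
    return t;
}
```

Calling from_frames(to_frames(t)) yields a copy of t. The frames only borrow the memory of the original task, which is why a callback that frees the message after sending is needed in the real API.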
|
inlinevirtual |
Waits for the FastFlow node.
It waits for the thread to finish.
Returns the result of the wait() method.
Reimplemented from ff::ff_node.
class ff::ff_gatherer |
A class representing the Collector node in a Farm skeleton.
This class models the gatherer, which wraps all the methods and structures used by the Collector node in a Farm skeleton. The farm can be seen as a three-stage pipeline, the stages being a ff_loadbalancer called emitter, a pool of ff_node called workers and, optionally, a ff_gatherer called collector. The Collector node can be used to gather the results coming from the computations executed by the pool of workers. The collector can also be connected to the emitter node via a feedback channel, in order to create a farm-with-feedback skeleton.
This class is defined in gt.hpp
Public Member Functions | |
ff_gatherer (int max_num_workers) | |
Constructor. More... | |
int | set_filter (ff_node *f) |
Sets the filter. More... | |
void | set_out_buffer (FFBUFFER *const buff) |
Sets output buffer. More... | |
ssize_t | get_channel_id () const |
Gets the channel id. More... | |
size_t | getnworkers () const |
Gets the number of worker threads currently running. More... | |
size_t | getNWorkers () const |
Get the number of workers. More... | |
void | skipfirstpop () |
Skips the first pop. More... | |
FFBUFFER * | get_out_buffer () const |
Gets the output buffer. More... | |
int | register_worker (ff_node *w) |
Register the given worker to the list of workers. More... | |
virtual int | svc_init () |
Initializes the gatherer task. More... | |
virtual void * | svc (void *) |
The gatherer task. More... | |
virtual void | svc_end () |
Finalizes the gatherer. More... | |
int | run (bool=false) |
Execute the gatherer task. More... | |
virtual int | all_gather (void *task, void **V) |
It gathers all tasks. More... | |
void | thaw (bool _freeze=false, ssize_t nw=-1) |
Thaws all threads registered with the gt and the gt itself. | |
void | reset () |
Resets output buffer. More... | |
virtual double | ffTime () |
Start counting time. More... | |
virtual double | wffTime () |
Complete counting time. More... | |
Protected Member Functions | |
virtual ssize_t | selectworker () |
Selects a worker. More... | |
virtual void | notifyeos (int id) |
Notifies the EOS. More... | |
virtual size_t | ntentative () |
Gets the number of attempts. More... | |
virtual void | losetime_out () |
Loses the time out. More... | |
virtual void | losetime_in () |
Loses the time in. More... | |
virtual ssize_t | gather_task (void **task) |
It gathers the tasks. More... | |
void | push (void *task) |
Pushes the task in the tasks queue. More... | |
bool | pop (void **task) |
Pop a task out of the queue. More... | |
bool | pop_nb (void **task) |
Pops a task from an unbounded queue. More... | |
|
inline |
Constructor.
It creates max_num_workers NULL pointers to worker objects.
|
inlinevirtual |
It gathers all tasks.
It is a virtual function, and gathers results from the workers.
References ff::svector< T >::back(), getnworkers(), losetime_in(), ff::svector< T >::pop_back(), ff::svector< T >::push_back(), and ff::svector< T >::size().
|
inlinevirtual |
Start counting time.
It defines the counting of start time.
|
inlineprotectedvirtual |
It gathers the tasks.
It keeps selecting workers. If the selected worker has a task, that worker is returned. Otherwise a tick is wasted and the search for a worker with a task continues.
References losetime_in(), ntentative(), and selectworker().
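The polling behaviour described for gather_task can be sketched as follows. This is a single-threaded model and not the FastFlow implementation: Queue, the max_rounds bound, and the use of std::this_thread::yield() to "waste a tick" are all assumptions made for illustration.

```cpp
#include <cstddef>
#include <deque>
#include <thread>
#include <vector>

// Hypothetical stand-in for the output channel of one worker.
using Queue = std::deque<int>;

// Polls the workers in round-robin order, starting from `next`.
// Returns the index of the worker a task was taken from, or -1 if no task
// was found after `max_rounds` full scans (a real collector would keep going).
long gather_task(std::vector<Queue>& workers, std::size_t& next,
                 int& task, unsigned max_rounds) {
    if (workers.empty()) return -1;
    for (unsigned round = 0; round < max_rounds; ++round) {
        for (std::size_t tried = 0; tried < workers.size(); ++tried) {
            const std::size_t w = next;
            next = (next + 1) % workers.size();   // advance the selection
            if (!workers[w].empty()) {
                task = workers[w].front();
                workers[w].pop_front();
                return static_cast<long>(w);
            }
        }
        std::this_thread::yield();  // nothing available: waste a tick, retry
    }
    return -1;
}
```

The selection index is kept by the caller so that successive calls continue the scan where the previous one stopped, instead of always favouring the first worker.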
|
inline |
Gets the channel id.
It gets the channelid.
channelid is returned.
|
inline |
Gets the output buffer.
It gets the output buffer.
buffer is returned.
|
inline |
Gets the number of worker threads currently running.
It gets the number of threads currently running.
|
inline |
Get the number of workers.
It returns the number of total workers registered
References ff::svector< T >::size().
|
inlineprotectedvirtual |
Loses the time in.
It is a virtual function which defines the number of ticks to be waited.
|
inlineprotectedvirtual |
Loses the time out.
It is a virtual function which defines the number of ticks to be waited.
|
inlineprotectedvirtual |
Notifies the EOS.
It is a virtual function and is used to notify EOS
|
inlineprotectedvirtual |
Gets the number of attempts.
The number of attempts made before wasting some time and then retrying.
References getnworkers().
|
inlineprotected |
Pop a task out of the queue.
It pops the task out of the queue.
Returns false if not successful, otherwise true.
References get_out_buffer(), and losetime_in().
|
inlineprotected |
Pops a task from an unbounded queue.
It pops the task from an unbounded queue.
References get_out_buffer().
|
inlineprotected |
|
inline |
Register the given worker to the list of workers.
It registers the given worker to the list of workers.
References ff::svector< T >::push_back(), and ff::svector< T >::size().
|
inline |
Resets output buffer.
Warning resetting the buffer while the node is running may produce unexpected results.
|
inline |
Execute the gatherer task.
It executes the gatherer task.
References ff::svector< T >::size().
|
inlineprotectedvirtual |
Selects a worker.
It gets the next worker using the Round Robin policy. The selected worker has to be alive (and kicking).
Reimplemented in ff::ofarm_gt.
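A round-robin selection that skips workers that are no longer alive can be sketched as follows. The function name, the alive vector and the -1 return value are assumptions made for this illustration; they are not taken from the FastFlow sources.

```cpp
#include <cstddef>
#include <vector>

// Returns the index of the next alive worker after `last`, scanning in
// round-robin order, or -1 if no worker is alive.
long select_worker(const std::vector<bool>& alive, std::size_t last) {
    const std::size_t n = alive.size();
    for (std::size_t step = 1; step <= n; ++step) {
        const std::size_t candidate = (last + step) % n;
        if (alive[candidate]) return static_cast<long>(candidate);
    }
    return -1;  // also reached when there are no workers at all
}
```

The scan covers exactly one full turn, so when only one worker is alive it is selected again, and a farm whose workers have all terminated is reported with -1.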
|
inline |
Sets the filter.
It sets the given ff_node as the filter.
|
inline |
Sets output buffer.
It sets the output buffer.
|
inline |
Skips the first pop.
It determines whether the first pop should be skipped or not.
|
inlinevirtual |
The gatherer task.
It is a virtual function to be used as the gatherer task.
Implements ff::ff_thread.
References gather_task(), ff::ff_node::get_in_buffer(), get_out_buffer(), ff::ff_node::get_out_buffer(), push(), set_out_buffer(), skipfirstpop(), and ff::ff_node::svc().
|
inlinevirtual |
Finalizes the gatherer.
It is a virtual function used to finalise the gatherer task.
Reimplemented from ff::ff_thread.
References ff::ff_node::svc_end().
|
inlinevirtual |
Initializes the gatherer task.
It is a virtual function to initialise the gatherer task.
Reimplemented from ff::ff_thread.
References ff::svector< T >::size(), and ff::ff_node::svc_init().
|
inlinevirtual |
Complete counting time.
It defines the counting of finished time.
class ff::ff_loadbalancer |
A class representing the Emitter node in a typical Farm skeleton.
This class models the loadbalancer, which wraps all the methods and structures used by the Emitter node in a Farm skeleton. The emitter node is used to generate the stream of tasks for the pool of workers. The emitter can also be used as a sequential preprocessor if the stream is coming from outside the farm, as is the case when the stream is coming from a previous node of a pipeline chain or from an external device.
The Farm skeleton must have the emitter node defined: if the user does not add it to the farm, the run-time support adds a default emitter, which acts as a stream filter and schedules tasks in a round-robin fashion towards the workers.
This class is defined in lb.hpp
Public Member Functions | |
ff_loadbalancer (size_t max_num_workers) | |
Default constructor. More... | |
virtual | ~ff_loadbalancer () |
Destructor. More... | |
int | set_filter (ff_node *f) |
Sets filter node. More... | |
void | set_in_buffer (FFBUFFER *const buff) |
Sets input buffer. More... | |
FFBUFFER * | get_in_buffer () const |
Gets input buffer. More... | |
ssize_t | get_channel_id () const |
Gets channel id. More... | |
void | reset_channel_id () |
Resets the channel id. More... | |
void | reset () |
Resets input buffer. More... | |
size_t | getnworkers () const |
Get the number of workers. More... | |
size_t | getNWorkers () const |
Get the number of workers. More... | |
void | skipfirstpop () |
Skips first pop. More... | |
int | set_masterworker () |
Decides master-worker schema. More... | |
int | set_input (svector< ff_node * > &mi) |
Sets multiple input buffers. More... | |
void | broadcast_task (void *task) |
Send the same task to all workers. More... | |
bool | masterworker () const |
Gets master worker. More... | |
int | register_worker (ff_node *w) |
Registers the given node into the workers' list. More... | |
int | deregister_worker () |
Deregister worker. More... | |
virtual void * | svc (void *) |
Load balances task. More... | |
virtual int | svc_init () |
Initializes the load balancer task. More... | |
virtual void | svc_end () |
Finalizes the loadbalancer task. More... | |
int | runlb (bool=false, ssize_t nw=-1) |
Runs the loadbalancer. More... | |
virtual int | run (bool=false) |
Spawns workers threads. More... | |
int | waitlb () |
Waits for load balancer. More... | |
virtual int | wait () |
Waits for workers to finish their task. More... | |
virtual int | wait_freezing () |
Waits for freezing. More... | |
int | wait_freezing (const size_t n) |
Waits for freezing for one single worker thread. | |
void | stop () |
Stops the thread. More... | |
void | freeze () |
Freezes all threads registered with the lb and the lb itself. More... | |
void | freeze (const size_t n) |
Freezes one worker thread. | |
virtual void | thaw (bool _freeze=false, ssize_t nw=-1) |
Thaws all threads registered with the lb and the lb itself. | |
void | thaw (const size_t n, bool _freeze=false) |
Thaws one single worker thread. | |
virtual double | ffTime () |
FastFlow start timing. More... | |
virtual double | wffTime () |
FastFlow finish timing. More... | |
Protected Member Functions | |
void | push_eos (bool nofreeze=false) |
Pushes EOS to the worker. More... | |
virtual size_t | selectworker () |
Virtual function that can be redefined to implement a new scheduling policy. More... | |
virtual size_t | ntentative () |
Gets the number of attempts before wasting some time. More... | |
virtual void | losetime_out () |
Loses some time before sending the message to output buffer. More... | |
virtual void | losetime_in () |
Loses time before retrying to get a message from the input buffer. More... | |
virtual bool | schedule_task (void *task, unsigned int retry=(unsigned)-1, unsigned int ticks=0) |
Scheduling of tasks. More... | |
virtual std::deque< ff_node * > ::iterator | collect_task (void **task, std::deque< ff_node * > &availworkers, std::deque< ff_node * >::iterator &start) |
Collects tasks. More... | |
bool | pop (void **task) |
Pop a task from buffer. More... | |
Static Protected Member Functions | |
static bool | ff_send_out_emitter (void *task, unsigned int retry, unsigned int ticks, void *obj) |
Task scheduler. More... | |
|
inline |
Default constructor.
It is the default constructor.
max_num_workers | The max number of workers allowed |
|
inlinevirtual |
Destructor.
It deallocates the dynamic memory previously allocated for workers.
|
inline |
Send the same task to all workers.
task is a void pointer
It sends the same task to all workers.
References losetime_out().
|
inlineprotectedvirtual |
Collects tasks.
It collects tasks from the worker and returns in the form of deque.
task | is a void pointer |
availworkers | is a queue of available workers |
start | is a queue of TODO |
References losetime_in().
|
inline |
Deregister worker.
It deregisters a worker.
|
inlinestaticprotected |
Task scheduler.
It defines the static version of the task scheduler.
Returns true or false.
|
inlinevirtual |
FastFlow start timing.
It returns the starting of FastFlow timing.
|
inline |
Freezes all threads registered with the lb and the lb itself.
It freezes all workers and the emitter.
|
inline |
Gets channel id.
It returns the identifier of the channel.
|
inline |
Gets input buffer.
It gets the input buffer
|
inline |
Get the number of workers.
It returns the number of workers running
|
inline |
Get the number of workers.
It returns the number of total workers registered
References ff::svector< T >::size().
|
inlineprotectedvirtual |
Loses time before retrying to get a message from the input buffer.
It loses time before retrying to get a message from the input buffer.
|
inlineprotectedvirtual |
Loses some time before sending the message to output buffer.
It loses some time before the message is sent to the output buffer.
|
inline |
Gets master worker.
It returns master worker
|
inlineprotectedvirtual |
Gets the number of attempts before wasting some time.
The number of attempts made before wasting some time and then retrying.
|
inlineprotected |
Pop a task from buffer.
It pops the task from buffer.
task | is a void pointer |
Returns true if successful.
References losetime_in().
|
inlineprotected |
Pushes EOS to the worker.
It pushes the EOS to the queue of the worker.
nofreeze | is a boolean value that determines whether the EOS should be frozen or not |
References losetime_out().
|
inline |
Registers the given node into the workers' list.
It registers the given node in the worker list.
w is the worker
References ff::svector< T >::push_back(), and ff::svector< T >::size().
|
inline |
Resets input buffer.
Warning resetting the buffer while the node is running may produce unexpected results.
|
inline |
Resets the channel id.
It resets the channel id to -2.
|
inlinevirtual |
Spawns workers threads.
It spawns workers threads.
References ff::svector< T >::size().
|
inline |
Runs the loadbalancer.
It runs the load balancer.
References ff::svector< T >::size().
|
inlineprotectedvirtual |
Scheduling of tasks.
It is the main scheduling function. This is a virtual function and can be redefined to implement a custom scheduling policy.
task | is a void pointer |
retry | is the number of tries to schedule a task |
ticks | is the number of ticks to be lost |
Returns true if successful, or false if not successful.
References losetime_out(), ntentative(), and selectworker().
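A scheduling function of this kind, with a bounded number of retries and a round-robin worker selection, can be sketched as follows. This is a simplified, single-threaded model under stated assumptions: Worker, RoundRobinScheduler and the fixed-capacity queues are hypothetical, and std::this_thread::yield() stands in for losing `ticks` between attempts.

```cpp
#include <cstddef>
#include <deque>
#include <thread>
#include <vector>

// Hypothetical worker with a bounded input queue and a nonblocking put.
struct Worker {
    std::size_t capacity;
    std::deque<void*> queue;
    bool put(void* task) {
        if (queue.size() >= capacity) return false;  // full: do not block
        queue.push_back(task);
        return true;
    }
};

class RoundRobinScheduler {
    std::vector<Worker>& workers_;
    std::size_t next_ = 0;

    // Overridable point in the real class: here a plain round-robin.
    std::size_t select_worker() {
        const std::size_t w = next_;
        next_ = (next_ + 1) % workers_.size();
        return w;
    }

public:
    explicit RoundRobinScheduler(std::vector<Worker>& workers)
        : workers_(workers) {}

    // Tries each worker once per attempt; gives up after `retry` attempts.
    bool schedule_task(void* task, unsigned retry) {
        for (unsigned attempt = 0; attempt < retry; ++attempt) {
            for (std::size_t n = 0; n < workers_.size(); ++n)
                if (workers_[select_worker()].put(task)) return true;
            std::this_thread::yield();  // every queue was full: lose some time
        }
        return false;
    }
};
```

Replacing select_worker with a different policy (for example, picking the shortest queue) changes the scheduling without touching the retry loop, which is the design reason for keeping the two functions separate.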
|
inlineprotectedvirtual |
Virtual function that can be redefined to implement a new scheduling policy.
It is a virtual function that can be redefined to implement a new scheduling policy.
Reimplemented in ff::ofarm_lb.
|
inline |
Sets filter node.
It sets the filter with the FastFlow node.
f | is the FastFlow node |
References ff_send_out_emitter().
|
inline |
Sets input buffer.
It sets the input buffer with the instance of FFBUFFER.
buff | is a pointer to an FFBUFFER |
Sets multiple input buffers.
It sets the multiple input buffers.
References ff::svector< T >::size().
|
inline |
Decides master-worker schema.
It decides the master-worker schema.
|
inline |
Skips first pop.
It sets skip1pop to true.
|
inline |
|
inlinevirtual |
Load balances task.
It is a virtual function which loadbalances the task.
Implements ff::ff_thread.
References collect_task(), get_in_buffer(), ff::ff_node::get_in_buffer(), pop(), push_eos(), schedule_task(), set_in_buffer(), ff::svector< T >::size(), skipfirstpop(), and ff::ff_node::svc().
|
inlinevirtual |
Finalizes the loadbalancer task.
It is a virtual function which finalises the loadbalancer task.
Reimplemented from ff::ff_thread.
References ff::ff_node::svc_end().
|
inlinevirtual |
Initializes the load balancer task.
It is a virtual function which initialises the loadbalancer task.
Reimplemented from ff::ff_thread.
References ff::ff_node::svc_init().
|
inlinevirtual |
Waits for workers to finish their task.
It waits for all workers to finish their tasks.
References ff::svector< T >::size().
|
inlinevirtual |
Waits for freezing.
It waits for the freezing of all threads.
|
inline |
Waits for load balancer.
It waits for the load balancer.
|
inlinevirtual |
FastFlow finish timing.
It returns the finishing of FastFlow timing.
class ff::Barrier |
Blocking barrier - Used only to start all nodes synchronously.
class ff::spinBarrier |
Nonblocking barrier - Used only to start all nodes synchronously.
class ff::ff_node |
The FastFlow abstract container for a parallel activity (actor).
Implements ff_node, i.e. the general container for a parallel activity. From the orchestration viewpoint, the process model to be employed is a CSP/Actor hybrid model where activities (ff_nodes) are named and the data paths between processes are clearly identified. ff_nodes synchronise with one another via abstract units of SPSC communication and synchronisation (namely 1:1 channels), which model the data dependency between two ff_nodes. An ff_node is used to encapsulate sequential portions of code implementing functions.
On a multicore, an ff_node is implemented as a non-blocking thread. It is not, and should not be confused with, a task. Typically an ff_node uses 100% of one CPU context (i.e. one core, either physical or HT, if any). Overall, the number of ff_nodes running should not exceed the number of logical cores of the platform.
An ff_node behaves as a loop that gets an input (i.e. the parameter of the svc method) and produces one or more outputs (i.e. the return value of the svc method or the parameter of the ff_send_out method, which can be called in the svc method). The loop completes on the output of the special value "end-of-stream" (EOS). The EOS is propagated across channels to the next ff_node.
Key methods are: svc_init, svc_end (optional), and svc (pure virtual, mandatory). The svc_init method is called once at node initialization, while the svc_end method is called after an EOS task has been returned.
This class is defined in node.hpp
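The life cycle described above (svc_init once, svc once per input, EOS propagation, then svc_end) can be modelled with the following self-contained sketch. It does not use FastFlow: Node, run_node and EOS_TASK are hypothetical names, the loop runs in the calling thread, and it stops when the input queue is empty, whereas a real node would keep waiting for input.

```cpp
#include <deque>
#include <vector>

// Hypothetical sentinel standing in for the end-of-stream (EOS) value.
static int eos_marker;
static void* const EOS_TASK = &eos_marker;

// Reduced model of the node interface: only the three key methods.
struct Node {
    virtual ~Node() = default;
    virtual int svc_init() { return 0; }   // optional
    virtual void* svc(void* task) = 0;     // mandatory
    virtual void svc_end() {}              // optional
};

// Drives one node. Returns -1 if svc_init fails, otherwise the number of
// results forwarded before the stream ended.
int run_node(Node& node, std::deque<void*>& in, std::vector<void*>& out) {
    if (node.svc_init() < 0) return -1;
    int produced = 0;
    while (!in.empty()) {
        void* task = in.front();
        in.pop_front();
        if (task == EOS_TASK) break;        // EOS received on input
        void* result = node.svc(task);
        if (result == EOS_TASK) break;      // EOS produced by svc
        out.push_back(result);
        ++produced;
    }
    out.push_back(EOS_TASK);                // propagate EOS downstream
    node.svc_end();
    return produced;
}

// Example node: doubles the integer each task points to.
struct Doubler : Node {
    bool initialised = false;
    bool ended = false;
    int svc_init() override { initialised = true; return 0; }
    void* svc(void* task) override {
        int* value = static_cast<int*>(task);
        *value *= 2;
        return value;
    }
    void svc_end() override { ended = true; }
};
```

Tasks queued after the EOS are never processed, which mirrors the rule that the EOS logically terminates the stream.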
Public Member Functions | |
virtual void * | svc (void *task)=0 |
The service callback (should be filled by user with parallel activity business code) More... | |
virtual int | svc_init () |
Service initialisation. More... | |
virtual void | svc_end () |
Service finalisation. More... | |
virtual bool | put (void *ptr) |
Nonblocking put onto output channel. More... | |
virtual bool | get (void **ptr) |
Nonblocking pop from input channel. More... | |
virtual FFBUFFER * | get_in_buffer () const |
Gets input channel. More... | |
virtual FFBUFFER * | get_out_buffer () const |
Gets pointer to the output channel. More... | |
virtual bool | ff_send_out (void *task, unsigned int retry=((unsigned int)-1), unsigned int ticks=(TICKS2WAIT)) |
Sends out the task. More... | |
Protected Member Functions | |
virtual void | skipfirstpop (bool sk) |
Set the ff_node to start with no input task. More... | |
bool | skipfirstpop () const |
Gets the status of spontaneous start. More... | |
virtual int | create_input_buffer (int nentries, bool fixedsize=true) |
Creates the input channel. More... | |
virtual int | create_output_buffer (int nentries, bool fixedsize=false) |
Creates the output channel. More... | |
virtual int | set_output_buffer (FFBUFFER *const o) |
Assign the output channelname to a channel. More... | |
virtual int | set_input_buffer (FFBUFFER *const i) |
Assign the input channelname to a channel. More... | |
virtual int | run (bool=false) |
Run the ff_node. More... | |
virtual int | wait () |
Wait ff_node termination. More... | |
virtual int | wait_freezing () |
Waits for the freezing state. More... | |
virtual double | ffTime () |
Measures ff::ff_node execution time. More... | |
virtual double | wffTime () |
Measures ff_node::svc execution time. More... | |
virtual | ~ff_node () |
Destructor. | |
|
inlineprotectedvirtual |
Creates the input channel.
nentries | the number of elements of the buffer |
fixedsize | flag to decide whether the buffer is bounded or unbounded. Default is true . |
Reimplemented in ff::ff_minode, ff::ff_farm< lb_t, gt_t >, ff::ff_farm< ofarm_lb, ofarm_gt >, ff::ff_farm< foralllb_t >, and ff::ff_pipeline.
|
inlineprotectedvirtual |
Creates the output channel.
nentries | the number of elements of the buffer |
fixedsize | flag to decide whether the buffer is bounded or unbounded. Default is false . |
Reimplemented in ff::ff_farm< lb_t, gt_t >, ff::ff_farm< ofarm_lb, ofarm_gt >, ff::ff_farm< foralllb_t >, and ff::ff_pipeline.
|
inlinevirtual |
Sends out the task.
It allows tasks to be emitted on the output stream without returning from the svc method. This lets the ff_node emit zero or more tasks per input task.
task | a pointer to the task |
retry | number of tries to put (nonblocking partial) the task onto the output channel |
ticks | delay between successive retries |
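Emitting zero or more tasks per input task, each through a nonblocking put that is retried a bounded number of times, can be sketched as follows. This is an illustration only: OutChannel, send_out and split_words are hypothetical names, and std::this_thread::yield() stands in for the `ticks` delay.

```cpp
#include <cstddef>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

// Hypothetical bounded output channel with a nonblocking put.
struct OutChannel {
    std::size_t capacity;
    std::vector<std::string> items;
    bool put(const std::string& task) {
        if (items.size() >= capacity) return false;  // full: do not block
        items.push_back(task);
        return true;
    }
};

// Retries the nonblocking put up to `retry` times, pausing between attempts.
bool send_out(OutChannel& out, const std::string& task, unsigned retry) {
    for (unsigned attempt = 0; attempt < retry; ++attempt) {
        if (out.put(task)) return true;
        std::this_thread::yield();  // stands in for waiting `ticks`
    }
    return false;
}

// svc-like body: emits one task per word, so zero or more per input line.
// Returns the number of tasks actually emitted.
std::size_t split_words(const std::string& line, OutChannel& out) {
    std::istringstream stream(line);
    std::string word;
    std::size_t emitted = 0;
    while (stream >> word)
        if (send_out(out, word, 3)) ++emitted;
    return emitted;
}
```

A blank line emits nothing and a three-word line emits three tasks, all from a single invocation of the svc-like function.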
|
inlineprotectedvirtual |
Measures ff::ff_node execution time.
Reimplemented in ff::ff_farm< lb_t, gt_t >, ff::ff_farm< ofarm_lb, ofarm_gt >, ff::ff_farm< foralllb_t >, ff::ff_mdf, and ff::ff_pipeline.
|
inlinevirtual |
Nonblocking pop from input channel.
Wait-free and fence-free (under TSO)
ptr | is a pointer to the task |
|
inlinevirtual |
Gets input channel.
It returns a pointer to the input buffer.
|
inlinevirtual |
Gets pointer to the output channel.
It returns a pointer to the output buffer.
|
inlinevirtual |
Nonblocking put onto output channel.
Wait-free and fence-free (under TSO)
ptr | is a pointer to the task |
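A nonblocking put/get pair over a single-producer/single-consumer channel can be sketched with a generic Lamport-style ring buffer. This is not FastFlow's buffer code: SpscRing is a hypothetical class, and it uses C++11 acquire/release atomics, which on x86 (a TSO architecture) compile to plain loads and stores without fence instructions.

```cpp
#include <atomic>
#include <cstddef>
#include <vector>

// Bounded SPSC ring: exactly one thread may call put, exactly one may call get.
class SpscRing {
    std::vector<void*> buf_;
    std::atomic<std::size_t> head_{0};  // next slot to read (consumer side)
    std::atomic<std::size_t> tail_{0};  // next slot to write (producer side)

public:
    // One slot is kept empty to tell "full" from "empty".
    explicit SpscRing(std::size_t capacity) : buf_(capacity + 1, nullptr) {}

    // Producer only. Returns false immediately if the ring is full.
    bool put(void* task) {
        const std::size_t t = tail_.load(std::memory_order_relaxed);
        const std::size_t next = (t + 1) % buf_.size();
        if (next == head_.load(std::memory_order_acquire)) return false;
        buf_[t] = task;
        tail_.store(next, std::memory_order_release);  // publish the slot
        return true;
    }

    // Consumer only. Returns false immediately if the ring is empty.
    bool get(void** task) {
        const std::size_t h = head_.load(std::memory_order_relaxed);
        if (h == tail_.load(std::memory_order_acquire)) return false;
        *task = buf_[h];
        head_.store((h + 1) % buf_.size(), std::memory_order_release);
        return true;
    }
};
```

Both operations finish in a bounded number of steps regardless of what the other thread does, which is what makes them wait-free; the caller decides whether to retry, spin or back off when false is returned.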
|
inlineprotectedvirtual |
Run the ff_node.
Reimplemented in ff::ff_monode, ff::ff_minode, ff::ff_ofarm, ff::ff_farm< lb_t, gt_t >, ff::ff_farm< ofarm_lb, ofarm_gt >, ff::ff_farm< foralllb_t >, ff::ff_dnode< CommImpl >, ff::ff_dnode< zmq1_1 >, ff::ff_dnode< COMM2 >, ff::ff_dnode< COMM3 >, ff::ff_dnode< COMM1 >, ff::ff_dnode< COMM >, ff::ff_dnode< CommImplIn >, ff::ff_pipeline, ff::ff_dinout< CommImplIn, CommImplOut >, and ff::ff_dinout< COMM1, COMM2 >.
|
inlineprotectedvirtual |
|
inlineprotectedvirtual |
Assign the output channelname to a channel.
Attaches the output of an ff_node to an existing channel, typically the input channel of another ff_node.
o | reference to a channel of type FFBUFFER |
Reimplemented in ff::ff_farm< lb_t, gt_t >, ff::ff_farm< ofarm_lb, ofarm_gt >, ff::ff_farm< foralllb_t >, and ff::ff_pipeline.
|
inlineprotectedvirtual |
Set the ff_node to start with no input task.
Setting it to true lets the ff_node execute the svc method spontaneously, before receiving a task on the input channel. skipfirstpop makes it possible to define a "producer" node that starts the network.
sk | true to start spontaneously (*task will be NULL) |
Reimplemented in ff::ff_monode, ff::ff_minode, ff::ff_dnode< CommImpl >, ff::ff_dnode< zmq1_1 >, ff::ff_dnode< COMM2 >, ff::ff_dnode< COMM3 >, ff::ff_dnode< COMM1 >, ff::ff_dnode< COMM >, ff::ff_dnode< CommImplIn >, ff::ff_dinout< CommImplIn, CommImplOut >, ff::ff_dinout< COMM1, COMM2 >, ff::ff_farm< lb_t, gt_t >, ff::ff_farm< ofarm_lb, ofarm_gt >, and ff::ff_farm< foralllb_t >.
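The effect of skipfirstpop on the node loop can be modelled as follows: when the flag is set, the service function is invoked once with a null task before anything is popped from the input. This is a hypothetical, single-threaded model; Svc and run_node are illustrative names, not part of FastFlow.

```cpp
#include <deque>
#include <functional>

// Hypothetical service function: receives the current task, or nullptr on
// the spontaneous first call.
using Svc = std::function<void(const int* task)>;

// Runs the service function over the input. With skipfirstpop set, the
// first call happens with no input task at all.
void run_node(const Svc& svc, std::deque<const int*> input, bool skipfirstpop) {
    if (skipfirstpop) svc(nullptr);       // spontaneous start: nothing popped
    while (!input.empty()) {
        const int* task = input.front();
        input.pop_front();
        svc(task);
    }
}
```

A producer node would typically use the null-task call to generate the first items of the stream, since it has no upstream node to receive them from.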
|
inlineprotected |
Gets the status of spontaneous start.
If true, the ff_node executes the svc method spontaneously, before receiving a task on the input channel. skipfirstpop makes it possible to define a "producer" node that produces the stream.
Returns true if skip-the-first-element mode is set, false otherwise.
Example: l1_ff_nodes_graph.cpp
|
pure virtual |
The service callback (should be filled by user with parallel activity business code)
task | is a pointer to the input data stream item (task) |
Implemented in ff::ff_farm< lb_t, gt_t >, ff::ff_farm< ofarm_lb, ofarm_gt >, ff::ff_farm< foralllb_t >, ff::ff_pipeline, ff::ff_mdf, and ff::poolEvolution< T, env_t >.
|
inlinevirtual |
Service finalisation.
Called after the EOS has arrived (logical termination) but before shutting down the run-time support (can be useful for housekeeping).
Reimplemented in ff::ff_farm< lb_t, gt_t >, ff::ff_farm< ofarm_lb, ofarm_gt >, ff::ff_farm< foralllb_t >, and ff::ff_pipeline.
|
inlinevirtual |
Service initialisation.
Called after run-time initialisation (e.g. thread spawning) but before starting to get items from the input stream (can be useful for the initialisation of parallel activities, e.g. manual thread pinning, which cannot be done in the constructor because the threads do not exist yet).
Reimplemented in ff::ff_farm< lb_t, gt_t >, ff::ff_farm< ofarm_lb, ofarm_gt >, ff::ff_farm< foralllb_t >, and ff::ff_pipeline.
|
inlineprotectedvirtual |
Wait ff_node termination.
Reimplemented in ff::ff_monode, ff::ff_minode, ff::ff_farm< lb_t, gt_t >, ff::ff_farm< ofarm_lb, ofarm_gt >, ff::ff_farm< foralllb_t >, ff::ff_dnode< CommImpl >, ff::ff_dnode< zmq1_1 >, ff::ff_dnode< COMM2 >, ff::ff_dnode< COMM3 >, ff::ff_dnode< COMM1 >, ff::ff_dnode< COMM >, ff::ff_dnode< CommImplIn >, ff::ff_pipeline, ff::ff_dinout< CommImplIn, CommImplOut >, and ff::ff_dinout< COMM1, COMM2 >.
|
inlineprotectedvirtual |
Waits for the freezing state.
It will happen on EOS arrival on the input channel.
Reimplemented in ff::ff_minode, ff::ff_farm< lb_t, gt_t >, ff::ff_farm< ofarm_lb, ofarm_gt >, ff::ff_farm< foralllb_t >, and ff::ff_pipeline.
|
inlineprotectedvirtual |
Measures ff_node::svc execution time.
class ff::ff_oclNode |
OpenCL implementation of FastFlow node.
Public Member Functions | |
virtual void | svc_SetUpOclObjects (cl_device_id id)=0 |
Sets up an OCL device. More... | |
virtual void | svc_releaseOclObjects ()=0 |
Releases an OCL device. | |
Public Member Functions inherited from ff::ff_node | |
virtual void * | svc (void *task)=0 |
The service callback (should be filled by user with parallel activity business code) More... | |
virtual int | svc_init () |
Service initialisation. More... | |
virtual void | svc_end () |
Service finalisation. More... | |
virtual bool | put (void *ptr) |
Nonblocking put onto output channel. More... | |
virtual bool | get (void **ptr) |
Nonblocking pop from input channel. More... | |
virtual FFBUFFER * | get_in_buffer () const |
Gets input channel. More... | |
virtual FFBUFFER * | get_out_buffer () const |
Gets pointer to the output channel. More... | |
virtual bool | ff_send_out (void *task, unsigned int retry=((unsigned int)-1), unsigned int ticks=(TICKS2WAIT)) |
Sends out the task. More... | |
Protected Member Functions | |
bool | initOCLInstance () |
Initializes OpenCL instance. More... | |
ff_oclNode () | |
Constructor. More... | |
bool | device_rules (cl_device_id id) |
Device rules. More... | |
void | svc_createOCL () |
Creates OpenCL. | |
void | svc_releaseOCL () |
Releases OpenCL. | |
bool | evaluation () |
Evaluation. More... | |
Protected Member Functions inherited from ff::ff_node | |
virtual void | skipfirstpop (bool sk) |
Set the ff_node to start with no input task. More... | |
bool | skipfirstpop () const |
Gets the status of spontaneous start. More... | |
virtual int | create_input_buffer (int nentries, bool fixedsize=true) |
Creates the input channel. More... | |
virtual int | create_output_buffer (int nentries, bool fixedsize=false) |
Creates the output channel. More... | |
virtual int | set_output_buffer (FFBUFFER *const o) |
Assign the output channelname to a channel. More... | |
virtual int | set_input_buffer (FFBUFFER *const i) |
Assign the input channelname to a channel. More... | |
virtual int | run (bool=false) |
Run the ff_node. More... | |
virtual int | wait () |
Wait ff_node termination. More... | |
virtual int | wait_freezing () |
Wait the freezing state. More... | |
virtual double | ffTime () |
Measures ff::ff_node execution time. More... | |
virtual double | wffTime () |
Measures ff_node::svc execution time. More... | |
virtual | ~ff_node () |
Destructor. | |
|
inlineprotected |
Constructor.
Constructs the OpenCL node for the device.
|
inlineprotected |
Device rules.
It defines the rules for the device.
id | is the identifier of the device |
Always returns true.
|
inlineprotected |
Evaluation.
true, otherwise false
|
inlineprotected |
Initializes OpenCL instance.
true is returned, otherwise false is returned.
|
pure virtual |
Sets up an OpenCL device.
id | is the identifier of the OpenCL device |
class ff::uSWSR_Ptr_Buffer |
Unbounded Single-Writer/Single-Reader buffer (FastFlow unbound channel)
The unbounded SWSR circular buffer is based on a pool of wait-free SWSR circular buffers (see buffer.hpp). The pool of buffers automatically grows and shrinks on demand. The implementation of the pool of buffers carefully tries to minimize the impact of dynamic memory allocation/deallocation by using caching strategies. The unbounded buffer is based on the INTERNAL_BUFFER_T.
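The grow, shrink and cache behaviour described above can be illustrated with a small single-threaded sketch. It is not the FastFlow implementation: ring and chained_queue are hypothetical names, a NULL slot is assumed to mark "empty", and all memory-ordering and wait-free aspects of the real buffer are deliberately left out.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <memory>
#include <vector>

// Fixed-capacity ring of pointers. A NULL slot means "empty", so NULL cannot be stored.
class ring {
    std::vector<void*> slot_;
    std::size_t rd_ = 0, wr_ = 0;
public:
    explicit ring(std::size_t n) : slot_(n, nullptr) { assert(n > 0); }
    bool push(void* p) {
        if (p == nullptr || slot_[wr_] != nullptr) return false;  // invalid item or ring full
        slot_[wr_] = p;
        wr_ = (wr_ + 1) % slot_.size();
        return true;
    }
    bool pop(void** out) {
        if (slot_[rd_] == nullptr) return false;                  // ring empty
        *out = slot_[rd_];
        slot_[rd_] = nullptr;
        rd_ = (rd_ + 1) % slot_.size();
        return true;
    }
};

// Unbounded queue built from a chain of rings (single-threaded illustration only).
class chained_queue {
    std::size_t n_;
    std::deque<std::unique_ptr<ring>>  chain_;  // front = read ring, back = write ring
    std::vector<std::unique_ptr<ring>> cache_;  // drained rings kept for reuse
public:
    explicit chained_queue(std::size_t n) : n_(n) {
        chain_.push_back(std::make_unique<ring>(n_));
    }
    bool push(void* p) {
        if (p == nullptr) return false;
        if (chain_.back()->push(p)) return true;
        // Write ring is full: grow, preferring a cached ring over a fresh allocation.
        if (cache_.empty()) {
            chain_.push_back(std::make_unique<ring>(n_));
        } else {
            chain_.push_back(std::move(cache_.back()));
            cache_.pop_back();
        }
        return chain_.back()->push(p);
    }
    bool pop(void** out) {
        if (chain_.front()->pop(out)) return true;
        if (chain_.size() == 1) return false;   // read ring is also the write ring: queue empty
        // Read ring is drained and a newer ring exists: shrink by recycling it.
        cache_.push_back(std::move(chain_.front()));
        chain_.pop_front();
        return chain_.front()->pop(out);
    }
    std::size_t rings_in_use() const { return chain_.size(); }
    std::size_t rings_cached() const { return cache_.size(); }
};
```

Keeping drained rings in a cache means that a producer which repeatedly runs ahead of the consumer reuses memory instead of allocating a new ring each time.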
Public Member Functions | |
uSWSR_Ptr_Buffer (unsigned long n, const bool fixedsize=false, const bool fillcache=false) | |
Constructor. | |
~uSWSR_Ptr_Buffer () | |
Destructor. | |
bool | init () |
Initialise the unbounded buffer. | |
bool | empty () |
Returns true if the buffer is empty. More... | |
bool | push (void *const data) |
Push. More... | |
bool | pop (void **data) |
Pop. More... | |
unsigned long | length () const |
Returns the number of elements in the queue. More... | |
|
inline |
Returns true if the buffer is empty.
true if empty, false otherwise
|
inline |
Returns the number of elements in the queue.
|
inline |
Pop.
[out] | data | pointer-pointer to data |
|
inline |
Push.
Pushes the input value into the queue.
If fixedsize has been set to true, this method may return false. This means EWOULDBLOCK and the call should be retried.
data | pointer to data to be pushed in the buffer |
false if fixedsize is set to true and the push would block, OR if data is NULL, OR if there is no buffer to write to.
true if the push succeeds.
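Since a false result can mean EWOULDBLOCK, callers usually wrap push in a bounded retry loop. The helper below is a sketch under stated assumptions: push_with_retry and one_slot are hypothetical names, not part of FastFlow, and the helper accepts any type that exposes bool push(void*).

```cpp
#include <cassert>
#include <thread>

// Tries q.push(data) once, then retries up to max_retry more times,
// yielding the CPU between attempts. Q is any type exposing `bool push(void*)`.
template <typename Q>
bool push_with_retry(Q& q, void* data, unsigned max_retry) {
    if (data == nullptr) return false;      // NULL is never accepted, so retrying cannot help
    for (unsigned attempt = 0; attempt <= max_retry; ++attempt) {
        if (q.push(data)) return true;
        std::this_thread::yield();          // give the consumer a chance to free a slot
    }
    return false;                           // still blocked after all attempts
}

// Hypothetical one-slot buffer, used only to exercise the helper.
struct one_slot {
    void* item = nullptr;
    unsigned rejected = 0;                  // number of pushes refused because the slot was taken
    bool push(void* p) {
        if (item != nullptr) { ++rejected; return false; }
        item = p;
        return true;
    }
};
```

Bounding the number of attempts keeps a producer from spinning forever when the consumer has stopped. The NULL check comes first because that failure is permanent, not a transient "would block".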