# FastFlow

## Fastflow architecture

Fastflow is conceptually designed as a stack of layers that progressively abstract out the shared memory, message-passing and SIMT programming models up to the definition of useful programming abstractions and parallel patterns. The abstraction process has two main goals:

1. to promote high-level, platform-independent structured parallel programming, and in particular skeletal programming (i.e. pattern-based explicit parallel programming).
2. to support the development of portable and efficient applications for homogenous and heterogenous of platforms, including multicore, many-core (e.g. GPGPU, FPGA), and distributed clusters of them.

These two goals have been perceived as a dichotomy for many years by many computer practitioners. Researchers in the skeletal and high-level parallel programming community never perceived it this way. Indeed, Fastflow is the nth-in-a-row high-level pattern-based high-level parallel programming frameworks we designed in the last fifteen years (detailed in contributors home pages: MarcoA, Massimo, MarcoD). Along this path we were happy to see that high-level parallel programming is becoming a mainstream approach, as demonstrated by large industry involvement in the field: Google (with MapReduce), Intel (with TBB and CnC), Microsoft (with TPL). A more comprehensive introduction to high-level parallel programming can be found in one of our recent talk.

FastFlow architecture is organised in three main tiers:

1. High-level patterns They are clearly characterised in a specific usage context and are targeted to the parallelisation of sequential (legacy) code. Examples are exploitation of loop parallelism, stream parallelism, data-parallel algorithms, execution of general workflows of tasks, etc. They are typically equipped with self-optimisation capabilities (e.g. load-balancing, grain auto-tuning, parallelism-degree auto-tuning) and exhibit limited nesting capability. Examples are: parallel-for, pipeline, stencil-reduce, mdf (macro-data-flow). Some of them target specific devices (e.g. GPGPUs). They are implemented on top of core patterns.
2. Core patterns They provide a general data-centric parallel programming model with its run-time support, which is designed to be minimal and reduce to the minimum typical sources of overheads in parallel programming. At this level there are two patterns (farm and pipeline) and one pattern-modifier (feedback). They make it possible to build very general (deadlock-free) cyclic process networks. They are not graphs of tasks, they are graphs of parallel executors (processes/threads). Tasks or data items flows across them. Overall, the programming model can be envisioned as a shared-memory streaming model, i.e. a shared-memory model equipped with message-passing synchronisations. They are implemented on top of building blocks.
3. Building blocks It provides the basic blocks to build (and generate via C++ header-only templates) the run-time support of core patterns. Typical objects at this level are queues (e.g. wait-free fence-free SPSC queues, bound and unbound), process and thread containers (as C++ classes) mediator threads/processes (extensible and configurable schedulers and gatherers). The shared-memory run-time support extensively uses nonblocking lock-free (and fence-free) algorithms, the distributed run-time support employs zero-copy messaging, the GPGPUs support exploits asynchrony and SIMT optimised algorithms.

Observe that this stack of layers is purely conceptual. As we shall see, many of these stacks are statically compiled and optimised in a cross-layer fashion into the binary code by way of generative programming techniques, such as inline C++ templates.

## FastFlow tiers details

### High-level Patterns

At this level patterns are presented as high-order functions), typically predefined methods of a class that can be instanced with a C++11 Lambda function. The main design target is to provide the programmer with an easy way to parallelise “in-place” a (semantically meaningful) chunk of code, e.g. the body of a loop. As it happens in OpenMP, a sequential for loop can be substituted with a parallel_for loop with almost no other changes to the code structure.

 // FastFlow (--std=c++11) ff::ParallelFor pf; pf.parallel_for(0L,N,[&A](const long i) { A[i]+=1; },nworkers); // OpenMP (-fopenmp) #pragma omp parallel for num_threads(nworkers) for(long i=0;i

In the specific case, the only syntactic difference between OpenMP and FastFlow are that FastFlow provides programmers with C++ templates instead of compiler pragmas. It is worth to notice that despite the similar syntax, the implementation of the parallel_for and all other high-level patterns in FastFlow is quite different from OpenMP and other mainstream programming frameworks (Intel TBB, etc). FastFlow, instead of relying on a general task execution engine, generates a compile time a specific streaming network based on core patterns for each pattern. In the case of parallel_for this network is a parametric master-worker with active or passive (in memory) task scheduler (more details in the PDP2014 paper).

As in OpenMP, parallel_for comes in many variants (see reference manual). Other patterns at this level, to date, are: parallel_reduce, mdf (macro-data-flow), pool evolution (genetic algorithm), stencil. They cover most common parallel programming paradigms in data, stream and task parallelism. Notably, FastFlow patterns are C++ class templates and can be extended by end users according to the Object-Oriented methodology.

Iterative execution of kernels onto GPGPUs are addressed by a single but very flexible pattern, i.e. stencil-reduce, which also takes care of feeding GPGPUs with data and D2H/H2D synchronisations. More details can be found in GTC 2014 talk.

### Core Patterns

At its foundations FastFlow implements a (mid/low-level) concurrent programming model, which extends C++ language. From the orchestration viewpoint, the process model to be employed is a CSP/Actor hybrid model where processes (so-called ff_nodes) are named and the data paths between processes are clearly identified. The abstract units of communication and synchronisation are known as channels and represent a stream of data dependency between two processes. A ff_node is C++ class, after construction it enters in a infinite loop that 1) gets a task from input channel (i.e. a pointer); 2) execute business code on the task; 3) put a task into the output channel (i.e. a pointer). Representing communication and synchronisation as a channel ensures that synchronisation is tied to communication and allows layers of abstraction at higher levels to compose parallel programs where synchronisation is implicit.

At the core patterns level, patterns to build a graph of ff_nodes are defined. Since the graph of ff_nodes is a streaming network, any FastFlow graph is built using two streaming patterns (farm and pipeline) and one pattern-modifier (loopback, to build cyclic networks). These patterns can be arbitrarily nested to build large and complex graphs. However, not all graphs can be build. This enforce the correctness (by-construction) of all streaming networks that can be generated. In particular, they are deadlock-free and data-race free.

#### Nonblocking and Blocking behaviour

Blocking synchronisations fit well coarse grain parallelism (milliseconds tasks or more), whereas Nonblocking fine grain parallelism. Blocking synchronisations make it possible to exploit over-provisioning (e.g. for load balancing) and energy consumption. However, they exhibits large overheads (also due to OS involvement). Mixing blocking and nonblocking synchronisations is not trivial.

FastFlow run-time is designed to exhibit a nonblocking behaviour, with the possibility to switch to blocking behaviour. Overall, a FastFlow run is a sequence of nonblocking running phases. Among two phases the run-time can switch to a blocking phase by way of a (original, data-flow) distributed protocol. In the FastFlow terminology, a pattern (or a composition of patterns) can freeze (i.e. suspend), to be later resumed in the next nonblocking phase. This model makes it possible to address fine grain workloads, bursts of fine grain workloads, and coarse grain workloads. During nonblocking phase, the FastFlow run time employes only lock-free and wait-free algorithms in all synchronisation critical paths (whereas it uses pthreads locks in the blocking phase).

The implementation in term of streaming network (i.e. a network of threads or processes) of a pattern can be cyclic (e.g. master-worker, D&C, etc.). For this FastFlow uses its own unbound SPSC buffer to avoid deadlocks due to dependency cycles [ADK12].

#### Accelerator mode

The offloading feature is concerned with abstracting over auxiliary hardware or software accelerators (e.g. GPGPUs or set-of-cores). Offloading allows the model to be heterogeneous over the hardware in that code that is to be executed on a CPU which makes use of this layer may be partially offloaded onto accelerators.

At the level of core patterns, all pattern composition can work in accelerator mode. A software accelerator can be feed with asynchronous offloading requests from the main thread (or any other thread), which basically generate a input stream for the accelerator. The primary aim of offloading is to provide the high-level pattern (or application programmer) programmer with an easy and semi-automatic path to introducing parallelism into a C/C++ sequential code by moving or copying parts of the original code into the body of C++ methods, which will be executed in parallel according to a FastFlow pattern (or pattern composition). Results computed from an accelerator can be collected either synchronously or asynchronously.

A FastFlow accelerator provides the programmer with one (untyped) streaming input channel and one (untyped) streaming output channel that can be dynamically created (and destroyed) from a C++ code (either sequential or multi-threaded) as a C++ object. Thanks to the underlying shared memory architecture, messages flowing into these channels may carry both values and pointers to data structures.

An accelerator, which is a collection of threads, has a global lifecycle with two stable states: \emph{running} and \emph{frozen}, plus several transient states. The running state happens when all threads are logically able to run (i.e. they are ready or running at the O.S. level). The frozen state happens when all threads are suspended (at the O.S. level). Transitions from these two states involve calls to the underlying threading library (and to the O.S.).

More details on FastFlow accelerator technology can be found in [ADK11].

### Building Blocks

At this level, the FastFlow programming model can be thought as a hybrid shared-memory/message-passing model. A process (ff_node) is sequential, a channel models a true data dependency between processes. Processes typically stream data items (they are not tasks) onto channels, they can be either references (e.g. pointers in the shared-memory) or messages with a payload (e.g. in a distributed platform). In both cases, the data item acts as synchronisation token. In general, no further synchronisation primitives are needed (e.g. locks, semaphores) even thought their usage is not forbidden (they are simply useless and a source of additional overhead). Overall, at this level, FastFlow building blocks make it possible to realise arbitrary streaming networks over lock-less channels.

In summary, the FastFlow building blocks layer realizes the two basic features:

1. parallelism exploitation, i.e. the creation, destruction and life cycle control of different flows of controls, and
2. asynchronous communication channels, supporting the synchronization of different flows of control.
##### Multicore

Implementation-wise, a ff_node is a C++ object that is mapped onto a OS thread (POSIX or OS native threads). Typically ff_nodes have a nonblocking behaviour, i.e. do not suspend on pushing or popping messages from channels. Empty and full channels are managed via busy waiting. If needed, a graph of nodes can be switched from nonblocking to blocking behaviour, and vice-versa (via a native distributed protocol). Nonblocking behaviour, coupled with lock-less (actually wait-free) channels enforce a very high throughput and very low latency onto cache-coherent shared-memory multicore (~20 clock cycles per message). The possibility to switch from blocking to nonblocking behaviour is useful to manage bursts of activity interweaved by periods of inactivity.

Channels are inspired to P1C1 [HK97] and Fastforward queues [GMV08] and Lamport's wait-free protocols [Lam83], and provides mechanisms to define simple streaming networks whose run-time support is implemented through correct and efficient lock-free Single-Producer-Single-Consumer (SPSC) queues equipped with non-blocking push and pop operations (more details about FastFlow's SPSC queues can be found in [ADK12].

Shared-memory channels exhibits a number of performance pitfalls on commodity shared-memory cache-coherent multiprocessors (as many commodity multi-core are). In particular, traditional lock-free implementations (such as Lamport's solution [Lam83]) of SPSC queues are correct under sequential consistency only, where none of the current multi-cores implement sequential consistency. Also, some correct queue implementations induce a very high invalidation rate - and thus reduced performance - because they exhibit the sharing of locations that are subject to alternative invalidations from communication partners (e.g. head and tail of a circular buffers).

The FastFlow implementation does not suffer from the ABA problem [MS98], and it remains correct also in the case that only a reference instead of the full message is communicated using the queues. The FastFlow SPSC queue implementation is largely inspired by Fastforward queues [GMV08]. As with Fastforward queues, the push operation (issued by the producer) always reads and writes pwrite (i.e. tail pointer) only, and the pop (issued by the consumer) always reads and writes pread (i.e. head pointer) only. This approach substantially differs from the traditional one (e.g. in Lamport's queues) where both the producer and the consumer access both the head and tail pointers causing the continuous invalidation of cache lines holding head and tail pointers.

Fastflow SPSC queues can be directly used to write parallel programs by writing a C++ program that spawns a set of ff_nodes and orchestrates them in pairs each of them sharing (at least) a SPSC queue descriptor. Each thread in the pair has a fixed role in using the queue, either producer or consumer. The bulk of created threads can then start and eventually synchronise using SPSP queues, for example in pipeline fashion. Examples of streaming networks that can be build are:

Other orchestrations are also possible, although their correct exploitation may be non-trivial. In particular expressing N-to-1, 1-to-M, and N-to-M streaming networks might be complex and require mutual exclusion, which is typically a source of a non-negligible overhead in multi-core architectures.

One small, but significant, abstraction step consists in providing one-to-many (SPMC), many-to-one (MPSC), and many-to-many (MPMC) channels. SPMC, MPSC, and MPMC queues can be realized in several different ways, for example using locks, or in a lock-free fashion in order to avoid lock overhead. However, these queues could not be directly programmed in a lock-free fashion without using at least one atomic operation, which is typically used to enforce the correct serialization of updates from either many producers or many consumers at the same end of the queue (see an example). These operations, however, induce a memory fence, thus a cache invalidation/update, which can seriously impair the performance of parallel programs exhibiting frequent synchronizations (e.g. for fine-grain parallelism). Notice that building a lock also requires an atomic operation unless working under sequential consistency for which a number of algorithms that do not require atomic operations exist, e.g. Lamport's Bakery algorithm[Lam74].

With FastFlow we advocate a different approach to the implementation of these queues, which require neither locks nor atomic operations. SPMC, MPSC channels are realised by using only SPSC queues and a mediator thread, which enforce the correct serialisation of producers and consumers. As shown in architectural figure, there are two main kind of mediator nodes: multi-input (minode) and multi-output (monode).

It is interesting to observe that

• a collective channel (e.g. SPMC, MPSC) implemented via SPSCs+mediator are sometime faster with respect to CAS-based implementation (this really depends on the platform, parallelism degree and computation grain).
• Mediator thread makes it possible to easily program scheduling policy for both item distribution and gathering.
• Nonblocking mediator threads couple very well with hyper threading technology because they typically execute lot of instructions that never arrives to execute stage in the processor pipeline.
• Fastflow relies on wait-free, non-blocking synchronizations. The approach has pros and cons. The main advantage consists in performance: avoiding memory fences dramatically reduces cache coherence overhead.

#### Distributed platforms

Documentation in progress

### Hardware

FastFlow has been originally designed for cache-coherent multiprocessors, and in particular commodity homogenous multi-core (e.g. Intel core, AMD K10, etc.). It has been then extended to distributed platforms, GPGPUs and heterogenous platforms.

##### Multicore

FastFlow supports multiprocessors exploiting any memory consistency, including very weak consistency models. FastFlow implementation is lock-free, and for several memory consistency models is also memory fence-free (e.g., sequential consistency, total store ordering, and the x86 model). On other models (e.g., Itanium and Power4, 5, and 6), a store fence before an enqueue is needed [GMV08].

##### GPGPUs

GPGPUs are supported by way of OpenCL and/or CUDA. At the current development status, kernel business code should be written either in OpenCL or CUDA. FastFlow takes care of H2D/D2H (asynchronous) data transfers and synchronisations. stencil-reduce` pattern makes it possible to write most of the typical GPGPUs kernels as they were C/C++ code since intra-block and inter-blocks synchronisations (including reduce code) are transparently provided by the pattern. Still, the programmer can use OpenCL/CUDA directives in the kernel.

##### Distributed

Distributed platforms build on top of TCP/IP and Infiniband/OFED protocols are also supported. FPGA support is planned but not yet fully developed.

## Rationale

Parallelism exploitation patterns (a.k.a. skeletons) are usually categorised in three main classes: Task, Data, and Stream Parallelism.

• Stream Parallelism can be used when there exists a partial or total order in a computation. By processing data elements in order, local state may be maintained in each filter. The set of skeletons provided by FastFlow could be further extended by building new C++ templates on top of the Fastflow low-level programming layer.
• Task Parallelism is explicit in the algorithm and consists of running the same or different code on different executors (cores, processors, machines, etc.). Different flows-of-control (threads, processes, etc.) may communicate with one another as they work. Communication usually takes place to pass data from one thread to the next as part of the same data-flow graph.
• Data Parallelism is a method for parallelizing a single task by processing independent data elements of this task in parallel. The flexibility of the technique relies upon stateless processing routines implying that the data elements must be fully independent. Data Parallelism also supports Loop-level Parallelism where successive iterations of a loop working on independent or read-only data are parallelized in different flows-of-control and concurrently executed.

FastFlow is designed to support all of them over a stream parallel programming model (provided by core patterns tier). The set of patterns provided by FastFlow could be further extended by building new C++ templates.

While many of the programming frameworks for multi-core offer Data and Task Parallel skeletons, only few of them offer Stream Parallel skeletons (such as TBB's pipeline). None of them offers the farm skeleton, which exploits functional replication of a set of workers and abstracts out the parallel filtering of successive independent items of the stream under the control of a scheduler, as a first-class concept.

## References

[Lam74] L. Lamport. A new solution of dijkstra’s concurrent programming problem. Commun. ACM, 17(8):453–455, 1974.

[Lam83] L. Lamport. Specifying concurrent program modules. ACM Trans. Program. Lang. Syst., 5(2):190–222, 1983.

[HK97] . Higham, J. Kawash. Critical sections and producer/consumer queues in weak memory systems. In: Proc of the Intl. Symposium on Parallel Architectures, Algorithms and Networks (ISPAN), pages 56–63, 1997. IEEE.

[MS98] M. M. Michael and M. L. Scott. Nonblocking algorithms and preemption-safe locking on multiprogrammed shared memory multiprocessors. Journal of Parallel and Distributed Computing, 51(1):1–26, 1998.

[GMV08] J. Giacomoni, T. Moseley, and M. Vachharajani. Fastforward for efficient pipeline parallelism: a cache-optimized concurrent lock-free queue. In Proc. of the 13th ACM SIGPLAN Symposium on Principles and practice of parallel programming (PPoPP), pages 43-52, New York, NY, USA, 2008. ACM.

[AB+09] K. Asanovic, R. Bodik, J. Demmel, T. Keaveny, K. Keutzer, J. Kubiatowicz, N. Morgan, D. Patterson, K. Sen, J. Wawrzynek, D. Wessel, and K. Yelick. A view of the parallel computing landscape. Commun. ACM 52, 10 (Oct. 2009), 56-67. DOI:10.1145/1562764.1562783

[ADK11] M. Aldinucci, M. Danelutto, P. Kilpatrick, M. Meneghin, and M. Torquati. Accelerating code on multi- cores with fastflow. In Proc. of 17th Intl. Euro-Par 2011 Parallel Processing, volume 6853 of LNCS, pages 170–181, Bordeaux, France, Aug. 2011. Springer.

[ADK12] M. Aldinucci, M. Danelutto, P. Kilpatrick, M. Meneghin, and M. Torquati. An efficient unbounded lock-free queue for multi-core systems. In Proc. of 18th Intl. Euro-Par 2012 Parallel Processing, volume 7484 of LNCS, pages 662–673, Rhodes Island, Greece, aug 2012. Springer.