

UNIVERSITÀ DEGLI STUDI DI TORINO

Politecnico di Milano, Dipartimento di Elettronica e Informazione

December 5, 2012 Milano, Italy

# FastFlow: high-level programming patterns with non-blocking lock-free run-time support

Marco Aldinucci - Uni. of Torino, Italy

*Coordinator of the research activities on "Parallel and High-Performance Computing"* Fabio Tordini (PhD), Claudia Misale (PhD), Irfan Uddin (PostDoc), Guilherme Peretti Pezzi (PostDoc), Maurizio Drocco (SW engineer)

&

Massimo Torquati and Marco Danelutto - Uni. of Pisa, Italy; Massimiliano Meneghin - IBM Research, Ireland; Peter Kilpatrick - Queen's Uni. Belfast, U.K.





SEVENTH FRAMEWORK PROGRAMME





- \* Concurrency and multi-core, the theoretical background
  - a personal perspective
- \* FastFlow
  - A programming model (and a library) for multicore (& manycore)
  - Fast core-to-core lock-free messaging
- \* Applications
- \* Discussion

## Our tool perspective








### Concurrency and multi-core theoretical background: a personal perspective



#### Parallel stochastic sim for system biology IEEE PDP 2011, HiBB 2011, Briefings in Bioinformatics (invited), Bio-IT world (invited), IEEE PDP 2013 (submitted), BMC Bioinformatics


#### Table 2 - Performance (Intel 32 core platform)


| Model          | Single trajectory information |              | Overall data (20 sim eng, 3 stat eng) |                    |             |             |
|----------------|-------------------------------|--------------|---------------------------------------|--------------------|-------------|-------------|
|                | N. samples                    | Avg sim step | Sample time                           | Inter-arrival time | Throughput  | Output size |
| Neurospora     | $10^{4}$                      | 7.80 µs      | 517.24 µs                             | 25.86 µs           | 11.87 MB/s  | 36.62 MB    |
| Neurospora     | $10^{5}$                      | 8.37 µs      | 55.51 µs                              | 2.78 µs            | 11.98 MB/s  | 366.21 MB   |
| Neurospora     | $10^{6}$                      | 75.63 µs     | 4.65 µs                               | 232.68 ns          | 201.63 MB/s | 3.58 GB     |
| EColi          | $10^{6}$                      | 173.64 µs    | 0.58 µs                               | 28.81 ns           | 257.66 MB/s | 4.47 GB     |
| Lotka-Volterra | $10^{6}$                      | 22.86 µs     | 0.69 µs                               | 34.68 ns           | 147.11 MB/s | 2.68 GB     |
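
As a reading aid (an observation on the reported numbers, not a statement from the source): the inter-arrival time in each row appears to equal the sample time divided by the 20 simulation engines, which is what one would expect if the engines produce samples independently and their outputs are interleaved. The symbols below are names chosen here for illustration.

```latex
\[
t_{\text{inter-arrival}} \approx \frac{t_{\text{sample}}}{n_{\text{sim eng}}},
\qquad
\frac{517.24\,\mu\text{s}}{20} \approx 25.86\,\mu\text{s},
\qquad
\frac{4.65\,\mu\text{s}}{20} \approx 232.5\,\text{ns}
\]
```

The table reports 232.68 ns for the second case; the small gap is presumably due to rounding of the sample time.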





#### Simulation of transcriptional regulation in Neurospora

[Figures: plots not recoverable from the extraction; surviving labels are "time", "(predators)" and "n. simulation engines (sim eng)".]

#### PARAPHRASE

- \* E.g. Intel Ivy Bridge, Haswell
  - cache-coherent
  - 12 or more cores per socket (20 contexts)
  - cc-NUMA (as a matter of fact)
- \* NVidia/AMD GPGPU/Hybrid
  - SIMD, no global synch
  - performance only with proper and **not fully automatic** memory hierarchy management
- \* Intel MIC CPU/GPGPU
  - ring-based interconnection, variable coherency
  - apparently even more NUMA
- \* IBM PowerEN
  - general purpose cores
  - specialised cores, soft cores?






From programming/tuning viewpoint ... the simplest is already too complex ...



- \* Exploit cache coherence
  - Memory fences are expensive, increasing core count will make it worse
- \* Fine-grained parallelism is hard to achieve
  - I/O bound problems, High-throughput, Streaming, Irregular DP problems
  - Automatic and assisted parallelisation solves uniform&easy cases
- \* SIMD/GPGPU worsen the scenario
  - Atomic ops in memory (i.e. fences) are still needed
  - Not everything can be described with do independent (a.k.a. map)



Used to implement: parameter sweeping, master-worker, etc.

```
void Emitter() {
  for (i = 0; i < streamLen; ++i) {
    task = create_task();
    queue = SELECT_WORKER_QUEUE();
    queue->PUSH(task);
  }
}

void Worker() {
  while (!end_of_stream) {
    myqueue->POP(&task);
    do_work(task);
  }
}

int main() {
  spawn_thread(Emitter);
  for (i = 0; i < nworkers; ++i) {
    spawn_thread(Worker);
  }
  wait_end();
}
```
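
The pseudocode above can be made concrete with ordinary locks. The following is a minimal sketch of the lock-based baseline, assuming one mutex-protected queue per worker and round-robin selection; `BlockingQueue`, `END_OF_STREAM` and `run_farm` are hypothetical names introduced here, and summing the task values merely stands in for `do_work`.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Blocking FIFO channel protected by a mutex (the lock/unlock baseline).
class BlockingQueue {
public:
    void push(long task) {
        {
            std::lock_guard<std::mutex> lock(m_);
            q_.push(task);
        }
        cv_.notify_one();
    }
    long pop() {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return !q_.empty(); });  // sleep while empty
        long task = q_.front();
        q_.pop();
        return task;
    }
private:
    std::mutex m_;
    std::condition_variable cv_;
    std::queue<long> q_;
};

constexpr long END_OF_STREAM = -1;  // hypothetical sentinel closing a stream

// The emitter deals tasks 1..streamLen round-robin to the workers; each
// worker sums the tasks it receives. Returns the grand total.
long run_farm(int nworkers, long streamLen) {
    std::vector<BlockingQueue> queues(nworkers);
    std::vector<long> partial(nworkers, 0);
    std::vector<std::thread> workers;
    for (int w = 0; w < nworkers; ++w) {
        workers.emplace_back([&queues, &partial, w] {
            for (;;) {
                long task = queues[w].pop();
                if (task == END_OF_STREAM) break;
                partial[w] += task;  // stands in for do_work(task)
            }
        });
    }
    std::thread emitter([&queues, nworkers, streamLen] {
        for (long i = 1; i <= streamLen; ++i)
            queues[(i - 1) % nworkers].push(i);  // round-robin worker selection
        for (int w = 0; w < nworkers; ++w)
            queues[w].push(END_OF_STREAM);
    });
    emitter.join();
    for (auto& t : workers) t.join();
    long total = 0;
    for (long p : partial) total += p;
    return total;
}
```

Calling `run_farm(4, 1000)` streams the integers 1..1000 through four workers. Every `push` and `pop` takes the lock, which is the cost the rest of the talk tries to avoid for fine-grained tasks.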


### Task farm with POSIX lock/unlock



[Figure: task farm with POSIX lock/unlock; x-axis: Number of Cores.]



### \* Under relaxed memory models, using CAS/RW-ops

- nonblocking algorithms
- they perform better than lock-based
- they fence the memory and pay cache coherency reconciliation overhead
- in GPUs ...
  - CAS/atomic ... you have to go to the global memory
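
To make the CAS-based style concrete, here is a minimal sketch of a non-blocking update built from a compare-and-swap retry loop, using standard C++ atomics. `cas_add` and `hammer_counter` are hypothetical names chosen for this illustration. Each CAS is an atomic read-modify-write on a shared cache line, which is where the fencing and coherency costs listed above come from.

```cpp
#include <atomic>
#include <thread>
#include <vector>

// Lock-free "fetch and add" built from a CAS retry loop.
// Some thread always makes progress: a CAS fails only because another
// thread's update succeeded in the meantime.
long cas_add(std::atomic<long>& cell, long delta) {
    long seen = cell.load(std::memory_order_relaxed);
    // On failure, compare_exchange_weak reloads `seen` with the current value.
    while (!cell.compare_exchange_weak(seen, seen + delta,
                                       std::memory_order_acq_rel,
                                       std::memory_order_relaxed)) {
    }
    return seen;  // value observed just before our update
}

// Hypothetical driver: nthreads threads each add 1 to a shared counter
// `iters` times; no update is lost even though no lock is taken.
long hammer_counter(int nthreads, long iters) {
    std::atomic<long> counter{0};
    std::vector<std::thread> pool;
    for (int t = 0; t < nthreads; ++t)
        pool.emplace_back([&counter, iters] {
            for (long i = 0; i < iters; ++i) cas_add(counter, 1);
        });
    for (auto& th : pool) th.join();
    return counter.load();
}
```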



## Re-starting from the basics




#### \* Reducing the problem to the bare bones

- Producer-Consumer model (streaming)
- Directly control thread blocking using non-blocking synchronisations
- Directly design the "data channel"
  - Having clear how data move in the whole memory hierarchy
- \* Restarting from the FIFO queue




## Producer-Consumer



#### \* Producer-Consumer queues

- fundamental data structures in concurrent systems
  - data/message channels synchronization, task scheduling, ...
  - work-stealing mechanisms (e.g. for OpenMP runtime)

#### \* Producer-Consumer vs Mutual Exclusion

- Mutex is inherently more complex (requires deadlock-freedom)
  - requires interlocked ops (CAS, ...), which induce memory fences and thus cache invalidation
  - Dekker and Bakery algorithms require Sequential Consistency
  - Producer-Consumer is a cooperative (non-cyclic) process
- \* Producer-Consumer vs Transactional Memories (?)
  - To be tested extensively; it is interesting to understand what happens when data is moved to another core (get an invalidation?)
  - Transactions happen at cache line level (IBM/BlueGene) or by blocking the decode unit (IBM/PPC x86\_64/ring0 wait on reservation)

### Concurrent queues



- \* Concurrency level
  - SPSC, SPMC, MPSC, MPMC
- \* Internal data structures
  - Array-based, List-based
- \* Size
  - Bounded, Unbounded
- \* Progress guarantees
  - No guarantee (blocking), Obstruction freedom, Lock freedom, Wait freedom

# Blocking vs non-blocking



- \* What are the performance implications of the progress properties?
- \* For medium/coarse grain applications:
  - Blocking faster than Non-Blocking (several task-based approaches are here)
- \* For fine grain applications:
  - Non-Blocking faster than Blocking (I'm focusing here)
  - Obstruction-Free faster than Lock-Free faster than Wait-Free
- \* In the general case:
  - Stronger properties are harder to maintain

## Related Work: Lock-free, CAS-free, wait-free



- \* Single-Producer-Single-Consumer FIFO queues
  - Lamport et al. 1983 Trans. PLS (Sequential consistency only in memory)
  - Higham and Kawash 1997 ISPAN (P1C1, TSO + proof in memory)
  - Giacomoni et al. 2008 PPoPP (TSO + cache slipping in memory)
  - BatchQueue & MCRingBuffer (TSO, double/multiple-buffering in memory)
- \* Multiple-Producers-Multiple-Consumers FIFO queues
  - Blocking 2-locks: Michael and Scott
  - Nonblocking with CAS, list-based: Michael and Scott (PODC96)
    - Requires deferred reclamation/hazard pointers to avoid ABA problem
  - Nonblocking with CAS, array-based: Tsigas and Zhang (SPAA01)
  - Nonblocking without CAS in memory: cannot be done
  - Nonblocking without CAS with mediator thread: FastFlow



Deal with multiple replicas of the same location in different caches



take effect in memory (issued by either the same or different processors/cores)



write(A,3)

# FastFlow SPSC queues



```
push_nonblocking(data) {
  if (NEXT(head) == tail) {
    return EWOULDBLOCK;   // queue full
  }
  buffer[head] = data;
  head = NEXT(head);
  return 0;
}

pop_nonblocking(data) {
  if (head == tail) {
    return EWOULDBLOCK;   // queue empty
  }
  data = buffer[tail];
  tail = NEXT(tail);
  return 0;
}
```

```
push_nonblocking(data) {
  if (NULL != buffer[head]) {
    return EWOULDBLOCK;   // slot still occupied: queue full
  }
  (WMB)                   // write memory barrier, for any model weaker than TSO
  buffer[head] = data;
  head = NEXT(head);
  return 0;
}

pop_nonblocking(data) {
  data = buffer[tail];
  if (NULL == data) {
    return EWOULDBLOCK;   // empty slot: queue empty
  }
  buffer[tail] = NULL;
  tail = NEXT(tail);
  return 0;
}
```

First listing: Lamport FIFO - 1983

Second listing: FastFlow FIFO, derived from P1C1 (Higham and Kawash, ISPAN 1997) & FastForward (Giacomoni et al, PPoPP 2008)
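
The second listing can be approximated in portable C++ as follows. This is a sketch under stated assumptions, not the FastFlow source: `SpscQueue` and `stream_sum` are hypothetical names, C++11 release/acquire atomics stand in for the explicit WMB, and the 64-byte alignment is an assumed cache-line size.

```cpp
#include <atomic>
#include <cstddef>
#include <thread>
#include <vector>

// Bounded SPSC ring in the style of the second listing: a slot holding
// nullptr is free, so producer and consumer never read each other's index.
class SpscQueue {
public:
    explicit SpscQueue(std::size_t size) : buf_(size), head_(0), tail_(0) {
        for (auto& slot : buf_) slot.store(nullptr, std::memory_order_relaxed);
    }
    // Producer side only. `data` must not be nullptr (nullptr marks "empty").
    bool push(void* data) {
        if (buf_[head_].load(std::memory_order_acquire) != nullptr) return false;
        // The release store plays the role of the WMB: the payload written
        // before push() becomes visible before the slot appears non-empty.
        buf_[head_].store(data, std::memory_order_release);
        head_ = next(head_);
        return true;
    }
    // Consumer side only.
    bool pop(void*& data) {
        data = buf_[tail_].load(std::memory_order_acquire);
        if (data == nullptr) return false;
        buf_[tail_].store(nullptr, std::memory_order_release);
        tail_ = next(tail_);
        return true;
    }
private:
    std::size_t next(std::size_t i) const { return (i + 1) % buf_.size(); }
    std::vector<std::atomic<void*>> buf_;
    alignas(64) std::size_t head_;  // touched by the producer only
    alignas(64) std::size_t tail_;  // touched by the consumer only
};

// Hypothetical driver: stream the values 1..n through the queue and
// return the sum seen by the consumer.
long stream_sum(long n) {
    SpscQueue q(64);
    std::vector<long> items(n);
    for (long i = 0; i < n; ++i) items[i] = i + 1;
    long sum = 0;
    std::thread consumer([&] {
        for (long got = 0; got < n;) {
            void* p = nullptr;
            if (q.pop(p)) { sum += *static_cast<long*>(p); ++got; }
            else std::this_thread::yield();  // queue empty: retry
        }
    });
    for (long i = 0; i < n; ++i)
        while (!q.push(&items[i])) std::this_thread::yield();  // queue full: retry
    consumer.join();
    return sum;
}
```

Because `head_` is private to the producer and `tail_` to the consumer, the only shared cache lines are the buffer slots themselves, which carry the data that has to move between cores anyway.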

## FastFlow SPSC queues



- \* Lamport FIFO - 1983
  - head and tail are mutually invalidated by producer and consumer
  - 1 cache miss every push and pop (at least)
- \* FastFlow FIFO, derived from P1C1 (Higham and Kawash, ISPAN 1997) & FastForward (Giacomoni et al, PPoPP 2008)
  - producer reads/writes head, consumer reads/writes tail
  - no misses, excluding "true" deps
  - extended domain on void \*












#### Medium grain (5 µs workload)





Platform: 4 sockets $\times$ 8 cores $\times$ 2 contexts, Xeon E7-4820 @2.0GHz Sandy Bridge, 18MB L3 shared cache, 256K L2

MPI shmem impl is ~190 ns at best (D.K. Panda)





#### Layer I: Simple streaming networks



M. Aldinucci, S. Campa, M. Danelutto, M. Torquati. An Efficient Synchronisation Mechanism for Multi-Core Systems. Euro-Par 2012 (Wed 29 Aug, session B3 multicore, 14.30-16.00).


http://www.1024cores.net/home/technologies/fastflow












- \* The graph is now cyclic and needs unbounded queues to avoid deadlocks
- \* Faster than POSIX, often faster than Hoard and TBB
  - unpublished, but available on sourceforge
  - needs a lot of comparative testing to be published
- \* Implements deferred deallocation to avoid the **ABA** problem

[Figure: 10M alloc/dealloc (32B), 1 µs tasks, 32-core Intel E7 2 GHz. Time (s) vs. N. of (dealloc) threads for FF allocator, Hoard-3.9, libc-6, TBB-4.0 and Ideal.]




# FastFlow



## Lock-free and CAS-free?



- \* Mutex cannot be done
- \* Single-Producer-Single-Consumer (SPSC) can be done
  - Producer-Consumer is inherently weaker than Mutex
  - It does require the cooperation of partners whereas Mutex does not
  - Expressive enough to build a streaming (or dataflow) programming framework
    - MPMC = SPSC + mediator threads
- \* But what about productivity at large scale?
  - Writing a program means defining a graph encoding true dependencies ... not really easy
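
The "MPMC = SPSC + mediator threads" idea can be sketched as follows. This is an illustration under stated assumptions rather than the FastFlow implementation: `Channel`, `send`, `EOS` and `mediated_sum` are hypothetical names, items are non-zero `long` values (0 marks an empty slot), and the mediator schedules round-robin.

```cpp
#include <atomic>
#include <cstddef>
#include <memory>
#include <thread>
#include <vector>

// Minimal bounded SPSC channel of non-zero longs; 0 marks an empty slot.
class Channel {
public:
    explicit Channel(std::size_t size) : buf_(size) {
        for (auto& s : buf_) s.store(0, std::memory_order_relaxed);
    }
    bool push(long v) {  // called by exactly one producer thread
        if (buf_[head_].load(std::memory_order_acquire) != 0) return false;
        buf_[head_].store(v, std::memory_order_release);
        head_ = (head_ + 1) % buf_.size();
        return true;
    }
    bool pop(long& v) {  // called by exactly one consumer thread
        v = buf_[tail_].load(std::memory_order_acquire);
        if (v == 0) return false;
        buf_[tail_].store(0, std::memory_order_release);
        tail_ = (tail_ + 1) % buf_.size();
        return true;
    }
private:
    std::vector<std::atomic<long>> buf_;
    std::size_t head_ = 0, tail_ = 0;
};

void send(Channel& c, long v) { while (!c.push(v)) std::this_thread::yield(); }

constexpr long EOS = -1;  // hypothetical end-of-stream marker

// P producers and C consumers exchange items through one mediator thread.
// Every channel has exactly one writer and one reader, so no CAS is needed.
long mediated_sum(int P, int C, long perProducer) {
    std::vector<std::unique_ptr<Channel>> in, out;
    for (int i = 0; i < P; ++i) in.emplace_back(new Channel(32));
    for (int i = 0; i < C; ++i) out.emplace_back(new Channel(32));
    std::vector<long> partial(C, 0);
    std::vector<std::thread> threads;
    for (int p = 0; p < P; ++p)
        threads.emplace_back([&in, p, perProducer] {
            for (long v = 1; v <= perProducer; ++v) send(*in[p], v);
            send(*in[p], EOS);
        });
    for (int c = 0; c < C; ++c)
        threads.emplace_back([&out, &partial, c] {
            for (;;) {
                long v;
                if (!out[c]->pop(v)) { std::this_thread::yield(); continue; }
                if (v == EOS) break;
                partial[c] += v;
            }
        });
    // Mediator: gathers from the producers' channels, scatters round-robin.
    threads.emplace_back([&in, &out, P, C] {
        int open = P, next = 0;
        std::vector<char> closed(P, 0);
        while (open > 0) {
            bool idle = true;
            for (int p = 0; p < P; ++p) {
                long v;
                if (closed[p] || !in[p]->pop(v)) continue;
                idle = false;
                if (v == EOS) { closed[p] = 1; --open; continue; }
                send(*out[next], v);
                next = (next + 1) % C;
            }
            if (idle) std::this_thread::yield();
        }
        for (int c = 0; c < C; ++c) send(*out[c], EOS);
    });
    for (auto& t : threads) t.join();
    long total = 0;
    for (long s : partial) total += s;
    return total;
}
```

The mediator serialises all traffic, so it trades an extra hop and an extra thread for the absence of interlocked operations on the data path.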

### FastFlow is based on producer-consumer



- Lock-free/fence-free non-blocking synchronisations
- C++ STL-like implementation
- thread-model agnostic (pthreads, QT, windows threads, ...)
- compliant with other synchronisation mechanisms in the business code (e.g. locks and semaphores)


### Pattern-based approach: rationale

- \* Abstract parallelism exploitation pattern by parametric code
  - E.g. higher order function, code factories, C++ templates, ...
  - Can be composed and nested as programming language constructs + offloading
  - Stream and Data Parallel
- \* Platform independent
  - Implementations on different multi/many-cores
  - Support for hybrid architectures thanks to pattern compositionality
- \* Provide state-of-the-art, parametric implementation of each parallelism exploitation pattern
  - With natural way of extending patterns, i.e. OO
  - Functional (seq code) and tunable extra-functional (QoS) parameters
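
As an illustration of "parametric code", the sketch below expresses a data-parallel map as a higher-order C++ function template. It is a hypothetical example, not the FastFlow map: `parallel_map` is a name introduced here, the sequential function `f` is the functional parameter, and `nworkers` plays the role of a tunable extra-functional parameter.

```cpp
#include <algorithm>
#include <cstddef>
#include <thread>
#include <vector>

// A "map" pattern as a higher-order function template: applies f to every
// element of data in place, splitting the index range into contiguous
// chunks, one per worker thread.
template <typename T, typename F>
void parallel_map(std::vector<T>& data, F f, unsigned nworkers) {
    nworkers = std::max(1u, nworkers);
    const std::size_t chunk = (data.size() + nworkers - 1) / nworkers;
    std::vector<std::thread> pool;
    for (unsigned w = 0; w < nworkers; ++w) {
        const std::size_t begin = std::min(data.size(), w * chunk);
        const std::size_t end = std::min(data.size(), begin + chunk);
        if (begin == end) break;  // no elements left for further workers
        pool.emplace_back([&data, f, begin, end] {
            for (std::size_t i = begin; i < end; ++i) data[i] = f(data[i]);
        });
    }
    for (auto& t : pool) t.join();
}
```

The caller writes only sequential code, for example `parallel_map(v, [](int x) { return x * x; }, 3)`; changing the worker count does not change the result, only how the work is split.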

## Patterns, their implementation, and their purpose



- \* farm
  - on CPU: master-worker (parallelism exploitation)
  - on GPU: CUDA streams (automatic exploitation of asynch comm)
- \* pipeline
  - on CPU: pipeline
  - on GPU: sequence of kernel calls or global mem synch
- \* map
  - on CPU: master-worker (parallelism exploitation)
  - on GPU: CUDA SIMT (parallelism exploitation)
- \* reduce
  - on CPU: master-worker (parallelism exploitation)
  - on GPU: CUDA SIMT, reduction tree (parallelism exploitation)
- \* D&C
  - on CPU: master-worker with feedback (parallelism exploitation)
  - on GPU: working on it, maybe loop+farm

# Composition



- \* Composition via C++ template meta-programming
  - CPU: Graph composition
  - GPU: CUDA streams
  - CPU+GPU: offloading
- \* farm{ pipe }
- \* pipe(farm, farm)
- \* pipe(map, reduce)



```
#include <cstdlib>
#include <vector>
#include <iostream>
#include <ff/farm.hpp>
using namespace ff;

// generic worker
class Worker: public ff_node {
public:
    void * svc(void * task) {
        int * t = (int *)task;
        std::cout << "Worker " << ff_node::get_my_id()
                  << " received task " << *t << "\n";
        return task;
    }
    // I don't need the following functions for this test
    //int svc_init() { return 0; }
    //void svc_end() {}
};

// the gatherer filter
class Collector: public ff_node {
public:
    void * svc(void * task) {
        int * t = (int *)task;
        if (*t == -1) return NULL;
        return task;
    }
};

// the load-balancer filter
class Emitter: public ff_node {
public:
    Emitter(int max_task):ntask(max_task) {}
    void * svc(void *) {
        // emits the values max_task, ..., 1; NULL ends the stream
        if (ntask <= 0) return NULL;
        return new int(ntask--);
    }
private:
    int ntask;
};
```

```
int main(int argc, char * argv[]) {
    if (argc<3) {
        std::cerr << "use: "
                  << argv[0]
                  << " nworkers streamlen\n";
        return -1;
    }
    int nworkers=atoi(argv[1]);
    int streamlen=atoi(argv[2]);
    if (!nworkers || !streamlen) {
        std::cerr << "Wrong parameters values\n";
        return -1;
    }
    ff_farm<> farm; // farm object
    Emitter E(streamlen);
    farm.add_emitter(&E);
    std::vector<ff_node *> w;
    for(int i=0;i<nworkers;++i) w.push_back(new Worker);
    farm.add_workers(w); // add all workers to the farm
    Collector C;
    farm.add_collector(&C);
    if (farm.run_and_wait_end()<0) {
        error("running farm\n");
        return -1;
    }
    std::cerr << "DONE, time= " << farm.ffTime() << " (ms)\n";
    farm.ffStats(std::cerr);
    return 0;
}
```


# + distributed

Applications on multicore, many-core & distributed platforms of multicores: efficient and portable, designed with high-level patterns



- \* Generic ff\_node is subclassed to ff\_dnode
- \* ff\_dnode can support network channels
  - P2P or collective
  - used as frontier node of streaming graph
  - can be used to merge graphs across distributed platforms
- \* No changes to programming model
  - at the very least, it requires "adding" a stub ff\_dnode
  - when passing pointers, data is serialised
    - serialisation is hand-managed (zero-copy, think of Java!)

M.Aldinucci, S. Campa, M. Danelutto, M. Torquati, P. Kilpatrick. Targeting distributed systems in FastFlow. CGW-Europar 2012

# + OpenCL (working on)




# FastFlow: data-flow, not task-based

- FastFlow is NOT a task-based framework; it focuses specifically on data movements and synchronizations (shmem/distr/GPU)
- it does not expose the task concept, it rather abstracts:
  - networks of nodes (threads/processes) that can synchronize efficiently (via message passing) and move data (via shared memory or message passing)
  - predefined, OO extendable, composable patterns (i.e. networks of nodes)
- orthogonal way of thinking w.r.t. tasks
  - nodes are pinned to core, no over-provisioning, ...
- it can be used as middleware to build your own task-based framework
  - inherit lock-free synchronization mechanisms (that aren't friendly guys)
  - just create an object, and pass the pointer
  - predefined facilities to manage load-balancing, data-placement, OO-extendable





### \* Patterns at the high-level

- Currently as C++ templates
- Set of patterns can be extended, semantics of patterns can be changed; complexity gracefully increases with semantic distance
- \* Used to generate cyclic streaming networks (of threads, ...)
  - Graphs, describing true data dependencies. Can be composed and transformed as graphs
  - Cyclic graphs need unbounded queues
  - Heterogeneous cores, thread affinity, memory affinity, NUMA, can be managed while mapping graph onto the metal



- \* "Coarse grain concurrency is nearly exhausted"

- \* "It is not about Flops, it is about data movement"
- \* "Programming systems should be designed to support fast data movement and enforce locality"
- \* "Variable coherency & inter-socket messaging"
- \* "Novel computing models are needed"
  - "A computer language is not a computing model. A library is not a computing model."
  - "System programmers should use the techniques they advocate"



## \* Hardware Transactional Memory

- Intel Haswell Transactional Synchronization Extensions (TSX)
- Pragmatically orthogonal and complementary to data-flow
  - Data-flow covers (true) data-dependencies, i.e. data passing
  - Transactional memories cover concurrency in resource access
- \* Intel IvyBridge and PPC BlueGene (and Intel ring0)
  - Wait-on-reservation
    - Blocks context on Instruction Decode
    - Can be used to overcome energy problem for active waiting on lock-free




# Applications



## 2012: Cholesky fact vs PLASMA libraries IEEE PDP 2012



Targeting multi cores by structured programming and data flow. M. Aldinucci, L. Anardu, M. Danelutto, P. Kilpatrick, M. Torquati.

[Screenshot of the paper's first page; the abstract text is not recoverable from the extraction.]



Figure 10:  $mdf^3$  vs PLASMA library. Cholesky factorization for a single 1024x1024 complex matrix (Intel Nehalem).

## C4.5 (Fine grain - irregular D&C) PKDD 2011





Fig. 11: NAP strategy speedup.

Fig. 12: Speedup of the NAP strategy vs no. of attributes (for 1M sample subset of SuD10M9A).









## Two-phase denoising IEEE IPTA 2012 (Istanbul, 15-18 Oct)





Original Baboon standard test image 1024x1024

### Restored



PSNR 43.29 dB, MAE 0.35

PSNR 32.75 dB, MAE 2.67

PSNR 23.4 dB, MAE 11.21



#### Parallel stochastic sim for system biology IEEE PDP 2011, HiBB 2011, Briefings in Bioinformatics (invited), Bio-IT world (invited), IEEE PDP 2013 (submitted), BMC Bioinformatics






# Conclusions

- \* FastFlow C++ pattern-based framework
  - A tiny & open research framework (5K lines codebase)
  - 3 years old, over 8K downloads, 40K web contacts
  - x86/PPC/ARM + Linux/Mac/Win/iOS
    - Adopted as one run-time technology in ParaPhrase EU-FP7 STREP
    - a laboratory to experiment new run-time solutions
      - GPGPU integration (working on), Infiniband RDMA integration (working on), HW blocking reads (thinking on), HW transactional mem (thinking on) ...
      - Stream-specific parallel memory allocator: fast (but still under test)
  - Data-centric, focus on messaging and synchronization, thread model agnostic
  - High-level = performance & portability
    - Speedup starting from ~20 clock cycles workload on standard x86\_64 (TBB >20K)
    - Tested on dozens of apps, comparable or faster than TBB/OpenMP
    - http://di.unito.it/fastflow

#### FastFlow (FF) uSPSC - iPhone 4S

It measures the core-to-core communication latency (per message) using the uSPSC unbounded buffer described in: M. Aldinucci, M. Danelutto, P. Kilpatrick, M. Meneghin and M. Torquati. An Efficient Unbounded Lock-Free Queue for Multi-Core Systems, in: Proc. of Euro-Par 2012, Rhodes Island, Greece. Aug 2012.

CPU is an Apple A5: ARM Cortex-A9 dual-core 800 MHz, L1 cache coherent, with weakly ordered memory consistency.

Q-size 1024, StreamLen 1000000

| Queue               | Latency per message (ns) |
|---------------------|--------------------------|
| FF unbound uSPSC    | 74.399000                |
| FF bound SPSC       | 46.206000                |
| Michael-Scott queue | 1355.132000              |

# Thank you

- \* Paraphrase
  - Parallel Patterns for Adaptive Heterogeneous Multicore Systems
  - EU-FP7 STREP, 2011-2014, FastFlow is the background technology
- \* IMPACT
  - Innovative Methods for Particle Colliders at the Terascale
  - National, 2012-2015, FastFlow is the background technology
- \* HiPEAC
  - High Performance and Embedded Architecture and Compilation
  - EU NoE, 2012-2016
- \* BETTY
  - Behavioral Types for Reliable Large-Scale Software Systems
  - EU Cost Action, 2012-2016
- \* CINA
  - Compositionality, Interaction, Negotiation, Autonomicity for the future ICT society
  - MIUR PRIN, 2012-2014