#include <sys/types.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <vector>
#include <iostream>
#include <ff/cycle.h>
#if defined(USE_PROC_AFFINITY)
#endif
using namespace ff;
// Element type of the buffers that workers allocate, fill and send downstream.
using ff_task_t = unsigned long;

#if defined(USE_PROC_AFFINITY)
// Core ids reported by the affinity diagnostics: workers cycle over cores
// 1..31, the collector uses core 0. PHYCORES is presumably the number of
// physical cores of the target machine.
// NOTE(review): worker_mapping holds 31 entries (PHYCORES-1), so it must not
// be indexed with `% PHYCORES`.
constexpr int worker_mapping[] = {
     1,  2,  3,  4,  5,  6,  7,  8,  9, 10,
    11, 12, 13, 14, 15, 16, 17, 18, 19, 20,
    21, 22, 23, 24, 25, 26, 27, 28, 29, 30,
    31
};
constexpr int collector_mapping = 0;
constexpr int PHYCORES = 32;
#endif
// Compile-time selection of the allocator being benchmarked:
//   USE_TBB        -> tbb::scalable_allocator<char>
//   USE_CACHED_TBB -> tbb::cache_aligned_allocator<char>
//   USE_STANDARD   -> libc malloc/free
//   (default)      -> FastFlow FFAllocator; FF_ALLOCATOR is also defined so
//                     that the Worker code switches to its own `myalloc`.
// ALLOCATOR_INIT() must run once before MALLOC/FREE are used.
// MALLOC(size) allocates `size` bytes. The second argument of FREE(ptr, x) is
// the byte size (TBB variants), ignored (USE_STANDARD / FFAllocator), or an
// index into MYALLOC (DONT_USE_FFA).
#if defined(USE_TBB)
#include <tbb/scalable_allocator.h>
static tbb::scalable_allocator<char> * tbballocator=0;
#define ALLOCATOR_INIT() {tbballocator=new tbb::scalable_allocator<char>();}
#define MALLOC(size) (tbballocator->allocate(size))
#define FREE(ptr,size) (tbballocator->deallocate((char *)ptr,size))
#elif defined(USE_CACHED_TBB)
// (A second, redundant #include of this header was removed.)
#include <tbb/cache_aligned_allocator.h>
static tbb::cache_aligned_allocator<char> * tbballocator=0;
#define ALLOCATOR_INIT() {tbballocator=new tbb::cache_aligned_allocator<char>();}
#define MALLOC(size) (tbballocator->allocate(size))
#define FREE(ptr,size) (tbballocator->deallocate((char*)ptr,size))
#elif defined(USE_STANDARD)
#define ALLOCATOR_INIT()
#define MALLOC(size) malloc(size)
#define FREE(ptr,size) free(ptr)
#else
#define FF_ALLOCATOR 1
#define ALLOCATOR_INIT()
#define MALLOC(size) (FFAllocator::instance()->malloc(size))
#if defined(DONT_USE_FFA)
#define MAX_NUM_THREADS 64
// NOTE(review): MYALLOC is not declared in the visible code; Worker::svc_init
// stores each worker's allocator at MYALLOC[get_my_id()], so it is presumably
// an array of MAX_NUM_THREADS allocator pointers - confirm it is declared.
#define FREE(ptr,id) (MYALLOC[id]->free(ptr))
#else
#define FREE(ptr,unused) (FFAllocator::instance()->free(ptr))
#endif //DONT_USE_FFA
#endif
#if defined(TEST_FFA_MALLOC)
private:
    // Simulated per-task work: writes 0..size-1 into task[0..size-1], then
    // calls ticks_wait(nticks) (presumably declared in <ff/cycle.h> as a
    // busy-wait of roughly nticks CPU ticks).
    // The `register` specifier was dropped: it was removed in C++17 and makes
    // the declaration ill-formed there.
    void do_work(ff_task_t * task, int size, long long nticks) {
        for (int i = 0; i < size; ++i)
            task[i] = i;
        ticks_wait(nticks);
    }
public:
    // A worker emits `ntasks` buffers of `itemsize` ff_task_t elements each and
    // performs `nticks` of simulated work (see do_work) per buffer.
    Worker(int itemsize, int ntasks, long long nticks)
        : itemsize(itemsize),
          ntasks(ntasks),
          nticks(nticks)
    {
    }
void * svc(void *) {
ff_task_t * task;
for(int i=0;i<ntasks;++i) {
task = (ff_task_t*)MALLOC(itemsize*sizeof(ff_task_t));
bzero(task,itemsize*sizeof(ff_task_t));
do_work(&task[0],itemsize,nticks);
ff_send_out(task);
}
return NULL;
}
private:
// Number of ff_task_t elements in each emitted buffer.
// (Declaration order matches the constructor's initializer list.)
int itemsize;
// Number of buffers this worker emits.
int ntasks;
// Simulated work per buffer, forwarded to ticks_wait() by do_work().
long long nticks;
};
#else // in this case we test only FFA's frees
#if 1 //this one ...
private:
    // Simulated per-task work: writes 0..size-1 into task[0..size-1], then
    // calls ticks_wait(nticks) (presumably declared in <ff/cycle.h> as a
    // busy-wait of roughly nticks CPU ticks).
    // The `register` specifier was dropped: it was removed in C++17 and makes
    // the declaration ill-formed there.
    void do_work(ff_task_t * task, int size, long long nticks) {
        for (int i = 0; i < size; ++i)
            task[i] = i;
        ticks_wait(nticks);
    }
public:
    // Builds a worker that emits `ntasks` buffers of `itemsize` ff_task_t
    // elements, with `nticks` of simulated work each, and that owns a private
    // allocator (deleted in ~Worker).
    //
    // The allocator is pre-sized: if getslabs() returns a non-negative slab
    // index for the buffer size, init() is called with nslabs[slab] = 8192 and
    // every other entry 0; otherwise init() is called with its defaults.
    // Any init() failure aborts the program.
    Worker(int itemsize, int ntasks, long long nticks):
        // Bug fix: the allocator used to be left NULL and then dereferenced
        // by getslabs() below. It is now created before any use.
        // NOTE(review): type inferred from the getslabs/init/registerAllocator
        // calls made on it (FastFlow ff_allocator) - confirm.
        myalloc(new ff_allocator()),itemsize(itemsize),ntasks(ntasks),nticks(nticks) {
        const int slab = myalloc->getslabs(itemsize*sizeof(ff_task_t));
        if (slab < 0) {
            if (myalloc->init() < 0) abort();
        } else {
            int nslabs[N_SLABBUFFER];
            for (int i = 0; i < N_SLABBUFFER; ++i)
                nslabs[i] = (i == slab) ? 8192 : 0;
            if (myalloc->init(nslabs) < 0) abort();
        }
    }
~Worker() {
if (myalloc) delete myalloc;
}
int svc_init() {
#if defined(USE_PROC_AFFINITY)
printf("Cannot map Worker %d CPU %d\n",get_my_id(),
worker_mapping[get_my_id() % PHYCORES]);
#endif
#if defined(FF_ALLOCATOR)
myalloc->registerAllocator();
#if defined(DONT_USE_FFA)
MYALLOC[get_my_id()]=myalloc;
#endif
#endif
return 0;
}
void * svc(void *) {
ff_task_t * task;
for(int i=0;i<ntasks;++i) {
#if defined(FF_ALLOCATOR)
task= (ff_task_t*)myalloc->malloc(itemsize*sizeof(ff_task_t));
#else
task = (ff_task_t*)MALLOC(itemsize*sizeof(ff_task_t));
#endif
memset(task,0,itemsize*sizeof(ff_task_t));
do_work(&task[0],itemsize,nticks);
ff_send_out(task);
}
return NULL;
}
private:
    // Private allocator the worker draws its buffers from; deleted in ~Worker.
    // Fix: the constructor, destructor, svc_init and svc all use `myalloc`, but
    // it was never declared. It is declared first to match the constructor's
    // initializer order.
    // NOTE(review): type inferred from the getslabs/init/registerAllocator/
    // malloc calls made on it (FastFlow ff_allocator) - confirm.
    ff_allocator * myalloc;
    // Number of ff_task_t elements per emitted buffer.
    int itemsize;
    // Number of buffers this worker emits.
    int ntasks;
    // Simulated work per buffer, forwarded to ticks_wait().
    long long nticks;
};
#else // and this one.
public:
    // Variant whose allocator handle starts out NULL; nothing in the visible
    // code of this variant assigns it afterwards.
    Worker(int itemsize, int ntasks,long long nticks)
        : myalloc(NULL)
        , itemsize(itemsize)
        , ntasks(ntasks)
        , nticks(nticks)
    {}
~Worker() {
if (myalloc)
}
int svc_init() {
#if defined(USE_PROC_AFFINITY)
printf("Cannot map Worker %d CPU %d\n",get_my_id(),
worker_mapping[get_my_id() % PHYCORES]);
#endif
#if defined(FF_ALLOCATOR)
if (!myalloc) {
error("Worker, newAllocator fails\n");
return -1;
}
#endif
return 0;
}
void * svc(void *) {
ff_task_t * task;
for(int i=0;i<ntasks;++i) {
#if defined(FF_ALLOCATOR)
task= (ff_task_t*)myalloc->malloc(itemsize*sizeof(ff_task_t));
#else
task = (ff_task_t*)MALLOC(itemsize*sizeof(ff_task_t));
#endif
bzero(task,itemsize*sizeof(ff_task_t));
do_work(&task[0],itemsize,nticks);
ff_send_out(task);
}
return NULL;
}
private:
ffa_wrapper * myalloc;
int itemsize;
int ntasks;
long long nticks;
};
#endif
#endif
public:
// Called by the Emitter with GO_ON; presumably meant to deliver `task` to
// every worker.
// NOTE(review): the body is empty, so nothing is delivered and `task` is
// unused - confirm whether a forwarding call is missing here.
void broadcast(void * task) {
}
};
public:
Emitter(my_loadbalancer * const lb):lb(lb) {}
void * svc(void * ) {
std::cerr << "Emitter received task\n";
lb->broadcast(GO_ON);
return NULL;
}
private:
my_loadbalancer * lb;
};
public:
    // itemsize: ff_task_t elements per received buffer (used to compute the
    //           size argument of FREE);
    // gt:       the farm's gatherer, queried for the input channel id when
    //           DONT_USE_FFA is defined.
    Collector(int itemsize, ff_gatherer * const gt)
        : itemsize(itemsize)
        , gt(gt)
    {}

    // Per-node initialisation; always succeeds.
    int svc_init() {
#if defined(USE_PROC_AFFINITY)
        // NOTE(review): reports a mapping failure unconditionally; no
        // thread-to-CPU mapping call is visible here.
        printf("Cannot map Collector to CPU %d\n",collector_mapping);
#endif
        return 0;
    }
// Releases every buffer received from the workers with FREE, then returns
// GO_ON.
// The FREE macro's second argument is evaluated only by some allocator
// variants (e.g. USE_STANDARD ignores it), so it is kept inline rather than
// hoisted into a local.
void * svc(void * task) {
#if defined(DONT_USE_FFA)
// The channel the task arrived on selects the MYALLOC entry (see FREE).
FREE(task, gt->get_channel_id());
#else
// Byte size of the buffer, as allocated by the workers.
FREE(task,itemsize*sizeof(ff_task_t));
#endif
return GO_ON;
}
private:
int itemsize;
};
// Benchmark driver.
// Usage: prog [ntasks itemsize nworkers nticks]; with no arguments the defaults
// below are used. nworkers == 0 runs a sequential MALLOC/memset/FREE baseline
// and reports its time; otherwise nworkers Worker nodes are created.
int main(int argc, char * argv[]) {
unsigned int ntasks = 1000000;
unsigned int itemsize = 16;
int nworkers = 3;
long long nticks = 1000;
if (argc>1) {
// If any argument is given, all four are required.
if (argc<5) {
std::cerr
<< "use: " << argv[0]
<< " ntasks num-integer-x-item #n nticks\n";
return -1;
}
// NOTE(review): atoi() results land in unsigned variables, so a negative
// ntasks/itemsize wraps to a huge value instead of being rejected below.
ntasks = atoi(argv[1]);
itemsize = atoi(argv[2]);
nworkers = atoi(argv[3]);
nticks = strtoll(argv[4],NULL,10);
}
std::cerr << "ticks " << nticks << "\n";
if (nworkers<0 || !ntasks) {
std::cerr << "Wrong parameters values\n";
return -1;
}
ALLOCATOR_INIT();
// Sequential baseline: allocate, zero and release ntasks buffers, timed with ffTime.
// NOTE(review): under DONT_USE_FFA, FREE's second argument indexes MYALLOC,
// but here it is a byte count - confirm this path is never built that way.
if (nworkers==0) {
ffTime(START_TIME);
for(unsigned int i=0;i<ntasks;++i) {
void * task= MALLOC(itemsize*sizeof(ff_task_t));
memset(task,0,itemsize*sizeof(ff_task_t));
FREE(task,itemsize*sizeof(ff_task_t));
}
ffTime(STOP_TIME);
std::cerr << "DONE, time= " << ffTime(GET_TIME) << " (ms)\n";
return 0;
}
// Each worker produces ntasks/nworkers buffers; the remainder
// (ntasks % nworkers) is produced by nobody. The Worker objects are never
// deleted in the visible code.
std::vector<ff_node *> w;
for(int i=0;i<nworkers;++i)
w.push_back(new Worker(itemsize,ntasks/nworkers,nticks));
// NOTE(review): `farm` is not declared in the visible code, and the
// error()/return below run unconditionally, so the parallel path always
// returns -1 as written. A farm setup and a run/wait condition appear to be
// missing. The closing brace below ends main().
Collector C(itemsize,farm.
getgt());
error("running farm\n");
return -1;
}
// NOTE(review): the statements below lie outside main() (its closing brace is
// above) and do not compile as written. They look like the tail of the
// parallel path: print the farm's elapsed time and return 0.
std::cerr <<
"DONE, time= " << farm.
ffTime() <<
" (ms)\n";
return 0;
}