Style for PEO

git-svn-id: svn://scm.gforge.inria.fr/svnroot/paradiseo@906 331e1502-861f-0410-8da2-ba01fb791d7f
This commit is contained in:
canape 2008-01-25 16:14:06 +00:00
commit b74a446baa
82 changed files with 1946 additions and 1663 deletions

View file

@ -1,4 +1,4 @@
/*
/*
* <comm.cpp>
* Copyright (C) DOLPHIN Project-Team, INRIA Futurs, 2006-2007
* (C) OPAC Team, LIFL, 2002-2007
@ -52,29 +52,32 @@ static sem_t sem_comm_init;
static Communicator * the_thread;
Communicator :: Communicator (int * __argc, char * * * __argv) {
Communicator :: Communicator (int * __argc, char * * * __argv)
{
the_thread = this;
the_thread = this;
initNode (__argc, __argv);
loadRMCParameters (* __argc, * __argv);
sem_post (& sem_comm_init);
}
void Communicator :: start () {
void Communicator :: start ()
{
while (true) {
while (true)
{
/* Zzz Zzz Zzz :-))) */
sleep ();
/* Zzz Zzz Zzz :-))) */
sleep ();
sendMessages ();
sendMessages ();
if (! atLeastOneActiveRunner () && ! atLeastOneActiveThread() && allResourcesFree ())
break;
if (! atLeastOneActiveRunner () && ! atLeastOneActiveThread() && allResourcesFree ())
break;
receiveMessages ();
}
receiveMessages ();
}
waitBuffers ();
printDebugMessage ("finalizing");
@ -82,24 +85,28 @@ void Communicator :: start () {
//synchronizeNodes ();
}
void initCommunication () {
void initCommunication ()
{
static bool initializedSemaphore = false;
if (initializedSemaphore) {
sem_destroy(& sem_comm_init);
}
if (initializedSemaphore)
{
sem_destroy(& sem_comm_init);
}
sem_init (& sem_comm_init, 0, 0);
initializedSemaphore = true;
}
void waitNodeInitialization () {
void waitNodeInitialization ()
{
sem_wait (& sem_comm_init);
}
void wakeUpCommunicator () {
void wakeUpCommunicator ()
{
the_thread -> wakeUp ();
}

View file

@ -1,4 +1,4 @@
/*
/*
* <comm.h>
* Copyright (C) DOLPHIN Project-Team, INRIA Futurs, 2006-2007
* (C) OPAC Team, LIFL, 2002-2007
@ -40,15 +40,16 @@
#include "../../core/communicable.h"
#include "../../core/reac_thread.h"
class Communicator : public ReactiveThread {
class Communicator : public ReactiveThread
{
public :
public :
/* Ctor */
Communicator (int * __argc, char * * * __argv);
/* Ctor */
Communicator (int * __argc, char * * * __argv);
void start ();
};
void start ();
};
extern void initCommunication ();

View file

@ -1,4 +1,4 @@
/*
/*
* <coop.cpp>
* Copyright (C) DOLPHIN Project-Team, INRIA Futurs, 2006-2007
* (C) OPAC Team, LIFL, 2002-2007
@ -41,42 +41,48 @@
#include "mess.h"
#include "../../core/peo_debug.h"
Runner * Cooperative :: getOwner () {
Runner * Cooperative :: getOwner ()
{
return owner;
}
void Cooperative :: setOwner (Runner & __runner) {
void Cooperative :: setOwner (Runner & __runner)
{
owner = & __runner;
}
void Cooperative :: send (Cooperative * __coop) {
void Cooperative :: send (Cooperative * __coop)
{
:: send (this, getRankOfRunner (__coop -> getOwner () -> getDefinitionID ()), COOP_TAG);
// stop ();
}
void Cooperative :: synchronizeCoopEx () {
void Cooperative :: synchronizeCoopEx ()
{
:: send (this, my_node -> rk_sched, SYNCHRONIZE_REQ_TAG);
}
Cooperative * getCooperative (COOP_ID __key) {
Cooperative * getCooperative (COOP_ID __key)
{
return dynamic_cast <Cooperative *> (getCommunicable (__key));
}
void Cooperative :: notifySending () {
void Cooperative :: notifySending ()
{
//getOwner -> setPassive ();
// resume ();
}
void Cooperative :: notifyReceiving () {
}
void Cooperative :: notifyReceiving ()
{}
void Cooperative :: notifySendingSyncReq () {
}
void Cooperative :: notifySendingSyncReq ()
{}
void Cooperative :: notifySynchronized () {
}
void Cooperative :: notifySynchronized ()
{}

View file

@ -1,4 +1,4 @@
/*
/*
* <mess.cpp>
* Copyright (C) DOLPHIN Project-Team, INRIA Futurs, 2006-2007
* (C) OPAC Team, LIFL, 2002-2007
@ -42,62 +42,69 @@
#include "node.h"
#define MPI_BUF_SIZE 1024*64
static char mpi_buf [MPI_BUF_SIZE];
static int pos_buf;
static std :: vector <char *> act_buf; /* Active buffers */
static std :: vector <MPI_Request *> act_req; /* Active requests */
void initBuffers () {
void initBuffers ()
{
pos_buf = 0;
act_buf.clear ();
act_req.clear ();
}
void cleanBuffers () {
void cleanBuffers ()
{
for (unsigned i = 0; i < act_req.size ();) {
for (unsigned i = 0; i < act_req.size ();)
{
MPI_Status stat ;
int flag ;
MPI_Status stat ;
int flag ;
MPI_Test (act_req [i], & flag, & stat) ;
if (flag) {
MPI_Test (act_req [i], & flag, & stat) ;
if (flag)
{
delete[] act_buf [i] ;
delete act_req [i] ;
act_buf [i] = act_buf.back () ;
act_buf.pop_back () ;
delete[] act_buf [i] ;
delete act_req [i] ;
act_req [i] = act_req.back () ;
act_req.pop_back () ;
act_buf [i] = act_buf.back () ;
act_buf.pop_back () ;
act_req [i] = act_req.back () ;
act_req.pop_back () ;
}
else
i ++;
}
else
i ++;
}
}
void waitBuffers () {
void waitBuffers ()
{
printDebugMessage ("waiting the termination of the asynchronous operations to complete");
for (unsigned i = 0; i < act_req.size (); i ++) {
for (unsigned i = 0; i < act_req.size (); i ++)
{
MPI_Status stat ;
MPI_Status stat ;
MPI_Wait (act_req [i], & stat) ;
MPI_Wait (act_req [i], & stat) ;
delete[] act_buf [i] ;
delete act_req [i] ;
}
delete[] act_buf [i] ;
delete act_req [i] ;
}
}
bool probeMessage (int & __src, int & __tag) {
bool probeMessage (int & __src, int & __tag)
{
int flag;
@ -111,19 +118,22 @@ bool probeMessage (int & __src, int & __tag) {
return flag;
}
void waitMessage () {
void waitMessage ()
{
MPI_Status stat;
MPI_Status stat;
MPI_Probe (MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, & stat);
}
void initMessage () {
void initMessage ()
{
pos_buf = 0;
}
void sendMessage (int __to, int __tag) {
void sendMessage (int __to, int __tag)
{
cleanBuffers ();
act_buf.push_back (new char [pos_buf]);
@ -132,13 +142,15 @@ void sendMessage (int __to, int __tag) {
MPI_Isend (act_buf.back (), pos_buf, MPI_PACKED, __to, __tag, MPI_COMM_WORLD, act_req.back ());
}
void sendMessageToAll (int __tag) {
void sendMessageToAll (int __tag)
{
for (int i = 0; i < getNumberOfNodes (); i ++)
sendMessage (i, __tag);
}
void receiveMessage (int __from, int __tag) {
void receiveMessage (int __from, int __tag)
{
MPI_Status stat;
MPI_Request req;
@ -147,80 +159,93 @@ void receiveMessage (int __from, int __tag) {
MPI_Wait (& req, & stat);
}
void synchronizeNodes () {
void synchronizeNodes ()
{
MPI_Barrier ( MPI_COMM_WORLD );
}
/* Char */
void pack (const char & __c) {
void pack (const char & __c)
{
MPI_Pack ((void *) & __c, 1, MPI_CHAR, mpi_buf, MPI_BUF_SIZE, & pos_buf, MPI_COMM_WORLD);
}
/* Boolean */
void pack (const bool & __b, int __nitem){
void pack (const bool & __b, int __nitem)
{
MPI_Pack ((void *) & __b, __nitem, MPI_INT, mpi_buf, MPI_BUF_SIZE, & pos_buf, MPI_COMM_WORLD);
}
/* Float */
void pack (const float & __f, int __nitem) {
void pack (const float & __f, int __nitem)
{
MPI_Pack ((void *) & __f, __nitem, MPI_FLOAT, mpi_buf, MPI_BUF_SIZE, & pos_buf, MPI_COMM_WORLD);
}
/* Double */
void pack (const double & __d, int __nitem) {
void pack (const double & __d, int __nitem)
{
MPI_Pack ((void *) & __d, __nitem, MPI_DOUBLE, mpi_buf, MPI_BUF_SIZE, & pos_buf, MPI_COMM_WORLD);
}
/* Integer */
void pack (const int & __i, int __nitem) {
void pack (const int & __i, int __nitem)
{
MPI_Pack ((void *) & __i, __nitem, MPI_INT, mpi_buf, MPI_BUF_SIZE, & pos_buf, MPI_COMM_WORLD);
}
/* Unsigned int. */
void pack (const unsigned int & __ui, int __nitem) {
void pack (const unsigned int & __ui, int __nitem)
{
MPI_Pack ((void *) & __ui, __nitem, MPI_UNSIGNED, mpi_buf, MPI_BUF_SIZE, & pos_buf, MPI_COMM_WORLD);
}
/* Short int. */
void pack (const short & __sh, int __nitem) {
void pack (const short & __sh, int __nitem)
{
MPI_Pack ((void *) & __sh, __nitem, MPI_SHORT, mpi_buf, MPI_BUF_SIZE, & pos_buf, MPI_COMM_WORLD);
}
/* Unsigned short */
void pack (const unsigned short & __ush, int __nitem) {
void pack (const unsigned short & __ush, int __nitem)
{
MPI_Pack ((void *) & __ush, __nitem, MPI_UNSIGNED_SHORT, mpi_buf, MPI_BUF_SIZE, & pos_buf, MPI_COMM_WORLD);
}
/* Long */
void pack (const long & __l, int __nitem) {
void pack (const long & __l, int __nitem)
{
MPI_Pack ((void *) & __l, __nitem, MPI_LONG, mpi_buf, MPI_BUF_SIZE, & pos_buf, MPI_COMM_WORLD);
}
/* Unsigned long */
void pack (const unsigned long & __ul, int __nitem) {
void pack (const unsigned long & __ul, int __nitem)
{
MPI_Pack ((void *) & __ul, __nitem, MPI_UNSIGNED_LONG, mpi_buf, MPI_BUF_SIZE, & pos_buf, MPI_COMM_WORLD);
}
/* String */
void pack (const char * __str) {
void pack (const char * __str)
{
int len = strlen (__str) + 1;
MPI_Pack (& len, 1, MPI_INT, mpi_buf, MPI_BUF_SIZE, & pos_buf, MPI_COMM_WORLD);
MPI_Pack ((void *) __str, len, MPI_CHAR, mpi_buf, MPI_BUF_SIZE, & pos_buf, MPI_COMM_WORLD);
}
void pack (const std::string & __str) {
void pack (const std::string & __str)
{
size_t size = __str.size() + 1;
char * buffer = new char[ size ];
@ -230,79 +255,91 @@ void pack (const std::string & __str) {
}
/* Char */
void unpack (char & __c) {
void unpack (char & __c)
{
MPI_Unpack (mpi_buf, MPI_BUF_SIZE, & pos_buf, & __c, 1, MPI_CHAR, MPI_COMM_WORLD);
}
/* Boolean */
extern void unpack (bool & __b, int __nitem ){
extern void unpack (bool & __b, int __nitem )
{
MPI_Unpack (mpi_buf, MPI_BUF_SIZE, & pos_buf, & __b, __nitem, MPI_INT, MPI_COMM_WORLD);
}
MPI_Unpack (mpi_buf, MPI_BUF_SIZE, & pos_buf, & __b, __nitem, MPI_INT, MPI_COMM_WORLD);
}
/* Float */
void unpack (float & __f, int __nitem) {
void unpack (float & __f, int __nitem)
{
MPI_Unpack (mpi_buf, MPI_BUF_SIZE, & pos_buf, & __f, __nitem, MPI_FLOAT, MPI_COMM_WORLD);
}
/* Double */
void unpack (double & __d, int __nitem) {
void unpack (double & __d, int __nitem)
{
MPI_Unpack (mpi_buf, MPI_BUF_SIZE, & pos_buf, & __d, __nitem, MPI_DOUBLE, MPI_COMM_WORLD);
}
/* Integer */
void unpack (int & __i, int __nitem) {
void unpack (int & __i, int __nitem)
{
MPI_Unpack (mpi_buf, MPI_BUF_SIZE, & pos_buf, & __i, __nitem, MPI_INT, MPI_COMM_WORLD);
}
/* Unsigned int. */
void unpack (unsigned int & __ui, int __nitem) {
void unpack (unsigned int & __ui, int __nitem)
{
MPI_Unpack (mpi_buf, MPI_BUF_SIZE, & pos_buf, & __ui, __nitem, MPI_UNSIGNED, MPI_COMM_WORLD);
}
/* Short int. */
void unpack (short & __sh, int __nitem) {
void unpack (short & __sh, int __nitem)
{
MPI_Unpack (mpi_buf, MPI_BUF_SIZE, & pos_buf, & __sh, __nitem, MPI_SHORT, MPI_COMM_WORLD);
}
/* Unsigned short */
void unpack (unsigned short & __ush, int __nitem) {
void unpack (unsigned short & __ush, int __nitem)
{
MPI_Unpack (mpi_buf, MPI_BUF_SIZE, & pos_buf, & __ush, __nitem, MPI_UNSIGNED_SHORT, MPI_COMM_WORLD);
}
/* Long */
void unpack (long & __l, int __nitem) {
void unpack (long & __l, int __nitem)
{
MPI_Unpack (mpi_buf, MPI_BUF_SIZE, & pos_buf, & __l, __nitem, MPI_LONG, MPI_COMM_WORLD);
}
/* Unsigned long */
void unpack (unsigned long & __ul, int __nitem) {
void unpack (unsigned long & __ul, int __nitem)
{
MPI_Unpack (mpi_buf, MPI_BUF_SIZE, & pos_buf, & __ul, __nitem, MPI_UNSIGNED_LONG, MPI_COMM_WORLD);
}
/* String */
void unpack (char * __str) {
void unpack (char * __str)
{
int len;
MPI_Unpack (mpi_buf, MPI_BUF_SIZE, & pos_buf, & len, 1, MPI_INT, MPI_COMM_WORLD);
MPI_Unpack (mpi_buf, MPI_BUF_SIZE, & pos_buf, __str, len, MPI_CHAR, MPI_COMM_WORLD);
MPI_Unpack (mpi_buf, MPI_BUF_SIZE, & pos_buf, __str, len, MPI_CHAR, MPI_COMM_WORLD);
}
void unpack (std::string & __str) {
void unpack (std::string & __str)
{
char * buffer;
int len;
MPI_Unpack (mpi_buf, MPI_BUF_SIZE, & pos_buf, & len, 1, MPI_INT, MPI_COMM_WORLD);
MPI_Unpack (mpi_buf, MPI_BUF_SIZE, & pos_buf, buffer, len, MPI_CHAR, MPI_COMM_WORLD);
__str.assign( buffer );
MPI_Unpack (mpi_buf, MPI_BUF_SIZE, & pos_buf, buffer, len, MPI_CHAR, MPI_COMM_WORLD);
__str.assign( buffer );
}

View file

@ -1,4 +1,4 @@
/*
/*
* <mess.h>
* Copyright (C) DOLPHIN Project-Team, INRIA Futurs, 2006-2007
* (C) OPAC Team, LIFL, 2002-2007

View file

@ -1,4 +1,4 @@
/*
/*
* <node.cpp>
* Copyright (C) DOLPHIN Project-Team, INRIA Futurs, 2006-2007
* (C) OPAC Team, LIFL, 2002-2007
@ -43,50 +43,57 @@
#include "mess.h"
class MPIThreadedEnv {
class MPIThreadedEnv
{
public:
public:
static void init ( int * __argc, char * * * __argv ) {
static void init ( int * __argc, char * * * __argv )
{
static MPIThreadedEnv mpiThreadedEnv( __argc, __argv );
}
static void finalize () {
static bool finalizedEnvironment = false;
if (! finalizedEnvironment ) {
MPI_Finalize ();
finalizedEnvironment = true;
static MPIThreadedEnv mpiThreadedEnv( __argc, __argv );
}
}
private:
static void finalize ()
{
/* No instance of this class can be created outside its domain! */
MPIThreadedEnv ( int * __argc, char * * * __argv ) {
static bool finalizedEnvironment = false;
static bool MPIThreadedEnvInitialized = false;
int provided = 1;
if (! finalizedEnvironment )
{
if (! MPIThreadedEnvInitialized) {
MPI_Init_thread (__argc, __argv, MPI_THREAD_FUNNELED, & provided);
assert (provided == MPI_THREAD_FUNNELED); /* The MPI implementation must be multi-threaded.
Yet, only one thread performs the comm.
operations */
MPIThreadedEnvInitialized = true;
MPI_Finalize ();
finalizedEnvironment = true;
}
}
}
~MPIThreadedEnv() {
private:
finalize ();
}
};
/* No instance of this class can be created outside its domain! */
MPIThreadedEnv ( int * __argc, char * * * __argv )
{
static bool MPIThreadedEnvInitialized = false;
int provided = 1;
if (! MPIThreadedEnvInitialized)
{
MPI_Init_thread (__argc, __argv, MPI_THREAD_FUNNELED, & provided);
assert (provided == MPI_THREAD_FUNNELED); /* The MPI implementation must be multi-threaded.
Yet, only one thread performs the comm.
operations */
MPIThreadedEnvInitialized = true;
}
}
~MPIThreadedEnv()
{
finalize ();
}
};
static int rk, sz; /* Rank & size */
@ -96,27 +103,32 @@ static std :: map <std :: string, int> name_to_rk;
static std :: vector <std :: string> rk_to_name;
int getNodeRank () {
int getNodeRank ()
{
return rk;
}
int getNumberOfNodes () {
int getNumberOfNodes ()
{
return sz;
}
void collectiveCountOfRunners ( unsigned int* num_local_exec_runners, unsigned int* num_exec_runners ) {
void collectiveCountOfRunners ( unsigned int* num_local_exec_runners, unsigned int* num_exec_runners )
{
MPI_Allreduce( num_local_exec_runners, num_exec_runners, 1, MPI_UNSIGNED, MPI_SUM, MPI_COMM_WORLD );
}
int getRankFromName (const std :: string & __name) {
int getRankFromName (const std :: string & __name)
{
return atoi (__name.c_str ());
return atoi (__name.c_str ());
}
void initNode (int * __argc, char * * * __argv) {
void initNode (int * __argc, char * * * __argv)
{
rk_to_name.clear ();
name_to_rk.clear ();
@ -130,12 +142,13 @@ void initNode (int * __argc, char * * * __argv) {
char names [sz] [MPI_MAX_PROCESSOR_NAME];
int len;
/* Processor names */
MPI_Get_processor_name (names [0], & len); /* Me */
/* Processor names */
MPI_Get_processor_name (names [0], & len); /* Me */
MPI_Allgather (names, MPI_MAX_PROCESSOR_NAME, MPI_CHAR, names, MPI_MAX_PROCESSOR_NAME, MPI_CHAR, MPI_COMM_WORLD); /* Broadcast */
for (int i = 0; i < sz; i ++) {
rk_to_name.push_back (names [i]);
name_to_rk [names [i]] = i;
}
for (int i = 0; i < sz; i ++)
{
rk_to_name.push_back (names [i]);
name_to_rk [names [i]] = i;
}
}

View file

@ -1,4 +1,4 @@
/*
/*
* <node.h>
* Copyright (C) DOLPHIN Project-Team, INRIA Futurs, 2006-2007
* (C) OPAC Team, LIFL, 2002-2007
@ -44,15 +44,16 @@
typedef int RANK_ID;
struct Node {
struct Node
{
RANK_ID rk; /* Rank */
std :: string name; /* Host name */
unsigned num_workers; /* Number of parallel workers */
int rk_sched; /* rank of the scheduler */
std :: vector <RUNNER_ID> id_run; /* List of runner def. IDs */
std :: vector <RUNNER_ID> execution_id_run; /* List of runtime execution runner IDs */
};
RANK_ID rk; /* Rank */
std :: string name; /* Host name */
unsigned num_workers; /* Number of parallel workers */
int rk_sched; /* rank of the scheduler */
std :: vector <RUNNER_ID> id_run; /* List of runner def. IDs */
std :: vector <RUNNER_ID> execution_id_run; /* List of runtime execution runner IDs */
};
extern Node * my_node;

View file

@ -1,4 +1,4 @@
/*
/*
* <param.cpp>
* Copyright (C) DOLPHIN Project-Team, INRIA Futurs, 2006-2007
* (C) OPAC Team, LIFL, 2002-2007
@ -38,7 +38,8 @@
#include "schema.h"
void loadRMCParameters (int & __argc, char * * & __argv) {
void loadRMCParameters (int & __argc, char * * & __argv)
{
eoParser parser (__argc, __argv);

View file

@ -1,4 +1,4 @@
/*
/*
* <param.h>
* Copyright (C) DOLPHIN Project-Team, INRIA Futurs, 2006-2007
* (C) OPAC Team, LIFL, 2002-2007

View file

@ -1,4 +1,4 @@
/*
/*
* <recv.cpp>
* Copyright (C) DOLPHIN Project-Team, INRIA Futurs, 2006-2007
* (C) OPAC Team, LIFL, 2002-2007
@ -45,106 +45,112 @@
#include "../../core/cooperative.h"
#include "../../core/peo_debug.h"
void receiveMessages () {
void receiveMessages ()
{
cleanBuffers ();
do {
do
{
if (! atLeastOneActiveThread ()) {
waitMessage ();
}
int src, tag;
while (probeMessage (src, tag)) {
receiveMessage (src, tag);
initMessage ();
switch (tag) {
case RUNNER_STOP_TAG:
unpackTerminationOfRunner ();
break;
case SYNCHRONIZE_REQ_TAG:
unpackSynchronRequest ();
break;
case SYNCHRONIZED_TAG:
if (! atLeastOneActiveThread ())
{
RUNNER_ID runner_id;
unpack (runner_id);
COOP_ID coop_id;
unpack (coop_id);
getCooperative (coop_id) -> notifySynchronized ();
break;
waitMessage ();
}
case COOP_TAG:
COOP_ID coop_id;
unpack (coop_id);
getCooperative (coop_id) -> unpack ();
getCooperative (coop_id) -> notifyReceiving ();
break;
int src, tag;
case SCHED_REQUEST_TAG:
unpackResourceRequest ();
break;
case SCHED_RESULT_TAG:
while (probeMessage (src, tag))
{
/* Unpacking the resource */
SERVICE_ID serv_id;
unpack (serv_id);
Service * serv = getService (serv_id);
int dest;
unpack (dest);
WORKER_ID worker_id;
unpack (worker_id);
/* Going back ... */
receiveMessage (src, tag);
initMessage ();
pack (worker_id);
pack (serv_id);
serv -> packData ();
serv -> notifySendingData ();
sendMessage (dest, TASK_DATA_TAG);
break;
switch (tag)
{
case RUNNER_STOP_TAG:
unpackTerminationOfRunner ();
break;
case SYNCHRONIZE_REQ_TAG:
unpackSynchronRequest ();
break;
case SYNCHRONIZED_TAG:
{
RUNNER_ID runner_id;
unpack (runner_id);
COOP_ID coop_id;
unpack (coop_id);
getCooperative (coop_id) -> notifySynchronized ();
break;
}
case COOP_TAG:
COOP_ID coop_id;
unpack (coop_id);
getCooperative (coop_id) -> unpack ();
getCooperative (coop_id) -> notifyReceiving ();
break;
case SCHED_REQUEST_TAG:
unpackResourceRequest ();
break;
case SCHED_RESULT_TAG:
{
/* Unpacking the resource */
SERVICE_ID serv_id;
unpack (serv_id);
Service * serv = getService (serv_id);
int dest;
unpack (dest);
WORKER_ID worker_id;
unpack (worker_id);
/* Going back ... */
initMessage ();
pack (worker_id);
pack (serv_id);
serv -> packData ();
serv -> notifySendingData ();
sendMessage (dest, TASK_DATA_TAG);
break;
}
case TASK_DATA_TAG:
{
WORKER_ID worker_id;
unpack (worker_id);
Worker * worker = getWorker (worker_id);
worker -> setSource (src);
worker -> unpackData ();
worker -> wakeUp ();
break;
}
case TASK_RESULT_TAG:
{
SERVICE_ID serv_id;
unpack (serv_id);
Service * serv = getService (serv_id);
serv -> unpackResult ();
break;
}
case TASK_DONE_TAG:
unpackTaskDone ();
break;
default:
;
};
}
case TASK_DATA_TAG:
{
WORKER_ID worker_id;
unpack (worker_id);
Worker * worker = getWorker (worker_id);
worker -> setSource (src);
worker -> unpackData ();
worker -> wakeUp ();
break;
}
case TASK_RESULT_TAG:
{
SERVICE_ID serv_id;
unpack (serv_id);
Service * serv = getService (serv_id);
serv -> unpackResult ();
break;
}
case TASK_DONE_TAG:
unpackTaskDone ();
break;
default:
;
};
}
} while ( ! atLeastOneActiveThread () && atLeastOneActiveRunner () /*&& ! allResourcesFree ()*/ );
while ( ! atLeastOneActiveThread () && atLeastOneActiveRunner () /*&& ! allResourcesFree ()*/ );
}

View file

@ -1,4 +1,4 @@
/*
/*
* <recv.h>
* Copyright (C) DOLPHIN Project-Team, INRIA Futurs, 2006-2007
* (C) OPAC Team, LIFL, 2002-2007

View file

@ -1,4 +1,4 @@
/*
/*
* <rmc.cpp>
* Copyright (C) DOLPHIN Project-Team, INRIA Futurs, 2006-2007
* (C) OPAC Team, LIFL, 2002-2007
@ -46,18 +46,21 @@ static std :: vector <Worker *> worker_threads; /* Worker threads */
static Communicator* communicator_thread = NULL; /* Communicator thread */
void runRMC () {
void runRMC ()
{
/* Worker(s) ? */
for (unsigned i = 0; i < my_node -> num_workers; i ++) {
worker_threads.push_back (new Worker);
addThread (worker_threads.back(), ll_threads);
}
for (unsigned i = 0; i < my_node -> num_workers; i ++)
{
worker_threads.push_back (new Worker);
addThread (worker_threads.back(), ll_threads);
}
wakeUpCommunicator ();
}
void initRMC (int & __argc, char * * & __argv) {
void initRMC (int & __argc, char * * & __argv)
{
/* Communication */
initCommunication ();
@ -71,14 +74,16 @@ void initRMC (int & __argc, char * * & __argv) {
initScheduler ();
}
void finalizeRMC () {
void finalizeRMC ()
{
printDebugMessage ("before join threads RMC");
joinThreads (ll_threads);
for (unsigned i = 0; i < worker_threads.size(); i++ ) {
delete worker_threads [i];
}
for (unsigned i = 0; i < worker_threads.size(); i++ )
{
delete worker_threads [i];
}
worker_threads.clear ();
delete communicator_thread;

View file

@ -1,4 +1,4 @@
/*
/*
* <runner.cpp>
* Copyright (C) DOLPHIN Project-Team, INRIA Futurs, 2006-2007
* (C) OPAC Team, LIFL, 2002-2007
@ -43,7 +43,8 @@
#include "schema.h"
bool Runner :: isAssignedLocally () {
bool Runner :: isAssignedLocally ()
{
for (unsigned i = 0; i < my_node -> id_run.size (); i ++)
if (my_node -> id_run [i] == def_id)
@ -51,12 +52,14 @@ bool Runner :: isAssignedLocally () {
return false;
}
void Runner :: terminate () {
void Runner :: terminate ()
{
sendToAll (this, RUNNER_STOP_TAG);
}
void Runner :: packTermination () {
void Runner :: packTermination ()
{
pack (def_id);
}

View file

@ -1,4 +1,4 @@
/*
/*
* <scheduler.cpp>
* Copyright (C) DOLPHIN Project-Team, INRIA Futurs, 2006-2007
* (C) OPAC Team, LIFL, 2002-2007
@ -49,52 +49,59 @@ static unsigned initNumberOfRes = 0;
extern void wakeUpCommunicator();
void initScheduler () {
void initScheduler ()
{
resources = std :: queue <SCHED_RESOURCE> ();
requests = std :: queue <SCHED_REQUEST> ();
initNumberOfRes = 0;
for (unsigned i = 0; i < the_schema.size (); i ++) {
for (unsigned i = 0; i < the_schema.size (); i ++)
{
const Node & node = the_schema [i];
const Node & node = the_schema [i];
if (node.rk_sched == my_node -> rk)
for (unsigned j = 0; j < node.num_workers; j ++)
resources.push (std :: pair <RANK_ID, WORKER_ID> (i, j + 1));
}
if (node.rk_sched == my_node -> rk)
for (unsigned j = 0; j < node.num_workers; j ++)
resources.push (std :: pair <RANK_ID, WORKER_ID> (i, j + 1));
}
initNumberOfRes = resources.size ();
}
bool allResourcesFree () {
bool allResourcesFree ()
{
return resources.size () == initNumberOfRes;
}
unsigned numResourcesFree () {
unsigned numResourcesFree ()
{
return resources.size ();
}
static void update () {
static void update ()
{
unsigned num_alloc = std :: min (resources.size (), requests.size ());
for (unsigned i = 0; i < num_alloc; i ++) {
for (unsigned i = 0; i < num_alloc; i ++)
{
SCHED_REQUEST req = requests.front ();
requests.pop ();
SCHED_REQUEST req = requests.front ();
requests.pop ();
SCHED_RESOURCE res = resources.front ();
resources.pop ();
SCHED_RESOURCE res = resources.front ();
resources.pop ();
printDebugMessage ("allocating a resource.");
initMessage ();
pack (req.second);
pack (res);
sendMessage (req.first, SCHED_RESULT_TAG);
}
printDebugMessage ("allocating a resource.");
initMessage ();
pack (req.second);
pack (res);
sendMessage (req.first, SCHED_RESULT_TAG);
}
}
void unpackResourceRequest () {
void unpackResourceRequest ()
{
printDebugMessage ("queuing a resource request.");
SCHED_REQUEST req;
@ -103,7 +110,8 @@ void unpackResourceRequest () {
update ();
}
void unpackTaskDone () {
void unpackTaskDone ()
{
printDebugMessage ("I'm notified a worker is now idle.");
SCHED_RESOURCE res;

View file

@ -1,4 +1,4 @@
/*
/*
* <scheduler.h>
* Copyright (C) DOLPHIN Project-Team, INRIA Futurs, 2006-2007
* (C) OPAC Team, LIFL, 2002-2007
@ -53,7 +53,7 @@ extern void initScheduler ();
extern void unpackResourceRequest ();
/* Being known a worker is now idle :-) */
extern void unpackTaskDone ();
extern void unpackTaskDone ();
extern bool allResourcesFree ();

View file

@ -1,4 +1,4 @@
/*
/*
* <schema.cpp>
* Copyright (C) DOLPHIN Project-Team, INRIA Futurs, 2006-2007
* (C) OPAC Team, LIFL, 2002-2007
@ -51,17 +51,19 @@ Node * my_node;
static unsigned maxSpecifiedRunnerID = 0;
RANK_ID getRankOfRunner (RUNNER_ID __key) {
RANK_ID getRankOfRunner (RUNNER_ID __key)
{
for (unsigned i = 0; i < the_schema.size (); i ++)
for (unsigned j = 0; j < the_schema [i].id_run.size (); j ++)
if (the_schema [i].id_run [j] == __key)
return the_schema [i].rk;
assert (false);
return 0;
return 0;
}
static void loadNode (int __rk_sched) {
static void loadNode (int __rk_sched)
{
Node node;
@ -72,55 +74,62 @@ static void loadNode (int __rk_sched) {
/* ATT: num_workers */
node.num_workers = atoi (getAttributeValue ("num_workers").c_str ());
while (true) {
while (true)
{
/* TAG: <runner> | </node> */
std :: string name = getNextNode ();
assert (name == "runner" || name == "node");
if (name == "runner") {
/* TAG: </node> */
node.id_run.push_back (atoi (getNextNode ().c_str ()));
if ( node.id_run.back() > maxSpecifiedRunnerID )
maxSpecifiedRunnerID = node.id_run.back();
/* TAG: </runner> */
assert (getNextNode () == "runner");
/* TAG: <runner> | </node> */
std :: string name = getNextNode ();
assert (name == "runner" || name == "node");
if (name == "runner")
{
/* TAG: </node> */
node.id_run.push_back (atoi (getNextNode ().c_str ()));
if ( node.id_run.back() > maxSpecifiedRunnerID )
maxSpecifiedRunnerID = node.id_run.back();
/* TAG: </runner> */
assert (getNextNode () == "runner");
}
else
{
/* TAG: </node> */
node.execution_id_run = node.id_run;
the_schema.push_back (node);
break;
}
}
else {
/* TAG: </node> */
node.execution_id_run = node.id_run;
the_schema.push_back (node);
break;
}
}
}
static void loadGroup () {
static void loadGroup ()
{
std :: string name;
/* ATT: scheduler*/
int rk_sched = getRankFromName (getAttributeValue ("scheduler"));
while (true) {
while (true)
{
/* TAG: <node> | </group> */
name = getNextNode ();
assert (name == "node" || name == "group");
if (name == "node")
/* TAG: <node> */
loadNode (rk_sched);
else
/* TAG: </group> */
break;
}
/* TAG: <node> | </group> */
name = getNextNode ();
assert (name == "node" || name == "group");
if (name == "node")
/* TAG: <node> */
loadNode (rk_sched);
else
/* TAG: </group> */
break;
}
}
bool isScheduleNode () {
bool isScheduleNode ()
{
return my_node -> rk == my_node -> rk_sched;
}
void loadSchema (const char * __filename) {
void loadSchema (const char * __filename)
{
openXMLDocument (__filename);
@ -133,37 +142,43 @@ void loadSchema (const char * __filename) {
the_schema.clear();
maxSpecifiedRunnerID = 0;
while (true) {
while (true)
{
/* TAG: <group> | </schema> */
name = getNextNode ();
assert (name == "group" || name == "schema");
if (name == "group")
/* TAG: <group> */
loadGroup ();
else
/* TAG: </schema> */
break;
}
std :: set<unsigned> uniqueRunnerIDs; unsigned nbUniqueIDs = 0;
for (unsigned i = 0; i < the_schema.size (); i ++) {
for (unsigned j = 0; j < the_schema [i].id_run.size(); j ++) {
uniqueRunnerIDs.insert( the_schema [i].id_run[j] );
/* In case a duplicate ID has been found */
if ( uniqueRunnerIDs.size() == nbUniqueIDs ) {
the_schema [i].execution_id_run[j] = ++maxSpecifiedRunnerID;
}
nbUniqueIDs = uniqueRunnerIDs.size();
/* TAG: <group> | </schema> */
name = getNextNode ();
assert (name == "group" || name == "schema");
if (name == "group")
/* TAG: <group> */
loadGroup ();
else
/* TAG: </schema> */
break;
}
std :: set<unsigned> uniqueRunnerIDs;
unsigned nbUniqueIDs = 0;
for (unsigned i = 0; i < the_schema.size (); i ++)
{
for (unsigned j = 0; j < the_schema [i].id_run.size(); j ++)
{
uniqueRunnerIDs.insert( the_schema [i].id_run[j] );
/* In case a duplicate ID has been found */
if ( uniqueRunnerIDs.size() == nbUniqueIDs )
{
the_schema [i].execution_id_run[j] = ++maxSpecifiedRunnerID;
}
nbUniqueIDs = uniqueRunnerIDs.size();
}
}
}
/* Looking for my node */
for (unsigned i = 0; i < the_schema.size (); i ++) {
if (the_schema [i].rk == getNodeRank ())
my_node = & (the_schema [i]);
}
for (unsigned i = 0; i < the_schema.size (); i ++)
{
if (the_schema [i].rk == getNodeRank ())
my_node = & (the_schema [i]);
}
/* About me */
@ -175,16 +190,18 @@ void loadSchema (const char * __filename) {
if (isScheduleNode ())
printDebugMessage ("I'am a scheduler");
for (unsigned i = 0; i < my_node -> id_run.size (); i ++) {
sprintf (mess, "I manage the runner %d", my_node -> id_run [i]);
printDebugMessage (mess);
}
for (unsigned i = 0; i < my_node -> id_run.size (); i ++)
{
sprintf (mess, "I manage the runner %d", my_node -> id_run [i]);
printDebugMessage (mess);
}
if (my_node -> num_workers) {
if (my_node -> num_workers)
{
sprintf (mess, "I manage %d worker(s)", my_node -> num_workers);
printDebugMessage (mess);
}
sprintf (mess, "I manage %d worker(s)", my_node -> num_workers);
printDebugMessage (mess);
}
closeXMLDocument ();
}

View file

@ -1,4 +1,4 @@
/*
/*
* <schema.h>
* Copyright (C) DOLPHIN Project-Team, INRIA Futurs, 2006-2007
* (C) OPAC Team, LIFL, 2002-2007

View file

@ -1,4 +1,4 @@
/*
/*
* <send.cpp>
* Copyright (C) DOLPHIN Project-Team, INRIA Futurs, 2006-2007
* (C) OPAC Team, LIFL, 2002-2007
@ -50,13 +50,15 @@
#define TO_ALL -1
typedef struct {
typedef struct
{
Communicable * comm;
int to;
int tag;
Communicable * comm;
int to;
int tag;
} SEND_REQUEST;
}
SEND_REQUEST;
static std :: queue <SEND_REQUEST> mess;
@ -66,15 +68,17 @@ static sem_t sem_send;
static bool contextInitialized = false;
void initSending () {
void initSending ()
{
static bool initializedSemaphore = false;
mess = std :: queue <SEND_REQUEST> ();
if (initializedSemaphore) {
sem_destroy(& sem_send);
}
if (initializedSemaphore)
{
sem_destroy(& sem_send);
}
sem_init (& sem_send, 0, 1);
initializedSemaphore = true;
@ -82,9 +86,10 @@ void initSending () {
contextInitialized = false;
}
void send (Communicable * __comm, int __to, int __tag) {
void send (Communicable * __comm, int __to, int __tag)
{
SEND_REQUEST req;
SEND_REQUEST req;
req.comm = __comm;
req.to = __to;
req.tag = __tag;
@ -95,74 +100,79 @@ void send (Communicable * __comm, int __to, int __tag) {
wakeUpCommunicator ();
}
void sendToAll (Communicable * __comm, int __tag) {
void sendToAll (Communicable * __comm, int __tag)
{
send (__comm, TO_ALL, __tag);
}
extern void initializeContext ();
void sendMessages () {
void sendMessages ()
{
if (! contextInitialized) {
contextInitialized = true;
initializeContext();
}
if (! contextInitialized)
{
contextInitialized = true;
initializeContext();
}
sem_wait (& sem_send);
while (! mess.empty ()) {
while (! mess.empty ())
{
SEND_REQUEST req = mess.front ();
SEND_REQUEST req = mess.front ();
Communicable * comm = req.comm;
Communicable * comm = req.comm;
initMessage ();
initMessage ();
switch (req.tag) {
switch (req.tag)
{
case RUNNER_STOP_TAG:
dynamic_cast <Runner *> (comm) -> packTermination ();
dynamic_cast <Runner *> (comm) -> notifySendingTermination ();
break;
case RUNNER_STOP_TAG:
dynamic_cast <Runner *> (comm) -> packTermination ();
dynamic_cast <Runner *> (comm) -> notifySendingTermination ();
break;
case COOP_TAG:
dynamic_cast <Cooperative *> (comm) -> pack ();
dynamic_cast <Cooperative *> (comm) -> notifySending ();
break;
case COOP_TAG:
dynamic_cast <Cooperative *> (comm) -> pack ();
dynamic_cast <Cooperative *> (comm) -> notifySending ();
break;
case SYNCHRONIZE_REQ_TAG:
dynamic_cast <Cooperative *> (comm) -> packSynchronizeReq ();
dynamic_cast <Cooperative *> (comm) -> notifySendingSyncReq ();
break;
case SYNCHRONIZE_REQ_TAG:
dynamic_cast <Cooperative *> (comm) -> packSynchronizeReq ();
dynamic_cast <Cooperative *> (comm) -> notifySendingSyncReq ();
break;
case SCHED_REQUEST_TAG:
dynamic_cast <Service *> (comm) -> packResourceRequest ();
dynamic_cast <Service *> (comm) -> notifySendingResourceRequest ();
break;
case SCHED_REQUEST_TAG:
dynamic_cast <Service *> (comm) -> packResourceRequest ();
dynamic_cast <Service *> (comm) -> notifySendingResourceRequest ();
break;
case TASK_RESULT_TAG:
dynamic_cast <Worker *> (comm) -> packResult ();
dynamic_cast <Worker *> (comm) -> notifySendingResult ();
break;
case TASK_RESULT_TAG:
dynamic_cast <Worker *> (comm) -> packResult ();
dynamic_cast <Worker *> (comm) -> notifySendingResult ();
break;
case TASK_DONE_TAG:
dynamic_cast <Worker *> (comm) -> packTaskDone ();
dynamic_cast <Worker *> (comm) -> notifySendingTaskDone ();
break;
case TASK_DONE_TAG:
dynamic_cast <Worker *> (comm) -> packTaskDone ();
dynamic_cast <Worker *> (comm) -> notifySendingTaskDone ();
break;
default :
break;
default :
break;
};
};
if (req.to == TO_ALL)
sendMessageToAll (req.tag);
else
sendMessage (req.to, req.tag);
if (req.to == TO_ALL)
sendMessageToAll (req.tag);
else
sendMessage (req.to, req.tag);
mess.pop ();
}
mess.pop ();
}
sem_post (& sem_send);
}

View file

@ -1,4 +1,4 @@
/*
/*
* <send.h>
* Copyright (C) DOLPHIN Project-Team, INRIA Futurs, 2006-2007
* (C) OPAC Team, LIFL, 2002-2007

View file

@ -1,4 +1,4 @@
/*
/*
* <service.cpp>
* Copyright (C) DOLPHIN Project-Team, INRIA Futurs, 2006-2007
* (C) OPAC Team, LIFL, 2002-2007
@ -41,14 +41,16 @@
#include "send.h"
#include "scheduler.h"
void Service :: requestResourceRequest (unsigned __how_many) {
void Service :: requestResourceRequest (unsigned __how_many)
{
num_sent_rr = __how_many;
for (unsigned i = 0; i < __how_many; i ++)
send (this, my_node -> rk_sched, SCHED_REQUEST_TAG);
}
void Service :: packResourceRequest () {
void Service :: packResourceRequest ()
{
SCHED_REQUEST req;
req.first = getNodeRank ();

View file

@ -1,4 +1,4 @@
/*
/*
* <scheduler.cpp>
* Copyright (C) DOLPHIN Project-Team, INRIA Futurs, 2006-2007
* (C) OPAC Team, LIFL, 2002-2007
@ -50,26 +50,30 @@ extern void wakeUpCommunicator();
extern RANK_ID getRankOfRunner (RUNNER_ID __key);
/* Initializing the list of runners to be synchronized */
void initSynchron () {
void initSynchron ()
{
syncRunners = SYNC();
}
/* packing a synchronization request from a service */
void packSynchronRequest ( const std :: vector <Cooperative *>& coops ) {
void packSynchronRequest ( const std :: vector <Cooperative *>& coops )
{
/* Number of coops to synchronize */
pack( (unsigned)( coops.size() ) );
/* Coops to synchronize */
for (unsigned i = 0; i < coops.size(); i ++) {
pack( coops[ i ]->getOwner()->getDefinitionID() );
pack( coops[ i ]->getKey() );
}
for (unsigned i = 0; i < coops.size(); i ++)
{
pack( coops[ i ]->getOwner()->getDefinitionID() );
pack( coops[ i ]->getKey() );
}
}
/* Processing a synchronization request from a service */
void unpackSynchronRequest () {
void unpackSynchronRequest ()
{
unsigned req_num_entries;
unpack (req_num_entries);
@ -79,45 +83,50 @@ void unpackSynchronRequest () {
/* Adding entries for each of the runners to be synchronized */
SyncEntry req_entry;
for (unsigned i = 0; i < req_num_entries; i ++) {
for (unsigned i = 0; i < req_num_entries; i ++)
{
unpack (req_entry.runner);
unpack (req_entry.coop);
unpack (req_entry.runner);
unpack (req_entry.coop);
req_sync.first.push_back (req_entry);
}
req_sync.first.push_back (req_entry);
}
/* Looking for the sync vector */
SYNC::iterator sync_it = syncRunners.find (req_sync);
/* The vector does not exist - insert a new sync */
if (sync_it == syncRunners.end ()) {
req_sync.second = 1;
syncRunners.insert (req_sync);
}
else {
/* The vector exists - updating the entry */
std::pair< SYNC_RUNNERS, unsigned >& sync_req_entry = const_cast< std::pair< SYNC_RUNNERS, unsigned >& > (*sync_it);
sync_req_entry.second ++;
/* All the runners to be synchronized sent the SYNC_REQUEST signal */
if (sync_req_entry.second == sync_req_entry.first.size()) {
/* Remove the entry */
syncRunners.erase (sync_it);
/* Send SYNCHRONIZED signals to all the coop objects */
for (unsigned i = 0; i < req_sync.first.size(); i ++) {
initMessage ();
pack (req_sync.first [i].runner);
pack (req_sync.first [i].coop);
RANK_ID dest_rank = getRankOfRunner (req_sync.first [i].runner);
sendMessage (dest_rank, SYNCHRONIZED_TAG);
}
if (sync_it == syncRunners.end ())
{
req_sync.second = 1;
syncRunners.insert (req_sync);
}
else
{
/* The vector exists - updating the entry */
std::pair< SYNC_RUNNERS, unsigned >& sync_req_entry = const_cast< std::pair< SYNC_RUNNERS, unsigned >& > (*sync_it);
sync_req_entry.second ++;
/* All the runners to be synchronized sent the SYNC_REQUEST signal */
if (sync_req_entry.second == sync_req_entry.first.size())
{
/* Remove the entry */
syncRunners.erase (sync_it);
/* Send SYNCHRONIZED signals to all the coop objects */
for (unsigned i = 0; i < req_sync.first.size(); i ++)
{
initMessage ();
pack (req_sync.first [i].runner);
pack (req_sync.first [i].coop);
RANK_ID dest_rank = getRankOfRunner (req_sync.first [i].runner);
sendMessage (dest_rank, SYNCHRONIZED_TAG);
}
}
}
}
}

View file

@ -1,4 +1,4 @@
/*
/*
* <synchron.h>
* Copyright (C) DOLPHIN Project-Team, INRIA Futurs, 2006-2007
* (C) OPAC Team, LIFL, 2002-2007
@ -44,31 +44,39 @@
#include "../../core/runner.h"
#include "../../core/cooperative.h"
struct SyncEntry {
struct SyncEntry
{
RUNNER_ID runner;
COOP_ID coop;
};
RUNNER_ID runner;
COOP_ID coop;
};
struct SyncCompare {
struct SyncCompare
{
bool operator()( const std::pair< std::vector< SyncEntry >, unsigned >& A, const std::pair< std::vector< SyncEntry >, unsigned >& B ) {
bool operator()( const std::pair< std::vector< SyncEntry >, unsigned >& A, const std::pair< std::vector< SyncEntry >, unsigned >& B )
{
const std::vector< SyncEntry >& syncA = A.first;
const std::vector< SyncEntry >& syncB = B.first;
const std::vector< SyncEntry >& syncA = A.first;
const std::vector< SyncEntry >& syncB = B.first;
if ( syncA.size() == syncB.size() ) {
std::vector< SyncEntry >::const_iterator itA = syncA.begin();
std::vector< SyncEntry >::const_iterator itB = syncB.begin();
if ( syncA.size() == syncB.size() )
{
std::vector< SyncEntry >::const_iterator itA = syncA.begin();
std::vector< SyncEntry >::const_iterator itB = syncB.begin();
while ( (*itA).runner < (*itB).runner && itA != syncA.end() ) { itA++; itB++; }
while ( (*itA).runner < (*itB).runner && itA != syncA.end() )
{
itA++;
itB++;
}
return itA == syncA.end();
return itA == syncA.end();
}
return syncA.size() < syncB.size();
}
return syncA.size() < syncB.size();
}
};
};
typedef std::vector< SyncEntry > SYNC_RUNNERS;
typedef std::set< std::pair< SYNC_RUNNERS, unsigned >, SyncCompare > SYNC;

View file

@ -1,4 +1,4 @@
/*
/*
* <tags.h>
* Copyright (C) DOLPHIN Project-Team, INRIA Futurs, 2006-2007
* (C) OPAC Team, LIFL, 2002-2007

View file

@ -1,4 +1,4 @@
/*
/*
* <worker.cpp>
* Copyright (C) DOLPHIN Project-Team, INRIA Futurs, 2006-2007
* (C) OPAC Team, LIFL, 2002-2007
@ -48,12 +48,14 @@ static std :: vector <Worker *> key_to_worker (1); /* Vector of registered worke
extern void wakeUpCommunicator ();
Worker * getWorker (WORKER_ID __key) {
Worker * getWorker (WORKER_ID __key)
{
return key_to_worker [__key];
}
Worker :: Worker () {
Worker :: Worker ()
{
recvAndCompleted = false;
taskAssigned = 0;
@ -63,69 +65,79 @@ Worker :: Worker () {
sem_init( &sem_task_done, 0, 0 );
}
void Worker :: packResult () {
void Worker :: packResult ()
{
pack (serv_id);
serv -> packResult ();
}
void Worker :: unpackData () {
void Worker :: unpackData ()
{
taskAssigned ++;
printDebugMessage ("unpacking the ID. of the service.");
unpack (serv_id);
serv = getService (serv_id);
serv = getService (serv_id);
printDebugMessage ("found the service.");
serv -> unpackData ();
serv -> unpackData ();
printDebugMessage ("unpacking the data.");
setActive ();
}
void Worker :: packTaskDone () {
void Worker :: packTaskDone ()
{
pack (getNodeRank ());
pack (id);
}
void Worker :: notifySendingResult () {
void Worker :: notifySendingResult ()
{
/* Notifying the scheduler of the termination */
recvAndCompleted = true;
wakeUp ();
}
void Worker :: notifySendingTaskDone () {
void Worker :: notifySendingTaskDone ()
{
sem_post(&sem_task_done);
setPassive ();
}
void Worker :: setSource (int __rank) {
void Worker :: setSource (int __rank)
{
src = __rank;
}
void Worker :: start () {
void Worker :: start ()
{
while (true) {
while (true)
{
sleep ();
sleep ();
if (! atLeastOneActiveRunner () && ! taskAssigned)
break;
if (! atLeastOneActiveRunner () && ! taskAssigned)
break;
if (recvAndCompleted) {
send (this, my_node -> rk_sched, TASK_DONE_TAG);
recvAndCompleted = false;
sem_wait(&sem_task_done);
taskAssigned --;
if (recvAndCompleted)
{
send (this, my_node -> rk_sched, TASK_DONE_TAG);
recvAndCompleted = false;
sem_wait(&sem_task_done);
taskAssigned --;
}
else
{
serv -> execute ();
send (this, src, TASK_RESULT_TAG);
}
}
else {
serv -> execute ();
send (this, src, TASK_RESULT_TAG);
}
}
printDebugMessage ("Worker finished execution.");
setPassive ();
@ -133,7 +145,8 @@ void Worker :: start () {
wakeUpCommunicator();
}
void initWorkersEnv () {
void initWorkersEnv ()
{
key_to_worker.resize (1);
}

View file

@ -1,4 +1,4 @@
/*
/*
* <worker.h>
* Copyright (C) DOLPHIN Project-Team, INRIA Futurs, 2006-2007
* (C) OPAC Team, LIFL, 2002-2007
@ -42,41 +42,42 @@
#include "../../core/service.h"
typedef unsigned WORKER_ID;
typedef unsigned WORKER_ID;
class Worker : public Communicable, public ReactiveThread {
class Worker : public Communicable, public ReactiveThread
{
public :
public :
Worker ();
Worker ();
void start ();
void start ();
void packResult ();
void packResult ();
void unpackData ();
void unpackData ();
void packTaskDone ();
void packTaskDone ();
void notifySendingResult ();
void notifySendingResult ();
void notifySendingTaskDone ();
void notifySendingTaskDone ();
void setSource (int __rank);
void setSource (int __rank);
private :
private :
WORKER_ID id;
SERVICE_ID serv_id;
Service * serv;
int src;
WORKER_ID id;
SERVICE_ID serv_id;
Service * serv;
int src;
bool recvAndCompleted;
unsigned taskAssigned;
bool recvAndCompleted;
unsigned taskAssigned;
sem_t sem_task_done;
sem_t sem_task_asgn;
};
sem_t sem_task_done;
sem_t sem_task_asgn;
};
extern void initWorkersEnv ();

View file

@ -1,4 +1,4 @@
/*
/*
* <xml_parser.cpp>
* Copyright (C) DOLPHIN Project-Team, INRIA Futurs, 2006-2007
* (C) OPAC Team, LIFL, 2002-2007
@ -40,23 +40,27 @@
static xmlTextReaderPtr reader;
void openXMLDocument (const char * __filename) {
void openXMLDocument (const char * __filename)
{
reader = xmlNewTextReaderFilename (__filename);
if (! reader) {
if (! reader)
{
fprintf (stderr, "unable to open '%s'.\n", __filename);
exit (1);
}
fprintf (stderr, "unable to open '%s'.\n", __filename);
exit (1);
}
}
void closeXMLDocument () {
void closeXMLDocument ()
{
xmlFreeTextReader (reader);
}
std :: string getAttributeValue (const std :: string & __attr) {
std :: string getAttributeValue (const std :: string & __attr)
{
xmlChar * value = xmlTextReaderGetAttribute (reader, (const xmlChar *) __attr.c_str ());
@ -67,7 +71,8 @@ std :: string getAttributeValue (const std :: string & __attr) {
return str;
}
static bool isSep (const xmlChar * __text) {
static bool isSep (const xmlChar * __text)
{
for (unsigned i = 0; i < strlen ((char *) __text); i ++)
if (__text [i] != ' ' && __text [i] != '\t' && __text [i] != '\n')
@ -75,15 +80,18 @@ static bool isSep (const xmlChar * __text) {
return true;
}
std :: string getNextNode () {
std :: string getNextNode ()
{
xmlChar * name, * value;
do {
xmlTextReaderRead (reader);
name = xmlTextReaderName (reader);
value = xmlTextReaderValue (reader);
} while (! strcmp ((char *) name, "#text") && isSep (value));
do
{
xmlTextReaderRead (reader);
name = xmlTextReaderName (reader);
value = xmlTextReaderValue (reader);
}
while (! strcmp ((char *) name, "#text") && isSep (value));
std :: string str;

View file

@ -1,4 +1,4 @@
/*
/*
* <xml_parser.h>
* Copyright (C) DOLPHIN Project-Team, INRIA Futurs, 2006-2007
* (C) OPAC Team, LIFL, 2002-2007