git-svn-id: svn://scm.gforge.inria.fr/svnroot/paradiseo@810 331e1502-861f-0410-8da2-ba01fb791d7f

This commit is contained in:
atantar 2007-11-23 16:41:23 +00:00
commit 7526792805
49 changed files with 449 additions and 420 deletions

View file

@ -46,6 +46,7 @@ static std :: map <const Communicable *, unsigned> comm_to_key; /* Map of regist
unsigned Communicable :: num_comm = 0; unsigned Communicable :: num_comm = 0;
Communicable :: Communicable () { Communicable :: Communicable () {
comm_to_key [this] = key = ++ num_comm; comm_to_key [this] = key = ++ num_comm;

View file

@ -75,6 +75,4 @@ extern void initCommunicableEnv ();
extern Communicable * getCommunicable (COMM_ID __key); extern Communicable * getCommunicable (COMM_ID __key);
//extern COMM_ID getKey (const Communicable * __comm);
#endif #endif

View file

@ -44,7 +44,14 @@
template <class F, class T> void pack (const eoVector <F, T> & __v) { template <class F, class T> void pack (const eoVector <F, T> & __v) {
if (__v.invalid()) {
pack((unsigned)0);
}
else {
pack((unsigned)1);
pack (__v.fitness ()); pack (__v.fitness ());
}
unsigned len = __v.size (); unsigned len = __v.size ();
pack (len); pack (len);
for (unsigned i = 0 ; i < len; i ++) for (unsigned i = 0 ; i < len; i ++)
@ -53,9 +60,16 @@ template <class F, class T> void pack (const eoVector <F, T> & __v) {
template <class F, class T> void unpack (eoVector <F, T> & __v) { template <class F, class T> void unpack (eoVector <F, T> & __v) {
unsigned valid; unpack(valid);
if (! valid) {
__v.invalidate();
}
else {
F fit; F fit;
unpack (fit); unpack (fit);
__v.fitness (fit); __v.fitness (fit);
}
unsigned len; unsigned len;
unpack (len); unpack (len);

View file

@ -129,5 +129,4 @@ template <class U, class V> void unpack (std :: pair <U, V> & __pair) {
unpack (__pair.second); unpack (__pair.second);
} }
#endif #endif

View file

@ -50,12 +50,14 @@
#define DEBUG_PATH "./log/" #define DEBUG_PATH "./log/"
static bool debug = true; static bool debug = true;
static char host [MAX_BUFF_SIZE]; static char host [MAX_BUFF_SIZE];
std :: vector <FILE *> files; std :: vector <FILE *> files;
void setDebugMode (bool __dbg) { void setDebugMode (bool __dbg) {
debug = __dbg; debug = __dbg;

View file

@ -44,7 +44,6 @@ void peo :: finalize () {
printDebugMessage ("waiting for the termination of all threads"); printDebugMessage ("waiting for the termination of all threads");
joinRunners (); joinRunners ();
finalizeRMC (); finalizeRMC ();
printDebugMessage ("this is the end"); printDebugMessage ("this is the end");

View file

@ -38,11 +38,14 @@
#include "peo_init.h" #include "peo_init.h"
#include "peo_param.h" #include "peo_param.h"
#include "peo_debug.h" #include "peo_debug.h"
#include "rmc.h" #include "rmc.h"
#include "runner.h"
extern void initCommunicableEnv (); extern void initCommunicableEnv ();
extern void initBuffers ();
extern void initThreadsEnv (); extern void initThreadsEnv ();
extern void initReactiveThreadsEnv (); extern void initReactiveThreadsEnv ();
@ -50,10 +53,14 @@ extern void initReactiveThreadsEnv ();
extern void initRunnersEnv (); extern void initRunnersEnv ();
extern void initWorkersEnv (); extern void initWorkersEnv ();
extern void initScheduler ();
static void initExecutionEnv() { static void initExecutionEnv() {
initCommunicableEnv (); initCommunicableEnv ();
initBuffers ();
initScheduler();
initThreadsEnv (); initThreadsEnv ();
initReactiveThreadsEnv (); initReactiveThreadsEnv ();
@ -62,6 +69,7 @@ static void initExecutionEnv() {
initWorkersEnv (); initWorkersEnv ();
} }
namespace peo { namespace peo {
int * argc; int * argc;
@ -77,12 +85,12 @@ namespace peo {
/* Initializing the execution environment */ /* Initializing the execution environment */
initExecutionEnv(); initExecutionEnv();
/* Initializing the the Resource Management and Communication */
initRMC (__argc, __argv);
/* Loading the common parameters */ /* Loading the common parameters */
loadParameters (__argc, __argv); loadParameters (__argc, __argv);
/* Initializing the the Resource Management and Communication */
initRMC ( *peo::argc, *peo::argv);
/* */ /* */
initDebugging (); initDebugging ();
} }

View file

@ -40,7 +40,6 @@
#include "peo_debug.h" #include "peo_debug.h"
void peo :: loadParameters (int & __argc, char * * & __argv) { void peo :: loadParameters (int & __argc, char * * & __argv) {
eoParser parser (__argc, __argv); eoParser parser (__argc, __argv);

View file

@ -34,11 +34,11 @@
* *
*/ */
#include "peo_init.h"
#include "peo_run.h" #include "peo_run.h"
#include "rmc.h" #include "rmc.h"
#include "runner.h" #include "runner.h"
void peo :: run () { void peo :: run () {
startRunners (); startRunners ();

View file

@ -40,6 +40,7 @@ static bool the_end = false;
static std :: vector <ReactiveThread *> reac_threads; static std :: vector <ReactiveThread *> reac_threads;
ReactiveThread :: ReactiveThread () { ReactiveThread :: ReactiveThread () {
reac_threads.push_back (this); reac_threads.push_back (this);

View file

@ -62,6 +62,4 @@ extern void initReactiveThreadsEnv ();
extern void stopReactiveThreads (); extern void stopReactiveThreads ();
extern bool theEnd ();
#endif /*REAC_THREAD_H_*/ #endif /*REAC_THREAD_H_*/

View file

@ -95,50 +95,19 @@ Runner * getRunner (RUNNER_ID __key) {
return dynamic_cast <Runner *> (getCommunicable (__key)); return dynamic_cast <Runner *> (getCommunicable (__key));
} }
void packExecutionContext () {
num_local_exec_runners = 0;
for (unsigned i = 0; i < the_runners.size (); i ++)
if (the_runners [i] -> isAssignedLocally ()) num_local_exec_runners ++;
pack(num_local_exec_runners);
}
void unpackExecutionContext () {
unsigned num_remote_runners;
unpack(num_remote_runners);
num_exec_runners += num_remote_runners;
}
void initializeContext () { void initializeContext () {
initMessage (); num_local_exec_runners = 0;
packExecutionContext ();
sendMessageToAll (EXECUTION_CONTEXT_TAG);
int src, tag; // setting up the execution IDs & counting the number of local exec. runners
for (unsigned i = 0; i < getNumberOfNodes(); i ++) { for (unsigned i = 0; i < the_runners.size (); i ++) {
the_runners [i] -> setExecutionID ( my_node -> execution_id_run[ i ] );
cleanBuffers (); if (the_runners [i] -> isAssignedLocally ()) num_local_exec_runners ++;
waitMessage ();
probeMessage ( src, tag );
receiveMessage( src, tag );
initMessage ();
unpackExecutionContext ();
} }
cleanBuffers (); collectiveCountOfRunners( &num_local_exec_runners, &num_exec_runners );
// setting up the execution IDs // synchronizeNodes ();
for (unsigned i = 0; i < the_runners.size (); i ++)
the_runners [i] -> setExecutionID ( my_node -> execution_id_run[ i ] );
// synchronizing - all the nodes have to finish initializing
// the context before actually executing the runners
synchronizeNodes ();
for (unsigned i = 0; i < the_runners.size (); i ++) for (unsigned i = 0; i < the_runners.size (); i ++)
if (the_runners [i] -> isAssignedLocally ()) the_runners [i] -> notifyContextInitialized (); if (the_runners [i] -> isAssignedLocally ()) the_runners [i] -> notifyContextInitialized ();
@ -217,6 +186,7 @@ void unpackTerminationOfRunner () {
printDebugMessage ("All the runners have terminated - now stopping the reactive threads."); printDebugMessage ("All the runners have terminated - now stopping the reactive threads.");
stopReactiveThreads (); stopReactiveThreads ();
printDebugMessage ("Reactive threads stopped!");
} }
} }

View file

@ -42,6 +42,7 @@ static std :: vector <Thread *> threads;
unsigned num_act = 0; unsigned num_act = 0;
Thread :: Thread () { Thread :: Thread () {
threads.push_back (this); threads.push_back (this);
@ -53,8 +54,6 @@ Thread :: ~ Thread () {
/* Nothing ! */ /* Nothing ! */
} }
extern int getNodeRank ();
void Thread :: setActive () { void Thread :: setActive () {
if (! act) { if (! act) {

View file

@ -45,4 +45,3 @@ void Topology :: add (Cooperative & __mig) {
mig.push_back (& __mig) ; mig.push_back (& __mig) ;
} }

View file

@ -112,8 +112,8 @@ template < class EOT > class peoEA : public Runner
//! @param eoPop< EOT >& __pop - initial population of the algorithm, to be iteratively evolved; //! @param eoPop< EOT >& __pop - initial population of the algorithm, to be iteratively evolved;
void operator()( eoPop< EOT >& __pop ); void operator()( eoPop< EOT >& __pop );
private:
private:
eoContinue< EOT >& cont; eoContinue< EOT >& cont;
peoPopEval< EOT >& pop_eval; peoPopEval< EOT >& pop_eval;
@ -150,7 +150,7 @@ template< class EOT > void peoEA< EOT > :: operator ()( eoPop< EOT >& __pop )
template< class EOT > void peoEA< EOT > :: run() template< class EOT > void peoEA< EOT > :: run()
{ {
printDebugMessage( "performing the first evaluation of the population." ); printDebugMessage( "peoEA: performing the first evaluation of the population." );
pop_eval( *pop ); pop_eval( *pop );
do do
@ -158,22 +158,21 @@ template< class EOT > void peoEA< EOT > :: run()
eoPop< EOT > off; eoPop< EOT > off;
printDebugMessage( "performing the selection step." ); printDebugMessage( "peoEA: performing the selection step." );
select( *pop, off ); select( *pop, off );
trans( off ); trans( off );
printDebugMessage( "performing the evaluation of the population." ); printDebugMessage( "peoEA: performing the evaluation of the population." );
pop_eval( off ); pop_eval( off );
printDebugMessage( "performing the replacement of the population." ); printDebugMessage( "peoEA: performing the replacement of the population." );
replace( *pop, off ); replace( *pop, off );
printDebugMessage( "deciding of the continuation." ); printDebugMessage( "peoEA: deciding of the continuation." );
} }
while ( cont( *pop ) ); while ( cont( *pop ) );
} }
#endif #endif

View file

@ -167,7 +167,7 @@ template< class EOT > void peoParaSGATransform< EOT > :: unpackResult()
template< class EOT > void peoParaSGATransform< EOT > :: operator()( eoPop < EOT >& __pop ) template< class EOT > void peoParaSGATransform< EOT > :: operator()( eoPop < EOT >& __pop )
{ {
printDebugMessage( "performing the parallel transformation step." ); printDebugMessage( "peoParaSGATransform: performing the parallel transformation step." );
pop = &__pop; pop = &__pop;
idx = 0; idx = 0;
num_term = 0; num_term = 0;

View file

@ -53,5 +53,4 @@ template< class EOT > class peoPopEval : public Service
virtual void operator()( eoPop< EOT >& __pop ) = 0; virtual void operator()( eoPop< EOT >& __pop ) = 0;
}; };
#endif #endif

View file

@ -268,7 +268,7 @@ template< class EOT > void peoSyncIslandMig< EOT > :: emigrate()
em.push( mig ); em.push( mig );
coop_em.push( out[ i ] ); coop_em.push( out[ i ] );
send( out[ i ] ); send( out[ i ] );
printDebugMessage( "sending some emigrants." ); printDebugMessage( "peoSyncIslandMig: sending some emigrants." );
} }
} }
@ -282,7 +282,7 @@ template< class EOT > void peoSyncIslandMig< EOT > :: immigrate()
assert( imm.size() ); assert( imm.size() );
replace( destination, imm.front() ) ; replace( destination, imm.front() ) ;
imm.pop(); imm.pop();
printDebugMessage( "receiving some immigrants." ); printDebugMessage( "peoSyncIslandMig: receiving some immigrants." );
} }
unlock(); unlock();
} }
@ -317,7 +317,7 @@ template< class EOT > void peoSyncIslandMig< EOT > :: notifySending()
if ( imm.empty() ) if ( imm.empty() )
{ {
printDebugMessage( "entering pasive mode\n" ); printDebugMessage( "peoSyncIslandMig: entering pasive mode\n" );
getOwner()->setPassive(); getOwner()->setPassive();
} }
} }

View file

@ -51,6 +51,7 @@ static sem_t sem_comm_init;
static Communicator * the_thread; static Communicator * the_thread;
Communicator :: Communicator (int * __argc, char * * * __argv) { Communicator :: Communicator (int * __argc, char * * * __argv) {
the_thread = this; the_thread = this;
@ -69,23 +70,28 @@ void Communicator :: start () {
sendMessages (); sendMessages ();
if (theEnd() || ! atLeastOneActiveRunner ()) if (! atLeastOneActiveRunner () && ! atLeastOneActiveThread() && allResourcesFree ())
break; break;
receiveMessages (); receiveMessages ();
} }
waitBuffers (); waitBuffers ();
sem_destroy(& sem_comm_init);
printDebugMessage ("finalizing"); printDebugMessage ("finalizing");
synchronizeNodes (); //synchronizeNodes ();
} }
void initCommunication () { void initCommunication () {
static bool initializedSemaphore = false;
if (initializedSemaphore) {
sem_destroy(& sem_comm_init);
}
sem_init (& sem_comm_init, 0, 0); sem_init (& sem_comm_init, 0, 0);
initializedSemaphore = true;
} }
void waitNodeInitialization () { void waitNodeInitialization () {
@ -97,6 +103,3 @@ void wakeUpCommunicator () {
the_thread -> wakeUp (); the_thread -> wakeUp ();
} }

View file

@ -51,12 +51,20 @@ static std :: vector <char *> act_buf; /* Active buffers */
static std :: vector <MPI_Request *> act_req; /* Active requests */ static std :: vector <MPI_Request *> act_req; /* Active requests */
void initBuffers () {
pos_buf = 0;
act_buf.clear ();
act_req.clear ();
}
void cleanBuffers () { void cleanBuffers () {
for (unsigned i = 0; i < act_req.size ();) { for (unsigned i = 0; i < act_req.size ();) {
MPI_Status stat ; MPI_Status stat ;
int flag ; int flag ;
MPI_Test (act_req [i], & flag, & stat) ; MPI_Test (act_req [i], & flag, & stat) ;
if (flag) { if (flag) {
@ -267,4 +275,3 @@ void unpack (char * __str) {
MPI_Unpack (mpi_buf, MPI_BUF_SIZE, & pos_buf, & len, 1, MPI_INT, MPI_COMM_WORLD); MPI_Unpack (mpi_buf, MPI_BUF_SIZE, & pos_buf, & len, 1, MPI_INT, MPI_COMM_WORLD);
MPI_Unpack (mpi_buf, MPI_BUF_SIZE, & pos_buf, __str, len, MPI_CHAR, MPI_COMM_WORLD); MPI_Unpack (mpi_buf, MPI_BUF_SIZE, & pos_buf, __str, len, MPI_CHAR, MPI_COMM_WORLD);
} }

View file

@ -47,6 +47,8 @@ extern void sendMessageToAll (int __tag);
extern void receiveMessage (int __from, int __tag); extern void receiveMessage (int __from, int __tag);
extern void initBuffers ();
extern void cleanBuffers (); extern void cleanBuffers ();
extern void waitBuffers (); extern void waitBuffers ();

View file

@ -40,6 +40,8 @@
#include <string> #include <string>
#include <cassert> #include <cassert>
#include "mess.h"
class MPIThreadedEnv { class MPIThreadedEnv {
@ -63,7 +65,7 @@ public:
private: private:
/* No instance of this class can be created outside its domain */ /* No instance of this class can be created outside its domain! */
MPIThreadedEnv ( int * __argc, char * * * __argv ) { MPIThreadedEnv ( int * __argc, char * * * __argv ) {
static bool MPIThreadedEnvInitialized = false; static bool MPIThreadedEnvInitialized = false;
@ -72,6 +74,7 @@ private:
if (! MPIThreadedEnvInitialized) { if (! MPIThreadedEnvInitialized) {
MPI_Init_thread (__argc, __argv, MPI_THREAD_FUNNELED, & provided); MPI_Init_thread (__argc, __argv, MPI_THREAD_FUNNELED, & provided);
assert (provided == MPI_THREAD_FUNNELED); /* The MPI implementation must be multi-threaded. assert (provided == MPI_THREAD_FUNNELED); /* The MPI implementation must be multi-threaded.
Yet, only one thread performs the comm. Yet, only one thread performs the comm.
operations */ operations */
@ -92,6 +95,7 @@ static std :: map <std :: string, int> name_to_rk;
static std :: vector <std :: string> rk_to_name; static std :: vector <std :: string> rk_to_name;
int getNodeRank () { int getNodeRank () {
return rk; return rk;
@ -102,6 +106,11 @@ int getNumberOfNodes () {
return sz; return sz;
} }
void collectiveCountOfRunners ( unsigned int* num_local_exec_runners, unsigned int* num_exec_runners ) {
MPI_Allreduce( num_local_exec_runners, num_exec_runners, 1, MPI_UNSIGNED, MPI_SUM, MPI_COMM_WORLD );
}
int getRankFromName (const std :: string & __name) { int getRankFromName (const std :: string & __name) {
return atoi (__name.c_str ()); return atoi (__name.c_str ());
@ -112,9 +121,8 @@ void initNode (int * __argc, char * * * __argv) {
rk_to_name.clear (); rk_to_name.clear ();
name_to_rk.clear (); name_to_rk.clear ();
MPIThreadedEnv :: init ( __argc, __argv ); MPIThreadedEnv :: init ( __argc, __argv );
//synchronizeNodes();
MPI_Comm_rank (MPI_COMM_WORLD, & rk); /* Who ? */ MPI_Comm_rank (MPI_COMM_WORLD, & rk); /* Who ? */
MPI_Comm_size (MPI_COMM_WORLD, & sz); /* How many ? */ MPI_Comm_size (MPI_COMM_WORLD, & sz); /* How many ? */

View file

@ -44,6 +44,8 @@ extern int getNodeRank (); /* It gives the rank of the calling process */
extern int getNumberOfNodes (); /* It gives the size of the environment (Total number of nodes) */ extern int getNumberOfNodes (); /* It gives the size of the environment (Total number of nodes) */
extern void collectiveCountOfRunners ( unsigned int* num_local_exec_runners, unsigned int* num_exec_runners );
extern int getRankFromName (const std :: string & __name); /* It gives the rank of the process extern int getRankFromName (const std :: string & __name); /* It gives the rank of the process
expressed by its name */ expressed by its name */

View file

@ -131,6 +131,4 @@ void receiveMessages () {
} }
} while ( ! atLeastOneActiveThread () && atLeastOneActiveRunner () /*&& ! allResourcesFree ()*/ ); } while ( ! atLeastOneActiveThread () && atLeastOneActiveRunner () /*&& ! allResourcesFree ()*/ );
cleanBuffers ();
} }

View file

@ -45,6 +45,7 @@ static std :: vector <pthread_t *> ll_threads; /* Low level threads */
static std :: vector <Worker *> worker_threads; /* Worker threads */ static std :: vector <Worker *> worker_threads; /* Worker threads */
static Communicator* communicator_thread = NULL; /* Communicator thread */ static Communicator* communicator_thread = NULL; /* Communicator thread */
void runRMC () { void runRMC () {
/* Worker(s) ? */ /* Worker(s) ? */

View file

@ -47,6 +47,8 @@ static std :: queue <SCHED_REQUEST> requests; /* Requests */
static unsigned initNumberOfRes = 0; static unsigned initNumberOfRes = 0;
extern void wakeUpCommunicator();
void initScheduler () { void initScheduler () {
resources = std :: queue <SCHED_RESOURCE> (); resources = std :: queue <SCHED_RESOURCE> ();
@ -65,10 +67,13 @@ void initScheduler () {
} }
bool allResourcesFree () { bool allResourcesFree () {
return resources.size () == initNumberOfRes; return resources.size () == initNumberOfRes;
} }
unsigned numResourcesFree () {
return resources.size ();
}
static void update () { static void update () {
unsigned num_alloc = std :: min (resources.size (), requests.size ()); unsigned num_alloc = std :: min (resources.size (), requests.size ());
@ -107,4 +112,5 @@ void unpackTaskDone () {
if (resources.size () == initNumberOfRes) if (resources.size () == initNumberOfRes)
printDebugMessage ("all the resources are now free."); printDebugMessage ("all the resources are now free.");
update (); update ();
wakeUpCommunicator();
} }

View file

@ -57,4 +57,6 @@ extern void unpackTaskDone ();
extern bool allResourcesFree (); extern bool allResourcesFree ();
extern unsigned numResourcesFree ();
#endif #endif

View file

@ -49,6 +49,7 @@
#define TO_ALL -1 #define TO_ALL -1
typedef struct { typedef struct {
Communicable * comm; Communicable * comm;
@ -57,27 +58,27 @@ typedef struct {
} SEND_REQUEST; } SEND_REQUEST;
static std :: queue <SEND_REQUEST> mess; static std :: queue <SEND_REQUEST> mess;
static sem_t sem_send; static sem_t sem_send;
static bool contextInitialized = false; static bool contextInitialized = false;
void initSending () { void initSending () {
static bool initializedSem = false; static bool initializedSemaphore = false;
mess = std :: queue <SEND_REQUEST> (); mess = std :: queue <SEND_REQUEST> ();
if (! initializedSem) { if (initializedSemaphore) {
sem_init (& sem_send, 0, 1);
initializedSem = true;
}
else {
sem_destroy(& sem_send); sem_destroy(& sem_send);
sem_init (& sem_send, 0, 1);
} }
sem_init (& sem_send, 0, 1);
initializedSemaphore = true;
contextInitialized = false; contextInitialized = false;
} }
@ -103,13 +104,13 @@ extern void initializeContext ();
void sendMessages () { void sendMessages () {
sem_wait (& sem_send);
if (! contextInitialized) { if (! contextInitialized) {
contextInitialized = true; contextInitialized = true;
initializeContext(); initializeContext();
} }
sem_wait (& sem_send);
while (! mess.empty ()) { while (! mess.empty ()) {
SEND_REQUEST req = mess.front (); SEND_REQUEST req = mess.front ();

View file

@ -48,6 +48,4 @@
#define TASK_RESULT_TAG 19 #define TASK_RESULT_TAG 19
#define TASK_DONE_TAG 20 #define TASK_DONE_TAG 20
#define EXECUTION_CONTEXT_TAG 1000
#endif #endif

View file

@ -46,6 +46,8 @@
static std :: vector <Worker *> key_to_worker (1); /* Vector of registered workers */ static std :: vector <Worker *> key_to_worker (1); /* Vector of registered workers */
extern void wakeUpCommunicator ();
Worker * getWorker (WORKER_ID __key) { Worker * getWorker (WORKER_ID __key) {
return key_to_worker [__key]; return key_to_worker [__key];
@ -54,8 +56,11 @@ Worker * getWorker (WORKER_ID __key) {
Worker :: Worker () { Worker :: Worker () {
recvAndCompleted = false; recvAndCompleted = false;
taskAssigned = 0;
id = key_to_worker.size (); id = key_to_worker.size ();
key_to_worker.push_back (this); key_to_worker.push_back (this);
sem_init( &sem_task_done, 0, 0 );
} }
void Worker :: packResult () { void Worker :: packResult () {
@ -66,6 +71,7 @@ void Worker :: packResult () {
void Worker :: unpackData () { void Worker :: unpackData () {
taskAssigned ++;
printDebugMessage ("unpacking the ID. of the service."); printDebugMessage ("unpacking the ID. of the service.");
unpack (serv_id); unpack (serv_id);
serv = getService (serv_id); serv = getService (serv_id);
@ -90,6 +96,7 @@ void Worker :: notifySendingResult () {
void Worker :: notifySendingTaskDone () { void Worker :: notifySendingTaskDone () {
sem_post(&sem_task_done);
setPassive (); setPassive ();
} }
@ -104,20 +111,26 @@ void Worker :: start () {
sleep (); sleep ();
if (! atLeastOneActiveRunner ()) if (! atLeastOneActiveRunner () && ! taskAssigned)
break; break;
if (recvAndCompleted) { if (recvAndCompleted) {
send (this, my_node -> rk_sched, TASK_DONE_TAG); send (this, my_node -> rk_sched, TASK_DONE_TAG);
recvAndCompleted = false; recvAndCompleted = false;
sem_wait(&sem_task_done);
taskAssigned --;
} }
else { else {
printDebugMessage ("executing the task.");
serv -> execute (); serv -> execute ();
send (this, src, TASK_RESULT_TAG); send (this, src, TASK_RESULT_TAG);
} }
} }
printDebugMessage ("Worker finished execution.");
setPassive ();
wakeUpCommunicator();
} }
void initWorkersEnv () { void initWorkersEnv () {

View file

@ -72,6 +72,10 @@ private :
int src; int src;
bool recvAndCompleted; bool recvAndCompleted;
unsigned taskAssigned;
sem_t sem_task_done;
sem_t sem_task_asgn;
}; };
extern void initWorkersEnv (); extern void initWorkersEnv ();

View file

@ -99,4 +99,3 @@ std :: string getNextNode () {
return str; return str;
} }