git-svn-id: svn://scm.gforge.inria.fr/svnroot/paradiseo@790 331e1502-861f-0410-8da2-ba01fb791d7f

This commit is contained in:
atantar 2007-11-18 17:00:47 +00:00
commit adb6419766
60 changed files with 1017 additions and 1095 deletions

View file

@ -1,4 +1,4 @@
/*
/*
* <runner.cpp>
* Copyright (C) DOLPHIN Project-Team, INRIA Futurs, 2006-2007
* (C) OPAC Team, LIFL, 2002-2007
@ -41,104 +41,163 @@
#include "peo_debug.h"
#include "messaging.h"
static unsigned num_act = 0; /* Number of active runners */
#include "../rmc/mpi/mess.h"
#include "../rmc/mpi/tags.h"
static std :: vector <pthread_t *> ll_threads; /* Low-level runner threads */
static std :: vector <pthread_t *> ll_threads; /* Low-level runner threads */
static std :: vector <Runner *> the_runners;
static unsigned num_runners = 0;
static unsigned num_def_runners = 0; /* Number of defined runners */
Runner :: Runner ()
{
static unsigned num_local_exec_runners = 0; /* Number of locally executing runners */
static unsigned num_exec_runners = 0; /* Number of globally executing runners */
id = ++ num_runners;
the_runners.push_back (this);
sem_init (& sem_start, 0, 0);
num_act ++;
}
extern int getNodeRank ();
extern int getNumberOfNodes ();
void unpackTerminationOfRunner ()
{
RUNNER_ID id;
unpack (id);
num_act --;
printDebugMessage ("I'm noticed of the termination of a runner");
if (! num_act)
{
printDebugMessage ("all the runners have terminated. Now stopping the reactive threads.");
stopReactiveThreads ();
}
Runner :: Runner () {
def_id = ++ num_def_runners;
the_runners.push_back (this);
sem_init (& sem_start, 0, 0);
sem_init (& sem_cntxt, 0, 0);
}
bool atLeastOneActiveRunner ()
{
RUNNER_ID Runner :: getDefinitionID () {
return num_act;
return def_id;
}
RUNNER_ID Runner :: getID ()
{
RUNNER_ID Runner :: getExecutionID () {
return id;
return def_id;
}
void Runner :: start ()
{
setActive ();
sem_post (& sem_start);
run ();
terminate ();
}
void Runner :: notifySendingTermination ()
{
/*
char b [1000];
sprintf (b, "Il reste encore %d !!!!!!!!!!!!", n);
printDebugMessage (b);
*/
printDebugMessage ("je suis informe que tout le monde a recu ma terminaison");
setPassive ();
}
void Runner :: waitStarting ()
{
sem_wait (& sem_start);
}
Runner * getRunner (RUNNER_ID __key)
{
Runner * getRunner (RUNNER_ID __key) {
return dynamic_cast <Runner *> (getCommunicable (__key));
}
void startRunners ()
{
void packExecutionContext () {
num_local_exec_runners = 0;
for (unsigned i = 0; i < the_runners.size (); i ++)
if (the_runners [i] -> isAssignedLocally ()) num_local_exec_runners ++;
pack(num_local_exec_runners);
}
void unpackExecutionContext () {
unsigned num_remote_runners;
unpack(num_remote_runners);
num_exec_runners += num_remote_runners;
}
void initializeContext () {
initMessage ();
packExecutionContext ();
sendMessageToAll (EXECUTION_CONTEXT_TAG);
int src, tag;
for (unsigned i = 0; i < getNumberOfNodes(); i ++) {
cleanBuffers ();
waitMessage ();
probeMessage ( src, tag );
receiveMessage( src, tag );
initMessage ();
unpackExecutionContext ();
}
cleanBuffers ();
synchronizeNodes ();
for (unsigned i = 0; i < the_runners.size (); i ++)
if (the_runners [i] -> isAssignedLocally ()) the_runners [i] -> notifyContextInitialized ();
}
void Runner :: waitStarting () {
sem_wait (& sem_start);
}
void Runner :: waitContextInitialization () {
sem_wait (& sem_cntxt);
}
void Runner :: start () {
setActive ();
sem_post (& sem_start);
waitContextInitialization ();
run ();
terminate ();
}
void startRunners () {
/* Runners */
for (unsigned i = 0; i < the_runners.size (); i ++)
if (the_runners [i] -> isLocal ())
{
addThread (the_runners [i], ll_threads);
the_runners [i] -> waitStarting ();
}
if (the_runners [i] -> isAssignedLocally ()) {
addThread (the_runners [i], ll_threads);
the_runners [i] -> waitStarting ();
}
printDebugMessage ("launched the parallel runners");
}
void joinRunners ()
{
void joinRunners () {
joinThreads (ll_threads);
}
bool atLeastOneActiveRunner () {
return num_exec_runners;
}
unsigned numberOfActiveRunners () {
return num_exec_runners;
}
void Runner :: notifyContextInitialized () {
sem_post (& sem_cntxt);
}
void Runner :: notifySendingTermination () {
printDebugMessage ("I am informed that everyone received my termination notification.");
setPassive ();
}
void unpackTerminationOfRunner () {
RUNNER_ID finished_id;
unpack (finished_id);
num_exec_runners --;
printDebugMessage ("I'm noticed of the termination of a runner");
if (!num_exec_runners) {
printDebugMessage ("All the runners have terminated - now stopping the reactive threads.");
stopReactiveThreads ();
}
}