git-svn-id: svn://scm.gforge.inria.fr/svnroot/paradiseo@794 331e1502-861f-0410-8da2-ba01fb791d7f

This commit is contained in:
atantar 2007-11-20 13:40:15 +00:00
commit aecc5ba8fc
35 changed files with 498 additions and 262 deletions

View file

@ -93,5 +93,9 @@ void Communicable :: resume () {
sem_post (& sem_stop); sem_post (& sem_stop);
} }
void initCommunicableEnv () {
key_to_comm.resize (1);
comm_to_key.clear ();
Communicable :: num_comm = 0;
}

View file

@ -39,6 +39,7 @@
#include <semaphore.h> #include <semaphore.h>
typedef unsigned COMM_ID; typedef unsigned COMM_ID;
class Communicable { class Communicable {
@ -56,7 +57,11 @@ public :
void stop (); /* It suspends the current process */ void stop (); /* It suspends the current process */
void resume (); /* It resumes ___________ */ void resume (); /* It resumes ___________ */
public :
static unsigned num_comm;
protected : protected :
COMM_ID key; COMM_ID key;
@ -64,10 +69,10 @@ protected :
sem_t sem_lock; sem_t sem_lock;
sem_t sem_stop; sem_t sem_stop;
static unsigned num_comm;
}; };
extern void initCommunicableEnv ();
extern Communicable * getCommunicable (COMM_ID __key); extern Communicable * getCommunicable (COMM_ID __key);
//extern COMM_ID getKey (const Communicable * __comm); //extern COMM_ID getKey (const Communicable * __comm);

View file

@ -41,6 +41,7 @@
#include "messaging.h" #include "messaging.h"
template <class EOT> void pack (const eoPop <EOT> & __pop) { template <class EOT> void pack (const eoPop <EOT> & __pop) {
pack ((unsigned) __pop.size ()); pack ((unsigned) __pop.size ());
@ -57,4 +58,5 @@ template <class EOT> void unpack (eoPop <EOT> & __pop) {
for (unsigned i = 0; i < n; i ++) for (unsigned i = 0; i < n; i ++)
unpack (__pop [i]); unpack (__pop [i]);
} }
#endif #endif

View file

@ -41,6 +41,7 @@
#include "messaging.h" #include "messaging.h"
template <class F, class T> void pack (const eoVector <F, T> & __v) { template <class F, class T> void pack (const eoVector <F, T> & __v) {
pack (__v.fitness ()) ; pack (__v.fitness ()) ;

View file

@ -83,7 +83,6 @@ template <class U, class V> void pack (const std :: pair <U, V> & __pair) {
} }
//
/* Char */ /* Char */
extern void unpack (char & __c); extern void unpack (char & __c);

View file

@ -78,24 +78,28 @@ void endDebugging () {
for (unsigned i = 0; i < files.size (); i ++) for (unsigned i = 0; i < files.size (); i ++)
if (files [i] != stdout) if (files [i] != stdout)
fclose (files [i]); fclose (files [i]);
files.clear();
} }
void printDebugMessage (const char * __mess) { void printDebugMessage (const char * __mess) {
return;
if (debug) { if (debug) {
char buff [MAX_BUFF_SIZE]; char buff [MAX_BUFF_SIZE];
char localTime [MAX_BUFF_SIZE];
time_t t = time (0); time_t t = time (0);
/* Date */ /* Date */
sprintf (buff, "[%s][%s: ", host, ctime (& t)); strcpy( localTime, ctime (& t) );
* strchr (buff, '\n') = ']'; localTime[ strlen( localTime )-1 ] = ']';
sprintf (buff, "[%s][%s: ", host, localTime );
for (unsigned i = 0; i < files.size (); i ++) for (unsigned i = 0; i < files.size (); i ++)
fprintf (files [i], buff); fprintf (files [i], buff);
/* Message */ /* Message */
sprintf (buff, "%s", __mess); sprintf (buff, "%s", __mess);
for (unsigned i = 0; i < files.size (); i ++) { for (unsigned i = 0; i < files.size (); i ++) {
fputs (buff, files [i]); fputs (buff, files [i]);
fputs ("\n", files [i]); fputs ("\n", files [i]);

View file

@ -39,7 +39,6 @@
#include "runner.h" #include "runner.h"
#include "rmc.h" #include "rmc.h"
void peo :: finalize () { void peo :: finalize () {
printDebugMessage ("waiting for the termination of all threads"); printDebugMessage ("waiting for the termination of all threads");

View file

@ -38,7 +38,7 @@
#define __peo_finalize_h #define __peo_finalize_h
namespace peo { namespace peo {
extern void finalize (); extern void finalize ();
} }

View file

@ -41,6 +41,27 @@
#include "peo_debug.h" #include "peo_debug.h"
#include "rmc.h" #include "rmc.h"
extern void initCommunicableEnv ();
extern void initThreadsEnv ();
extern void initReactiveThreadsEnv ();
extern void initRunnersEnv ();
extern void initWorkersEnv ();
static void initExecutionEnv() {
initCommunicableEnv ();
initThreadsEnv ();
initReactiveThreadsEnv ();
initRunnersEnv ();
initWorkersEnv ();
}
namespace peo { namespace peo {
int * argc; int * argc;
@ -52,13 +73,16 @@ namespace peo {
argc = & __argc; argc = & __argc;
argv = & __argv; argv = & __argv;
/* Initializing the execution environment */
initExecutionEnv();
/* Initializing the the Resource Management and Communication */ /* Initializing the the Resource Management and Communication */
initRMC (__argc, __argv); initRMC (__argc, __argv);
/* Loading the common parameters */ /* Loading the common parameters */
loadParameters (__argc, __argv); loadParameters (__argc, __argv);
/* */ /* */
initDebugging (); initDebugging ();
} }

View file

@ -40,7 +40,7 @@
#include "runner.h" #include "runner.h"
void peo :: run () { void peo :: run () {
startRunners (); startRunners ();
runRMC (); runRMC ();

View file

@ -56,9 +56,18 @@ void ReactiveThread :: wakeUp () {
sem_post (& sem); sem_post (& sem);
} }
void initReactiveThreadsEnv () {
the_end = false;
reac_threads.clear ();
}
void stopReactiveThreads () { void stopReactiveThreads () {
the_end = true; the_end = true;
for (unsigned i = 0; i < reac_threads.size (); i ++) for (unsigned i = 0; i < reac_threads.size (); i ++)
reac_threads [i] -> wakeUp (); reac_threads [i] -> wakeUp ();
reac_threads.clear ();
} }
bool theEnd () { return the_end; }

View file

@ -41,6 +41,7 @@
#include "thread.h" #include "thread.h"
class ReactiveThread : public Thread { class ReactiveThread : public Thread {
public: public:
@ -51,13 +52,16 @@ public:
void sleep (); void sleep ();
void wakeUp (); void wakeUp ();
private: private:
sem_t sem; sem_t sem;
}; };
extern void initReactiveThreadsEnv ();
extern void stopReactiveThreads (); extern void stopReactiveThreads ();
#endif /*THREAD_H_*/ extern bool theEnd ();
#endif /*REAC_THREAD_H_*/

View file

@ -44,6 +44,9 @@
#include "../rmc/mpi/mess.h" #include "../rmc/mpi/mess.h"
#include "../rmc/mpi/tags.h" #include "../rmc/mpi/tags.h"
#include "../rmc/mpi/node.h"
#include "../rmc/mpi/schema.h"
static std :: vector <pthread_t *> ll_threads; /* Low-level runner threads */ static std :: vector <pthread_t *> ll_threads; /* Low-level runner threads */
@ -63,8 +66,11 @@ extern int getNumberOfNodes ();
Runner :: Runner () { Runner :: Runner () {
exec_id = 0;
def_id = ++ num_def_runners; def_id = ++ num_def_runners;
the_runners.push_back (this); the_runners.push_back (this);
sem_init (& sem_start, 0, 0); sem_init (& sem_start, 0, 0);
sem_init (& sem_cntxt, 0, 0); sem_init (& sem_cntxt, 0, 0);
} }
@ -76,7 +82,12 @@ RUNNER_ID Runner :: getDefinitionID () {
RUNNER_ID Runner :: getExecutionID () { RUNNER_ID Runner :: getExecutionID () {
return def_id; return exec_id;
}
void Runner :: setExecutionID (const RUNNER_ID& execution_id) {
exec_id = execution_id;
} }
Runner * getRunner (RUNNER_ID __key) { Runner * getRunner (RUNNER_ID __key) {
@ -101,7 +112,6 @@ void unpackExecutionContext () {
void initializeContext () { void initializeContext () {
initMessage (); initMessage ();
packExecutionContext (); packExecutionContext ();
sendMessageToAll (EXECUTION_CONTEXT_TAG); sendMessageToAll (EXECUTION_CONTEXT_TAG);
@ -121,6 +131,13 @@ void initializeContext () {
cleanBuffers (); cleanBuffers ();
// setting up the execution IDs
for (unsigned i = 0; i < the_runners.size (); i ++)
the_runners [i] -> setExecutionID ( my_node -> execution_id_run[ i ] );
// synchronizing - all the nodes have to finish initializing
// the context before actually executing the runners
synchronizeNodes (); synchronizeNodes ();
for (unsigned i = 0; i < the_runners.size (); i ++) for (unsigned i = 0; i < the_runners.size (); i ++)
@ -163,6 +180,7 @@ void startRunners () {
void joinRunners () { void joinRunners () {
joinThreads (ll_threads); joinThreads (ll_threads);
the_runners.clear();
} }
bool atLeastOneActiveRunner () { bool atLeastOneActiveRunner () {
@ -201,3 +219,13 @@ void unpackTerminationOfRunner () {
stopReactiveThreads (); stopReactiveThreads ();
} }
} }
void initRunnersEnv () {
ll_threads.clear ();
the_runners.clear ();
num_def_runners = 0;
num_local_exec_runners = 0;
num_exec_runners = 0;
}

View file

@ -56,6 +56,8 @@ public :
RUNNER_ID getExecutionID (); RUNNER_ID getExecutionID ();
void setExecutionID (const RUNNER_ID& execution_id);
bool isAssignedLocally (); bool isAssignedLocally ();
void waitStarting (); void waitStarting ();
@ -80,9 +82,12 @@ private :
sem_t sem_cntxt; sem_t sem_cntxt;
unsigned def_id; unsigned def_id;
unsigned exec_id;
}; };
extern void initRunnersEnv ();
extern Runner * getRunner (RUNNER_ID __key); extern Runner * getRunner (RUNNER_ID __key);
extern void initializeContext (); extern void initializeContext ();
@ -97,5 +102,4 @@ extern unsigned numberOfActiveRunners ();
extern void unpackTerminationOfRunner (); extern void unpackTerminationOfRunner ();
#endif #endif

View file

@ -51,9 +51,8 @@ Service * getService (SERVICE_ID __key) {
return dynamic_cast <Service *> (getCommunicable (__key)); return dynamic_cast <Service *> (getCommunicable (__key));
} }
void Service :: notifySendingData () { void Service :: notifySendingData () { }
}
void Service :: notifySendingResourceRequest () { void Service :: notifySendingResourceRequest () {
num_sent_rr --; num_sent_rr --;
@ -61,26 +60,14 @@ void Service :: notifySendingResourceRequest () {
notifySendingAllResourceRequests (); notifySendingAllResourceRequests ();
} }
void Service :: notifySendingAllResourceRequests () { void Service :: notifySendingAllResourceRequests () { }
} void Service :: packData () {}
void Service :: packData () { void Service :: unpackData () {}
} void Service :: execute () {}
void Service :: unpackData () { void Service :: packResult () {}
} void Service :: unpackResult () {}
void Service :: execute () {
}
void Service :: packResult () {
}
void Service :: unpackResult () {
}

View file

@ -40,6 +40,7 @@
#include "communicable.h" #include "communicable.h"
#include "thread.h" #include "thread.h"
typedef unsigned SERVICE_ID; typedef unsigned SERVICE_ID;
class Service : public Communicable { class Service : public Communicable {

View file

@ -73,6 +73,12 @@ void Thread :: setPassive () {
} }
} }
void initThreadsEnv () {
threads.clear ();
num_act = 0;
}
bool atLeastOneActiveThread () { bool atLeastOneActiveThread () {
return num_act; return num_act;
@ -95,6 +101,9 @@ void addThread (Thread * __hl_thread, std :: vector <pthread_t *> & __ll_threads
void joinThreads (std :: vector <pthread_t *> & __threads) { void joinThreads (std :: vector <pthread_t *> & __threads) {
for (unsigned i = 0; i < __threads.size (); i ++) for (unsigned i = 0; i < __threads.size (); i ++) {
pthread_join (* __threads [i], 0); pthread_join (* __threads [i], 0);
delete __threads [i];
}
__threads.clear();
} }

View file

@ -39,8 +39,8 @@
#include <vector> #include <vector>
/* A high-level thread */
/* A high-level thread */
class Thread { class Thread {
public: public:
@ -63,6 +63,8 @@ private :
bool act; bool act;
}; };
extern void initThreadsEnv ();
extern void addThread (Thread * __hl_thread, std :: vector <pthread_t *> & __ll_threads); extern void addThread (Thread * __hl_thread, std :: vector <pthread_t *> & __ll_threads);
extern void joinThreads (std :: vector <pthread_t *> & __ll_threads); extern void joinThreads (std :: vector <pthread_t *> & __ll_threads);

View file

@ -45,81 +45,117 @@
class peoParallelAlgorithmWrapper : public Runner class peoParallelAlgorithmWrapper : public Runner
{ {
public: public:
template< typename AlgorithmType > peoParallelAlgorithmWrapper( AlgorithmType& externalAlgorithm ) template< typename AlgorithmType > peoParallelAlgorithmWrapper( AlgorithmType& externalAlgorithm )
: algorithm( new Algorithm< AlgorithmType, void >( externalAlgorithm ) ) : algorithm( new Algorithm< AlgorithmType, void >( externalAlgorithm ) )
{} {}
template< typename AlgorithmType, typename AlgorithmDataType > peoParallelAlgorithmWrapper( AlgorithmType& externalAlgorithm, AlgorithmDataType& externalData ) template< typename AlgorithmType, typename AlgorithmDataType > peoParallelAlgorithmWrapper( AlgorithmType& externalAlgorithm, AlgorithmDataType& externalData )
: algorithm( new Algorithm< AlgorithmType, AlgorithmDataType >( externalAlgorithm, externalData ) ) : algorithm( new Algorithm< AlgorithmType, AlgorithmDataType >( externalAlgorithm, externalData ) )
{} {}
~peoParallelAlgorithmWrapper() template< typename AlgorithmReturnType > peoParallelAlgorithmWrapper( AlgorithmReturnType& (*externalAlgorithm)() )
: algorithm( new FunctionAlgorithm< AlgorithmReturnType, void >( externalAlgorithm ) )
{}
template< typename AlgorithmReturnType, typename AlgorithmDataType > peoParallelAlgorithmWrapper( AlgorithmReturnType& (*externalAlgorithm)( AlgorithmDataType& ), AlgorithmDataType& externalData )
: algorithm( new FunctionAlgorithm< AlgorithmReturnType, AlgorithmDataType >( externalAlgorithm, externalData ) )
{}
~peoParallelAlgorithmWrapper()
{ {
delete algorithm; delete algorithm;
} }
void run() void run()
{
algorithm->operator()();
}
private:
struct AbstractAlgorithm
{
// virtual destructor as we will be using inheritance and polymorphism
virtual ~AbstractAlgorithm()
{ }
// operator to be called for executing the algorithm
virtual void operator()()
{ }
};
template< typename AlgorithmType, typename AlgorithmDataType > struct Algorithm : public AbstractAlgorithm
{
Algorithm( AlgorithmType& externalAlgorithm, AlgorithmDataType& externalData )
: algorithm( externalAlgorithm ), algorithmData( externalData )
{}
virtual void operator()()
{ {
algorithm->operator()(); algorithm( algorithmData );
} }
AlgorithmType& algorithm;
private: AlgorithmDataType& algorithmData;
};
struct AbstractAlgorithm
{
// virtual destructor as we will be using inheritance and polymorphism
virtual ~AbstractAlgorithm()
{ }
// operator to be called for executing the algorithm
virtual void operator()()
{ }
};
template< typename AlgorithmType, typename AlgorithmDataType > struct Algorithm : public AbstractAlgorithm
{
Algorithm( AlgorithmType& externalAlgorithm, AlgorithmDataType& externalData )
: algorithm( externalAlgorithm ), algorithmData( externalData )
{}
virtual void operator()()
{
algorithm( algorithmData );
}
AlgorithmType& algorithm;
AlgorithmDataType& algorithmData;
};
template< typename AlgorithmType > struct Algorithm< AlgorithmType, void > : public AbstractAlgorithm template< typename AlgorithmType > struct Algorithm< AlgorithmType, void > : public AbstractAlgorithm
{ {
Algorithm( AlgorithmType& externalAlgorithm ) : algorithm( externalAlgorithm ) Algorithm( AlgorithmType& externalAlgorithm ) : algorithm( externalAlgorithm )
{} {}
virtual void operator()() virtual void operator()()
{ {
algorithm(); algorithm();
} }
AlgorithmType& algorithm; AlgorithmType& algorithm;
};
private:
AbstractAlgorithm* algorithm;
}; };
template< typename AlgorithmReturnType, typename AlgorithmDataType > struct FunctionAlgorithm : public AbstractAlgorithm
{
FunctionAlgorithm( AlgorithmReturnType (*externalAlgorithm)( AlgorithmDataType& ), AlgorithmDataType& externalData )
: algorithm( externalAlgorithm ), algorithmData( externalData )
{}
virtual void operator()()
{
algorithm( algorithmData );
}
AlgorithmReturnType (*algorithm)( AlgorithmDataType& );
AlgorithmDataType& algorithmData;
};
template< typename AlgorithmReturnType > struct FunctionAlgorithm< AlgorithmReturnType, void > : public AbstractAlgorithm
{
FunctionAlgorithm( AlgorithmReturnType (*externalAlgorithm)() )
: algorithm( externalAlgorithm )
{}
virtual void operator()()
{
algorithm();
}
AlgorithmReturnType (*algorithm)();
};
private:
AbstractAlgorithm* algorithm;
};
#endif #endif

View file

@ -230,8 +230,8 @@ template< class EOT > void peoSyncIslandMig< EOT > :: pack()
lock (); lock ();
pack( coop_em.front()->getKey() ); ::pack( coop_em.front()->getKey() );
pack( em.front() ); ::pack( em.front() );
coop_em.pop(); coop_em.pop();
em.pop(); em.pop();
@ -245,7 +245,7 @@ template< class EOT > void peoSyncIslandMig< EOT > :: unpack()
lock (); lock ();
eoPop< EOT > mig; eoPop< EOT > mig;
unpack( mig ); ::unpack( mig );
imm.push( mig ); imm.push( mig );
unlock(); unlock();

View file

@ -43,192 +43,227 @@
template < typename EntityType > class peoSynchronousMultiStart : public Service template < typename EntityType > class peoSynchronousMultiStart : public Service
{
public:
template < typename AlgorithmType > peoSynchronousMultiStart( AlgorithmType& externalAlgorithm )
{ {
public: singularAlgorithm = new Algorithm< AlgorithmType >( externalAlgorithm );
algorithms.push_back( singularAlgorithm );
template < typename AlgorithmType > peoSynchronousMultiStart( AlgorithmType& externalAlgorithm ) aggregationFunction = new NoAggregationFunction();
{ }
singularAlgorithm = new Algorithm< AlgorithmType >( externalAlgorithm ); template < typename AlgorithmReturnType, typename AlgorithmDataType > peoSynchronousMultiStart( AlgorithmReturnType (*externalAlgorithm)( AlgorithmDataType& ) )
algorithms.push_back( singularAlgorithm ); {
aggregationFunction = new NoAggregationFunction(); singularAlgorithm = new FunctionAlgorithm< AlgorithmReturnType, AlgorithmDataType >( externalAlgorithm );
} algorithms.push_back( singularAlgorithm );
template < typename AlgorithmType, typename AggregationFunctionType > peoSynchronousMultiStart( std::vector< AlgorithmType* >& externalAlgorithms, AggregationFunctionType& externalAggregationFunction ) aggregationFunction = new NoAggregationFunction();
{ }
for ( unsigned int index = 0; index < externalAlgorithms; index++ ) template < typename AlgorithmType, typename AggregationFunctionType > peoSynchronousMultiStart( std::vector< AlgorithmType* >& externalAlgorithms, AggregationFunctionType& externalAggregationFunction )
{ {
algorithms.push_back( new Algorithm< AlgorithmType >( *externalAlgorithms[ index ] ) ); for ( unsigned int index = 0; index < externalAlgorithms.size(); index++ )
}
aggregationFunction = new Algorithm< AggregationFunctionType >( externalAggregationFunction );
}
~peoSynchronousMultiStart()
{
for ( unsigned int index = 0; index < data.size(); index++ ) delete data[ index ];
for ( unsigned int index = 0; index < algorithms.size(); index++ ) delete algorithms[ index ];
delete aggregationFunction;
}
template < typename Type > void operator()( Type& externalData )
{
for ( typename Type::iterator externalDataIterator = externalData.begin(); externalDataIterator != externalData.end(); externalDataIterator++ )
{
data.push_back( new DataType< EntityType >( *externalDataIterator ) );
}
functionIndex = dataIndex = idx = num_term = 0;
requestResourceRequest( data.size() * algorithms.size() );
stop();
}
template < typename Type > void operator()( const Type& externalDataBegin, const Type& externalDataEnd )
{
for ( Type externalDataIterator = externalDataBegin; externalDataIterator != externalDataEnd; externalDataIterator++ )
{
data.push_back( new DataType< EntityType >( *externalDataIterator ) );
}
functionIndex = dataIndex = idx = num_term = 0;
requestResourceRequest( data.size() * algorithms.size() );
stop();
}
void packData();
void unpackData();
void execute();
void packResult();
void unpackResult();
void notifySendingData();
void notifySendingAllResourceRequests();
private:
template < typename Type > struct DataType;
struct AbstractDataType
{ {
virtual ~AbstractDataType() algorithms.push_back( new Algorithm< AlgorithmType >( *externalAlgorithms[ index ] ) );
{ } }
template < typename Type > operator Type& () aggregationFunction = new AggregationAlgorithm< AggregationFunctionType >( externalAggregationFunction );
{ }
return ( dynamic_cast< DataType< Type >& >( *this ) ).data; template < typename AlgorithmReturnType, typename AlgorithmDataType, typename AggregationFunctionType >
} peoSynchronousMultiStart( std::vector< AlgorithmReturnType (*)( AlgorithmDataType& ) >& externalAlgorithms,
}; AggregationFunctionType& externalAggregationFunction )
{
for ( unsigned int index = 0; index < externalAlgorithms.size(); index++ )
{
algorithms.push_back( new FunctionAlgorithm< AlgorithmReturnType, AlgorithmDataType >( externalAlgorithms[ index ] ) );
}
aggregationFunction = new AggregationAlgorithm< AggregationFunctionType >( externalAggregationFunction );
}
~peoSynchronousMultiStart()
{
for ( unsigned int index = 0; index < data.size(); index++ ) delete data[ index ];
for ( unsigned int index = 0; index < algorithms.size(); index++ ) delete algorithms[ index ];
delete aggregationFunction;
}
template < typename Type > void operator()( Type& externalData )
{
for ( typename Type::iterator externalDataIterator = externalData.begin(); externalDataIterator != externalData.end(); externalDataIterator++ )
{
data.push_back( new DataType< EntityType >( *externalDataIterator ) );
}
functionIndex = dataIndex = idx = num_term = 0;
requestResourceRequest( data.size() * algorithms.size() );
stop();
}
template < typename Type > void operator()( const Type& externalDataBegin, const Type& externalDataEnd )
{
for ( Type externalDataIterator = externalDataBegin; externalDataIterator != externalDataEnd; externalDataIterator++ )
{
data.push_back( new DataType< EntityType >( *externalDataIterator ) );
}
functionIndex = dataIndex = idx = num_term = 0;
requestResourceRequest( data.size() * algorithms.size() );
stop();
}
void packData();
void unpackData();
void execute();
void packResult();
void unpackResult();
void notifySendingData();
void notifySendingAllResourceRequests();
private:
template < typename Type > struct DataType;
struct AbstractDataType
{
virtual ~AbstractDataType()
{ }
template < typename Type > operator Type& ()
{
return ( dynamic_cast< DataType< Type >& >( *this ) ).data;
}
};
template < typename Type > struct DataType : public AbstractDataType template < typename Type > struct DataType : public AbstractDataType
{ {
DataType( Type& externalData ) : data( externalData ) DataType( Type& externalData ) : data( externalData )
{ } { }
Type& data; Type& data;
}; };
struct AbstractAlgorithm struct AbstractAlgorithm
{ {
virtual ~AbstractAlgorithm() virtual ~AbstractAlgorithm()
{ } { }
virtual void operator()( AbstractDataType& dataTypeInstance ) virtual void operator()( AbstractDataType& dataTypeInstance )
{} {}
}; };
template < typename AlgorithmType > struct Algorithm : public AbstractAlgorithm template < typename AlgorithmType > struct Algorithm : public AbstractAlgorithm
{ {
Algorithm( AlgorithmType& externalAlgorithm ) : algorithm( externalAlgorithm ) Algorithm( AlgorithmType& externalAlgorithm ) : algorithm( externalAlgorithm )
{ } { }
void operator()( AbstractDataType& dataTypeInstance ) void operator()( AbstractDataType& dataTypeInstance )
{ {
algorithm( dataTypeInstance ); algorithm( dataTypeInstance );
} }
AlgorithmType& algorithm; AlgorithmType& algorithm;
}; };
template < typename AlgorithmReturnType, typename AlgorithmDataType > struct FunctionAlgorithm : public AbstractAlgorithm
{
struct AbstractAggregationAlgorithm FunctionAlgorithm( AlgorithmReturnType (*externalAlgorithm)( AlgorithmDataType& ) ) : algorithm( externalAlgorithm )
{ { }
virtual ~AbstractAggregationAlgorithm() void operator()( AbstractDataType& dataTypeInstance )
{ } {
algorithm( dataTypeInstance );
}
virtual void operator()( AbstractDataType& dataTypeInstanceA, AbstractDataType& dataTypeInstanceB ) AlgorithmReturnType (*algorithm)( AlgorithmDataType& );
{}; };
};
struct AbstractAggregationAlgorithm
{
virtual ~AbstractAggregationAlgorithm()
{ }
virtual void operator()( AbstractDataType& dataTypeInstanceA, AbstractDataType& dataTypeInstanceB )
{};
};
template < typename AggregationAlgorithmType > struct AggregationAlgorithm : public AbstractAggregationAlgorithm template < typename AggregationAlgorithmType > struct AggregationAlgorithm : public AbstractAggregationAlgorithm
{ {
AggregationAlgorithm( AggregationAlgorithmType& externalAggregationAlgorithm ) : aggregationAlgorithm( externalAggregationAlgorithm ) AggregationAlgorithm( AggregationAlgorithmType& externalAggregationAlgorithm ) : aggregationAlgorithm( externalAggregationAlgorithm )
{ } { }
void operator()( AbstractDataType& dataTypeInstanceA, AbstractDataType& dataTypeInstanceB ) void operator()( AbstractDataType& dataTypeInstanceA, AbstractDataType& dataTypeInstanceB )
{ {
aggregationAlgorithm( dataTypeInstanceA, dataTypeInstanceB ); aggregationAlgorithm( dataTypeInstanceA, dataTypeInstanceB );
} }
AggregationAlgorithmType& aggregationAlgorithm; AggregationAlgorithmType& aggregationAlgorithm;
}; };
struct NoAggregationFunction : public AbstractAggregationAlgorithm struct NoAggregationFunction : public AbstractAggregationAlgorithm
{ {
void operator()( AbstractDataType& dataTypeInstanceA, AbstractDataType& dataTypeInstanceB ) void operator()( AbstractDataType& dataTypeInstanceA, AbstractDataType& dataTypeInstanceB )
{ {
static_cast< EntityType& >( dataTypeInstanceA ) = static_cast< EntityType& >( dataTypeInstanceB ); static_cast< EntityType& >( dataTypeInstanceA ) = static_cast< EntityType& >( dataTypeInstanceB );
} }
};
AbstractAlgorithm* singularAlgorithm;
std::vector< AbstractAlgorithm* > algorithms;
AbstractAggregationAlgorithm* aggregationFunction;
EntityType entityTypeInstance;
std::vector< AbstractDataType* > data;
unsigned idx;
unsigned num_term;
unsigned dataIndex;
unsigned functionIndex;
}; };
AbstractAlgorithm* singularAlgorithm;
std::vector< AbstractAlgorithm* > algorithms;
AbstractAggregationAlgorithm* aggregationFunction;
EntityType entityTypeInstance;
std::vector< AbstractDataType* > data;
unsigned idx;
unsigned num_term;
unsigned dataIndex;
unsigned functionIndex;
};
template < typename EntityType > void peoSynchronousMultiStart< EntityType >::packData() template < typename EntityType > void peoSynchronousMultiStart< EntityType >::packData()
{ {
@ -239,11 +274,11 @@ template < typename EntityType > void peoSynchronousMultiStart< EntityType >::pa
// done with functionIndex for the entire data set - moving to another // done with functionIndex for the entire data set - moving to another
// function/algorithm starting all over with the entire data set ( idx is set to 0 ) // function/algorithm starting all over with the entire data set ( idx is set to 0 )
if ( idx == data.size() ) if ( idx == data.size() )
{ {
++functionIndex; ++functionIndex;
idx = 0; idx = 0;
} }
} }
template < typename EntityType > void peoSynchronousMultiStart< EntityType >::unpackData() template < typename EntityType > void peoSynchronousMultiStart< EntityType >::unpackData()
@ -287,11 +322,11 @@ template < typename EntityType > void peoSynchronousMultiStart< EntityType >::un
num_term++; num_term++;
if ( num_term == data.size() * algorithms.size() ) if ( num_term == data.size() * algorithms.size() )
{ {
getOwner()->setActive(); getOwner()->setActive();
resume(); resume();
} }
} }
template < typename EntityType > void peoSynchronousMultiStart< EntityType >::notifySendingData() template < typename EntityType > void peoSynchronousMultiStart< EntityType >::notifySendingData()

View file

@ -55,7 +55,8 @@ Communicator :: Communicator (int * __argc, char * * * __argv) {
the_thread = this; the_thread = this;
initNode (__argc, __argv); initNode (__argc, __argv);
loadRMCParameters (* __argc, * __argv); loadRMCParameters (* __argc, * __argv);
sem_post (& sem_comm_init); sem_post (& sem_comm_init);
} }
@ -68,15 +69,18 @@ void Communicator :: start () {
sendMessages (); sendMessages ();
if (! atLeastOneActiveRunner ()) if (theEnd() || ! atLeastOneActiveRunner ())
break; break;
receiveMessages (); receiveMessages ();
} }
waitBuffers (); waitBuffers ();
sem_destroy(& sem_comm_init);
printDebugMessage ("finalizing"); printDebugMessage ("finalizing");
MPI_Finalize ();
synchronizeNodes ();
} }
void initCommunication () { void initCommunication () {

View file

@ -54,18 +54,18 @@ static std :: vector <MPI_Request *> act_req; /* Active requests */
void cleanBuffers () { void cleanBuffers () {
for (unsigned i = 0; i < act_req.size ();) { for (unsigned i = 0; i < act_req.size ();) {
MPI_Status stat ; MPI_Status stat ;
int flag ; int flag ;
MPI_Test (act_req [i], & flag, & stat) ; MPI_Test (act_req [i], & flag, & stat) ;
if (flag) { if (flag) {
delete[] act_buf [i] ; delete[] act_buf [i] ;
delete act_req [i] ; delete act_req [i] ;
act_buf [i] = act_buf.back () ; act_buf [i] = act_buf.back () ;
act_buf.pop_back () ; act_buf.pop_back () ;
act_req [i] = act_req.back () ; act_req [i] = act_req.back () ;
act_req.pop_back () ; act_req.pop_back () ;
} }

View file

@ -58,4 +58,3 @@ extern void waitMessage ();
extern void synchronizeNodes (); extern void synchronizeNodes ();
#endif #endif

View file

@ -40,6 +40,52 @@
#include <string> #include <string>
#include <cassert> #include <cassert>
class MPIThreadedEnv {
public:
static void init ( int * __argc, char * * * __argv ) {
static MPIThreadedEnv mpiThreadedEnv( __argc, __argv );
}
static void finalize () {
static bool finalizedEnvironment = false;
if (! finalizedEnvironment ) {
MPI_Finalize ();
finalizedEnvironment = true;
}
}
private:
/* No instance of this class can be created outside its domain */
MPIThreadedEnv ( int * __argc, char * * * __argv ) {
static bool MPIThreadedEnvInitialized = false;
int provided = 1;
if (! MPIThreadedEnvInitialized) {
MPI_Init_thread (__argc, __argv, MPI_THREAD_FUNNELED, & provided);
assert (provided == MPI_THREAD_FUNNELED); /* The MPI implementation must be multi-threaded.
Yet, only one thread performs the comm.
operations */
MPIThreadedEnvInitialized = true;
}
}
~MPIThreadedEnv() {
finalize ();
}
};
static int rk, sz; /* Rank & size */ static int rk, sz; /* Rank & size */
static std :: map <std :: string, int> name_to_rk; static std :: map <std :: string, int> name_to_rk;
@ -57,17 +103,19 @@ int getNumberOfNodes () {
} }
int getRankFromName (const std :: string & __name) { int getRankFromName (const std :: string & __name) {
return atoi (__name.c_str ()); return atoi (__name.c_str ());
} }
void initNode (int * __argc, char * * * __argv) { void initNode (int * __argc, char * * * __argv) {
int provided; rk_to_name.clear ();
MPI_Init_thread (__argc, __argv, MPI_THREAD_FUNNELED, & provided); name_to_rk.clear ();
assert (provided == MPI_THREAD_FUNNELED); /* The MPI implementation must be multi-threaded.
Yet, only one thread performs the comm.
operations */ MPIThreadedEnv :: init ( __argc, __argv );
MPI_Comm_rank (MPI_COMM_WORLD, & rk); /* Who ? */ MPI_Comm_rank (MPI_COMM_WORLD, & rk); /* Who ? */
MPI_Comm_size (MPI_COMM_WORLD, & sz); /* How many ? */ MPI_Comm_size (MPI_COMM_WORLD, & sz); /* How many ? */
@ -77,10 +125,9 @@ void initNode (int * __argc, char * * * __argv) {
/* Processor names */ /* Processor names */
MPI_Get_processor_name (names [0], & len); /* Me */ MPI_Get_processor_name (names [0], & len); /* Me */
MPI_Allgather (names, MPI_MAX_PROCESSOR_NAME, MPI_CHAR, names, MPI_MAX_PROCESSOR_NAME, MPI_CHAR, MPI_COMM_WORLD); /* Broadcast */ MPI_Allgather (names, MPI_MAX_PROCESSOR_NAME, MPI_CHAR, names, MPI_MAX_PROCESSOR_NAME, MPI_CHAR, MPI_COMM_WORLD); /* Broadcast */
for (int i = 0; i < sz; i ++) { for (int i = 0; i < sz; i ++) {
rk_to_name.push_back (names [i]); rk_to_name.push_back (names [i]);
name_to_rk [names [i]] = i; name_to_rk [names [i]] = i;
} }
} }

View file

@ -42,12 +42,16 @@
#include "../../core/peo_debug.h" #include "../../core/peo_debug.h"
static std :: vector <pthread_t *> ll_threads; /* Low level threads */ static std :: vector <pthread_t *> ll_threads; /* Low level threads */
static std :: vector <Worker *> worker_threads; /* Worker threads */
static Communicator* communicator_thread = NULL; /* Communicator thread */
void runRMC () { void runRMC () {
/* Worker(s) ? */ /* Worker(s) ? */
for (unsigned i = 0; i < my_node -> num_workers; i ++) for (unsigned i = 0; i < my_node -> num_workers; i ++) {
addThread (new Worker, ll_threads); worker_threads.push_back (new Worker);
addThread (worker_threads.back(), ll_threads);
}
wakeUpCommunicator (); wakeUpCommunicator ();
} }
@ -56,20 +60,26 @@ void initRMC (int & __argc, char * * & __argv) {
/* Communication */ /* Communication */
initCommunication (); initCommunication ();
addThread (new Communicator (& __argc, & __argv), ll_threads); communicator_thread = new Communicator (& __argc, & __argv);
addThread (communicator_thread, ll_threads);
waitNodeInitialization (); waitNodeInitialization ();
initSending (); initSending ();
/* Scheduler */ /* Scheduler */
if (isScheduleNode ()) if (isScheduleNode ())
initScheduler (); initScheduler ();
///
} }
void finalizeRMC () { void finalizeRMC () {
printDebugMessage ("before join threads RMC"); printDebugMessage ("before join threads RMC");
joinThreads (ll_threads); joinThreads (ll_threads);
for (unsigned i = 0; i < worker_threads.size(); i++ ) {
delete worker_threads [i];
}
worker_threads.clear ();
delete communicator_thread;
printDebugMessage ("after join threads RMC"); printDebugMessage ("after join threads RMC");
} }

View file

@ -48,11 +48,15 @@ static std :: queue <SCHED_REQUEST> requests; /* Requests */
static unsigned initNumberOfRes = 0; static unsigned initNumberOfRes = 0;
void initScheduler () { void initScheduler () {
resources = std :: queue <SCHED_RESOURCE> ();
requests = std :: queue <SCHED_REQUEST> ();
initNumberOfRes = 0;
for (unsigned i = 0; i < the_schema.size (); i ++) { for (unsigned i = 0; i < the_schema.size (); i ++) {
const Node & node = the_schema [i]; const Node & node = the_schema [i];
if (node.rk_sched == my_node -> rk) if (node.rk_sched == my_node -> rk)
for (unsigned j = 0; j < node.num_workers; j ++) for (unsigned j = 0; j < node.num_workers; j ++)
resources.push (std :: pair <RANK_ID, WORKER_ID> (i, j + 1)); resources.push (std :: pair <RANK_ID, WORKER_ID> (i, j + 1));

View file

@ -34,9 +34,8 @@
* *
*/ */
#include <iostream>
#include <set> #include <set>
#include <assert.h> #include <cassert>
#include "schema.h" #include "schema.h"
#include "xml_parser.h" #include "xml_parser.h"
@ -131,9 +130,9 @@ void loadSchema (const char * __filename) {
name = getNextNode (); name = getNextNode ();
assert (name == "schema"); assert (name == "schema");
the_schema.clear();
maxSpecifiedRunnerID = 0; maxSpecifiedRunnerID = 0;
while (true) { while (true) {
/* TAG: <group> | </schema> */ /* TAG: <group> | </schema> */

View file

@ -43,6 +43,7 @@
#include "../../core/runner.h" #include "../../core/runner.h"
typedef int RANK_ID; typedef int RANK_ID;
struct Node { struct Node {

View file

@ -61,9 +61,24 @@ static std :: queue <SEND_REQUEST> mess;
static sem_t sem_send; static sem_t sem_send;
static bool contextInitialized = false;
void initSending () { void initSending () {
sem_init (& sem_send, 0, 1); static bool initializedSem = false;
mess = std :: queue <SEND_REQUEST> ();
if (! initializedSem) {
sem_init (& sem_send, 0, 1);
initializedSem = true;
}
else {
sem_destroy(& sem_send);
sem_init (& sem_send, 0, 1);
}
contextInitialized = false;
} }
void send (Communicable * __comm, int __to, int __tag) { void send (Communicable * __comm, int __to, int __tag) {
@ -90,17 +105,15 @@ void sendMessages () {
sem_wait (& sem_send); sem_wait (& sem_send);
static bool contextInitialized = false;
if (! contextInitialized) { if (! contextInitialized) {
contextInitialized = true; contextInitialized = true;
initializeContext(); initializeContext();
} }
while (! mess.empty ()) { while (! mess.empty ()) {
SEND_REQUEST req = mess.front (); SEND_REQUEST req = mess.front ();
Communicable * comm = req.comm; Communicable * comm = req.comm;
initMessage (); initMessage ();
@ -131,12 +144,12 @@ void sendMessages () {
dynamic_cast <Worker *> (comm) -> packTaskDone (); dynamic_cast <Worker *> (comm) -> packTaskDone ();
dynamic_cast <Worker *> (comm) -> notifySendingTaskDone (); dynamic_cast <Worker *> (comm) -> notifySendingTaskDone ();
break; break;
default : default :
break; break;
}; };
if (req.to == TO_ALL) if (req.to == TO_ALL)
sendMessageToAll (req.tag); sendMessageToAll (req.tag);
else else

View file

@ -53,6 +53,6 @@ void Service :: packResourceRequest () {
SCHED_REQUEST req; SCHED_REQUEST req;
req.first = getNodeRank (); req.first = getNodeRank ();
req.second = getKey (); req.second = getKey ();
// printf ("demande de ressource pour %d\n", req.second);
:: pack (req); :: pack (req);
} }

View file

@ -37,18 +37,17 @@
#ifndef __tags_h #ifndef __tags_h
#define __tags_h #define __tags_h
#define EXECUTION_CONTEXT_TAG 1000
#define RUNNER_STOP_TAG 13 #define RUNNER_STOP_TAG 13
#define COOP_TAG 14 #define COOP_TAG 14
#define SCHED_REQUEST_TAG 16 #define SCHED_REQUEST_TAG 16
#define SCHED_RESULT_TAG 17 #define SCHED_RESULT_TAG 17
#define TASK_DATA_TAG 18
#define TASK_DATA_TAG 18
#define TASK_RESULT_TAG 19 #define TASK_RESULT_TAG 19
#define TASK_DONE_TAG 20 #define TASK_DONE_TAG 20
#define EXECUTION_CONTEXT_TAG 1000
#endif #endif

View file

@ -53,7 +53,7 @@ Worker * getWorker (WORKER_ID __key) {
Worker :: Worker () { Worker :: Worker () {
toto = false; recvAndCompleted = false;
id = key_to_worker.size (); id = key_to_worker.size ();
key_to_worker.push_back (this); key_to_worker.push_back (this);
} }
@ -84,7 +84,7 @@ void Worker :: packTaskDone () {
void Worker :: notifySendingResult () { void Worker :: notifySendingResult () {
/* Notifying the scheduler of the termination */ /* Notifying the scheduler of the termination */
toto = true; recvAndCompleted = true;
wakeUp (); wakeUp ();
} }
@ -101,15 +101,15 @@ void Worker :: setSource (int __rank) {
void Worker :: start () { void Worker :: start () {
while (true) { while (true) {
sleep (); sleep ();
if (! atLeastOneActiveRunner ()) if (! atLeastOneActiveRunner ())
break; break;
if (toto) { if (recvAndCompleted) {
send (this, my_node -> rk_sched, TASK_DONE_TAG); send (this, my_node -> rk_sched, TASK_DONE_TAG);
toto = false; recvAndCompleted = false;
} }
else { else {
@ -119,3 +119,8 @@ void Worker :: start () {
} }
} }
} }
void initWorkersEnv () {
key_to_worker.resize (1);
}

View file

@ -41,6 +41,7 @@
#include "../../core/reac_thread.h" #include "../../core/reac_thread.h"
#include "../../core/service.h" #include "../../core/service.h"
typedef unsigned WORKER_ID; typedef unsigned WORKER_ID;
class Worker : public Communicable, public ReactiveThread { class Worker : public Communicable, public ReactiveThread {
@ -70,9 +71,11 @@ private :
Service * serv; Service * serv;
int src; int src;
bool toto; bool recvAndCompleted;
}; };
extern void initWorkersEnv ();
extern Worker * getWorker (WORKER_ID __key); extern Worker * getWorker (WORKER_ID __key);
#endif #endif

View file

@ -83,7 +83,6 @@ std :: string getNextNode () {
xmlTextReaderRead (reader); xmlTextReaderRead (reader);
name = xmlTextReaderName (reader); name = xmlTextReaderName (reader);
value = xmlTextReaderValue (reader); value = xmlTextReaderValue (reader);
// printf ("value = %s\n", value);
} while (! strcmp ((char *) name, "#text") && isSep (value)); } while (! strcmp ((char *) name, "#text") && isSep (value));
std :: string str; std :: string str;