diff --git a/trunk/paradiseo-peo/src/core/communicable.cpp b/trunk/paradiseo-peo/src/core/communicable.cpp index 6ca4fa80f..3caf6ee77 100644 --- a/trunk/paradiseo-peo/src/core/communicable.cpp +++ b/trunk/paradiseo-peo/src/core/communicable.cpp @@ -93,5 +93,9 @@ void Communicable :: resume () { sem_post (& sem_stop); } +void initCommunicableEnv () { - + key_to_comm.resize (1); + comm_to_key.clear (); + Communicable :: num_comm = 0; +} diff --git a/trunk/paradiseo-peo/src/core/communicable.h b/trunk/paradiseo-peo/src/core/communicable.h index 11348707e..7ec179d1a 100644 --- a/trunk/paradiseo-peo/src/core/communicable.h +++ b/trunk/paradiseo-peo/src/core/communicable.h @@ -39,6 +39,7 @@ #include + typedef unsigned COMM_ID; class Communicable { @@ -56,7 +57,11 @@ public : void stop (); /* It suspends the current process */ void resume (); /* It resumes ___________ */ - + +public : + + static unsigned num_comm; + protected : COMM_ID key; @@ -64,10 +69,10 @@ protected : sem_t sem_lock; sem_t sem_stop; - - static unsigned num_comm; }; +extern void initCommunicableEnv (); + extern Communicable * getCommunicable (COMM_ID __key); //extern COMM_ID getKey (const Communicable * __comm); diff --git a/trunk/paradiseo-peo/src/core/eoPop_mesg.h b/trunk/paradiseo-peo/src/core/eoPop_mesg.h index c0b6cc009..948e17ae2 100644 --- a/trunk/paradiseo-peo/src/core/eoPop_mesg.h +++ b/trunk/paradiseo-peo/src/core/eoPop_mesg.h @@ -41,6 +41,7 @@ #include "messaging.h" + template void pack (const eoPop & __pop) { pack ((unsigned) __pop.size ()); @@ -57,4 +58,5 @@ template void unpack (eoPop & __pop) { for (unsigned i = 0; i < n; i ++) unpack (__pop [i]); } + #endif diff --git a/trunk/paradiseo-peo/src/core/eoVector_mesg.h b/trunk/paradiseo-peo/src/core/eoVector_mesg.h index c6de15d53..f5da8945e 100644 --- a/trunk/paradiseo-peo/src/core/eoVector_mesg.h +++ b/trunk/paradiseo-peo/src/core/eoVector_mesg.h @@ -41,6 +41,7 @@ #include "messaging.h" + template void pack (const eoVector & __v) { pack (__v.fitness ()) ; diff --git a/trunk/paradiseo-peo/src/core/messaging.h b/trunk/paradiseo-peo/src/core/messaging.h index 4b9d0b3d9..0a632fa05 100644 --- a/trunk/paradiseo-peo/src/core/messaging.h +++ b/trunk/paradiseo-peo/src/core/messaging.h @@ -83,7 +83,6 @@ template void pack (const std :: pair & __pair) { } -// /* Char */ extern void unpack (char & __c); diff --git a/trunk/paradiseo-peo/src/core/peo_debug.cpp b/trunk/paradiseo-peo/src/core/peo_debug.cpp index 973493aa7..bd0d346ee 100644 --- a/trunk/paradiseo-peo/src/core/peo_debug.cpp +++ b/trunk/paradiseo-peo/src/core/peo_debug.cpp @@ -78,24 +78,28 @@ void endDebugging () { for (unsigned i = 0; i < files.size (); i ++) if (files [i] != stdout) fclose (files [i]); + files.clear(); } void printDebugMessage (const char * __mess) { - return; + if (debug) { char buff [MAX_BUFF_SIZE]; + char localTime [MAX_BUFF_SIZE]; time_t t = time (0); /* Date */ - sprintf (buff, "[%s][%s: ", host, ctime (& t)); - * strchr (buff, '\n') = ']'; + strcpy( localTime, ctime (& t) ); + localTime[ strlen( localTime )-1 ] = ']'; + sprintf (buff, "[%s][%s: ", host, localTime ); + for (unsigned i = 0; i < files.size (); i ++) fprintf (files [i], buff); /* Message */ sprintf (buff, "%s", __mess); - + for (unsigned i = 0; i < files.size (); i ++) { fputs (buff, files [i]); fputs ("\n", files [i]); diff --git a/trunk/paradiseo-peo/src/core/peo_fin.cpp b/trunk/paradiseo-peo/src/core/peo_fin.cpp index d4e2e5a4f..c6b41cceb 100644 --- a/trunk/paradiseo-peo/src/core/peo_fin.cpp +++ b/trunk/paradiseo-peo/src/core/peo_fin.cpp @@ -39,7 +39,6 @@ #include "runner.h" #include "rmc.h" - void peo :: finalize () { printDebugMessage ("waiting for the termination of all threads"); diff --git a/trunk/paradiseo-peo/src/core/peo_fin.h b/trunk/paradiseo-peo/src/core/peo_fin.h index 9cdaf2bf8..cf23cbfdb 100644 --- a/trunk/paradiseo-peo/src/core/peo_fin.h +++ b/trunk/paradiseo-peo/src/core/peo_fin.h @@ -38,7 +38,7 @@ #define __peo_finalize_h namespace peo { - + extern void finalize (); } diff --git a/trunk/paradiseo-peo/src/core/peo_init.cpp b/trunk/paradiseo-peo/src/core/peo_init.cpp index 301090a34..f90f27476 100644 --- a/trunk/paradiseo-peo/src/core/peo_init.cpp +++ b/trunk/paradiseo-peo/src/core/peo_init.cpp @@ -41,6 +41,27 @@ #include "peo_debug.h" #include "rmc.h" + +extern void initCommunicableEnv (); + +extern void initThreadsEnv (); +extern void initReactiveThreadsEnv (); + +extern void initRunnersEnv (); +extern void initWorkersEnv (); + + +static void initExecutionEnv() { + + initCommunicableEnv (); + + initThreadsEnv (); + initReactiveThreadsEnv (); + + initRunnersEnv (); + initWorkersEnv (); +} + namespace peo { int * argc; @@ -52,13 +73,16 @@ namespace peo { argc = & __argc; argv = & __argv; - + + /* Initializing the execution environment */ + initExecutionEnv(); + /* Initializing the the Resource Management and Communication */ initRMC (__argc, __argv); /* Loading the common parameters */ loadParameters (__argc, __argv); - + /* */ initDebugging (); } diff --git a/trunk/paradiseo-peo/src/core/peo_run.cpp b/trunk/paradiseo-peo/src/core/peo_run.cpp index b379146af..ceb01da6f 100644 --- a/trunk/paradiseo-peo/src/core/peo_run.cpp +++ b/trunk/paradiseo-peo/src/core/peo_run.cpp @@ -40,7 +40,7 @@ #include "runner.h" void peo :: run () { - + startRunners (); runRMC (); diff --git a/trunk/paradiseo-peo/src/core/reac_thread.cpp b/trunk/paradiseo-peo/src/core/reac_thread.cpp index d771a8ad4..ca341e475 100644 --- a/trunk/paradiseo-peo/src/core/reac_thread.cpp +++ b/trunk/paradiseo-peo/src/core/reac_thread.cpp @@ -56,9 +56,18 @@ void ReactiveThread :: wakeUp () { sem_post (& sem); } +void initReactiveThreadsEnv () { + + the_end = false; + reac_threads.clear (); +} + void stopReactiveThreads () { the_end = true; for (unsigned i = 0; i < reac_threads.size (); i ++) - reac_threads [i] -> wakeUp (); + reac_threads [i] -> wakeUp (); + reac_threads.clear (); } + +bool theEnd () { return the_end; } diff --git a/trunk/paradiseo-peo/src/core/reac_thread.h b/trunk/paradiseo-peo/src/core/reac_thread.h index b3cd71912..91dad6221 100644 --- a/trunk/paradiseo-peo/src/core/reac_thread.h +++ b/trunk/paradiseo-peo/src/core/reac_thread.h @@ -41,6 +41,7 @@ #include "thread.h" + class ReactiveThread : public Thread { public: @@ -51,13 +52,16 @@ public: void sleep (); void wakeUp (); - + private: sem_t sem; - }; +extern void initReactiveThreadsEnv (); + extern void stopReactiveThreads (); -#endif /*THREAD_H_*/ +extern bool theEnd (); + +#endif /*REAC_THREAD_H_*/ diff --git a/trunk/paradiseo-peo/src/core/runner.cpp b/trunk/paradiseo-peo/src/core/runner.cpp index 827d20f85..199ae5a2a 100644 --- a/trunk/paradiseo-peo/src/core/runner.cpp +++ b/trunk/paradiseo-peo/src/core/runner.cpp @@ -44,6 +44,9 @@ #include "../rmc/mpi/mess.h" #include "../rmc/mpi/tags.h" +#include "../rmc/mpi/node.h" +#include "../rmc/mpi/schema.h" + static std :: vector ll_threads; /* Low-level runner threads */ @@ -63,8 +66,11 @@ extern int getNumberOfNodes (); Runner :: Runner () { + exec_id = 0; def_id = ++ num_def_runners; + the_runners.push_back (this); + sem_init (& sem_start, 0, 0); sem_init (& sem_cntxt, 0, 0); } @@ -76,7 +82,12 @@ RUNNER_ID Runner :: getDefinitionID () { RUNNER_ID Runner :: getExecutionID () { - return def_id; + return exec_id; +} + +void Runner :: setExecutionID (const RUNNER_ID& execution_id) { + + exec_id = execution_id; } Runner * getRunner (RUNNER_ID __key) { @@ -101,7 +112,6 @@ void unpackExecutionContext () { void initializeContext () { - initMessage (); packExecutionContext (); sendMessageToAll (EXECUTION_CONTEXT_TAG); @@ -121,6 +131,13 @@ void initializeContext () { cleanBuffers (); + // setting up the execution IDs + for (unsigned i = 0; i < the_runners.size (); i ++) + the_runners [i] -> setExecutionID ( my_node -> execution_id_run[ i ] ); + + + // synchronizing - all the nodes have to finish initializing + // the context before actually executing the runners synchronizeNodes (); for (unsigned i = 0; i < the_runners.size (); i ++) @@ -163,6 +180,7 @@ void startRunners () { void joinRunners () { joinThreads (ll_threads); + the_runners.clear(); } bool atLeastOneActiveRunner () { @@ -201,3 +219,13 @@ void unpackTerminationOfRunner () { stopReactiveThreads (); } } + +void initRunnersEnv () { + + ll_threads.clear (); + the_runners.clear (); + + num_def_runners = 0; + num_local_exec_runners = 0; + num_exec_runners = 0; +} diff --git a/trunk/paradiseo-peo/src/core/runner.h b/trunk/paradiseo-peo/src/core/runner.h index ba9ba56ac..fd6bf294e 100644 --- a/trunk/paradiseo-peo/src/core/runner.h +++ b/trunk/paradiseo-peo/src/core/runner.h @@ -56,6 +56,8 @@ public : RUNNER_ID getExecutionID (); + void setExecutionID (const RUNNER_ID& execution_id); + bool isAssignedLocally (); void waitStarting (); @@ -80,9 +82,12 @@ private : sem_t sem_cntxt; unsigned def_id; + unsigned exec_id; }; +extern void initRunnersEnv (); + extern Runner * getRunner (RUNNER_ID __key); extern void initializeContext (); @@ -97,5 +102,4 @@ extern unsigned numberOfActiveRunners (); extern void unpackTerminationOfRunner (); - #endif diff --git a/trunk/paradiseo-peo/src/core/service.cpp b/trunk/paradiseo-peo/src/core/service.cpp index c843aec10..8bf981af0 100644 --- a/trunk/paradiseo-peo/src/core/service.cpp +++ b/trunk/paradiseo-peo/src/core/service.cpp @@ -51,9 +51,8 @@ Service * getService (SERVICE_ID __key) { return dynamic_cast (getCommunicable (__key)); } -void Service :: notifySendingData () { +void Service :: notifySendingData () { } -} void Service :: notifySendingResourceRequest () { num_sent_rr --; @@ -61,26 +60,14 @@ void Service :: notifySendingResourceRequest () { notifySendingAllResourceRequests (); } -void Service :: notifySendingAllResourceRequests () { +void Service :: notifySendingAllResourceRequests () { } -} +void Service :: packData () {} -void Service :: packData () { +void Service :: unpackData () {} -} +void Service :: execute () {} -void Service :: unpackData () { +void Service :: packResult () {} -} - -void Service :: execute () { - -} - -void Service :: packResult () { - -} - -void Service :: unpackResult () { - -} +void Service :: unpackResult () {} diff --git a/trunk/paradiseo-peo/src/core/service.h b/trunk/paradiseo-peo/src/core/service.h index 67a0fef03..fda8daf78 100644 --- a/trunk/paradiseo-peo/src/core/service.h +++ b/trunk/paradiseo-peo/src/core/service.h @@ -40,6 +40,7 @@ #include "communicable.h" #include "thread.h" + typedef unsigned SERVICE_ID; class Service : public Communicable { diff --git a/trunk/paradiseo-peo/src/core/thread.cpp b/trunk/paradiseo-peo/src/core/thread.cpp index 8b5e07ac0..e1fa309c8 100644 --- a/trunk/paradiseo-peo/src/core/thread.cpp +++ b/trunk/paradiseo-peo/src/core/thread.cpp @@ -73,6 +73,12 @@ void Thread :: setPassive () { } } +void initThreadsEnv () { + + threads.clear (); + num_act = 0; +} + bool atLeastOneActiveThread () { return num_act; @@ -95,6 +101,9 @@ void addThread (Thread * __hl_thread, std :: vector & __ll_threads void joinThreads (std :: vector & __threads) { - for (unsigned i = 0; i < __threads.size (); i ++) - pthread_join (* __threads [i], 0); + for (unsigned i = 0; i < __threads.size (); i ++) { + pthread_join (* __threads [i], 0); + delete __threads [i]; + } + __threads.clear(); } diff --git a/trunk/paradiseo-peo/src/core/thread.h b/trunk/paradiseo-peo/src/core/thread.h index 16762db76..4f4925c3d 100644 --- a/trunk/paradiseo-peo/src/core/thread.h +++ b/trunk/paradiseo-peo/src/core/thread.h @@ -39,8 +39,8 @@ #include -/* A high-level thread */ +/* A high-level thread */ class Thread { public: @@ -63,6 +63,8 @@ private : bool act; }; +extern void initThreadsEnv (); + extern void addThread (Thread * __hl_thread, std :: vector & __ll_threads); extern void joinThreads (std :: vector & __ll_threads); diff --git a/trunk/paradiseo-peo/src/peoParallelAlgorithmWrapper.h b/trunk/paradiseo-peo/src/peoParallelAlgorithmWrapper.h index c7b430e6a..192bc282f 100644 --- a/trunk/paradiseo-peo/src/peoParallelAlgorithmWrapper.h +++ b/trunk/paradiseo-peo/src/peoParallelAlgorithmWrapper.h @@ -45,81 +45,117 @@ class peoParallelAlgorithmWrapper : public Runner - { +{ - public: + public: - template< typename AlgorithmType > peoParallelAlgorithmWrapper( AlgorithmType& externalAlgorithm ) - : algorithm( new Algorithm< AlgorithmType, void >( externalAlgorithm ) ) + template< typename AlgorithmType > peoParallelAlgorithmWrapper( AlgorithmType& externalAlgorithm ) + : algorithm( new Algorithm< AlgorithmType, void >( externalAlgorithm ) ) {} - template< typename AlgorithmType, typename AlgorithmDataType > peoParallelAlgorithmWrapper( AlgorithmType& externalAlgorithm, AlgorithmDataType& externalData ) - : algorithm( new Algorithm< AlgorithmType, AlgorithmDataType >( externalAlgorithm, externalData ) ) + template< typename AlgorithmType, typename AlgorithmDataType > peoParallelAlgorithmWrapper( AlgorithmType& externalAlgorithm, AlgorithmDataType& externalData ) + : algorithm( new Algorithm< AlgorithmType, AlgorithmDataType >( externalAlgorithm, externalData ) ) {} - ~peoParallelAlgorithmWrapper() + template< typename AlgorithmReturnType > peoParallelAlgorithmWrapper( AlgorithmReturnType& (*externalAlgorithm)() ) + : algorithm( new FunctionAlgorithm< AlgorithmReturnType, void >( externalAlgorithm ) ) + {} + + template< typename AlgorithmReturnType, typename AlgorithmDataType > peoParallelAlgorithmWrapper( AlgorithmReturnType& (*externalAlgorithm)( AlgorithmDataType& ), AlgorithmDataType& externalData ) + : algorithm( new FunctionAlgorithm< AlgorithmReturnType, AlgorithmDataType >( externalAlgorithm, externalData ) ) + {} + + ~peoParallelAlgorithmWrapper() { - + delete algorithm; } - void run() + void run() + { + algorithm->operator()(); + } + + + private: + + struct AbstractAlgorithm + { + + // virtual destructor as we will be using inheritance and polymorphism + virtual ~AbstractAlgorithm() + { } + + // operator to be called for executing the algorithm + virtual void operator()() + { } + }; + + template< typename AlgorithmType, typename AlgorithmDataType > struct Algorithm : public AbstractAlgorithm + { + + Algorithm( AlgorithmType& externalAlgorithm, AlgorithmDataType& externalData ) + : algorithm( externalAlgorithm ), algorithmData( externalData ) + {} + + virtual void operator()() { - algorithm->operator()(); + algorithm( algorithmData ); } - - private: - - struct AbstractAlgorithm - { - - // virtual destructor as we will be using inheritance and polymorphism - virtual ~AbstractAlgorithm() - { } - - // operator to be called for executing the algorithm - virtual void operator()() - { } - }; - - - template< typename AlgorithmType, typename AlgorithmDataType > struct Algorithm : public AbstractAlgorithm - { - - Algorithm( AlgorithmType& externalAlgorithm, AlgorithmDataType& externalData ) - : algorithm( externalAlgorithm ), algorithmData( externalData ) - {} - - virtual void operator()() - { - algorithm( algorithmData ); - } - - AlgorithmType& algorithm; - AlgorithmDataType& algorithmData; - }; - + AlgorithmType& algorithm; + AlgorithmDataType& algorithmData; + }; template< typename AlgorithmType > struct Algorithm< AlgorithmType, void > : public AbstractAlgorithm - { + { - Algorithm( AlgorithmType& externalAlgorithm ) : algorithm( externalAlgorithm ) - {} + Algorithm( AlgorithmType& externalAlgorithm ) : algorithm( externalAlgorithm ) + {} - virtual void operator()() - { - algorithm(); - } - - AlgorithmType& algorithm; - }; - - - private: - - AbstractAlgorithm* algorithm; + virtual void operator()() + { + algorithm(); + } + + AlgorithmType& algorithm; }; + template< typename AlgorithmReturnType, typename AlgorithmDataType > struct FunctionAlgorithm : public AbstractAlgorithm + { + + FunctionAlgorithm( AlgorithmReturnType (*externalAlgorithm)( AlgorithmDataType& ), AlgorithmDataType& externalData ) + : algorithm( externalAlgorithm ), algorithmData( externalData ) + {} + + virtual void operator()() + { + algorithm( algorithmData ); + } + + AlgorithmReturnType (*algorithm)( AlgorithmDataType& ); + AlgorithmDataType& algorithmData; + }; + + template< typename AlgorithmReturnType > struct FunctionAlgorithm< AlgorithmReturnType, void > : public AbstractAlgorithm + { + + FunctionAlgorithm( AlgorithmReturnType (*externalAlgorithm)() ) + : algorithm( externalAlgorithm ) + {} + + virtual void operator()() + { + algorithm(); + } + + AlgorithmReturnType (*algorithm)(); + }; + + private: + + AbstractAlgorithm* algorithm; +}; + #endif diff --git a/trunk/paradiseo-peo/src/peoSyncIslandMig.h b/trunk/paradiseo-peo/src/peoSyncIslandMig.h index 65d91ebf7..71abae0e3 100644 --- a/trunk/paradiseo-peo/src/peoSyncIslandMig.h +++ b/trunk/paradiseo-peo/src/peoSyncIslandMig.h @@ -230,8 +230,8 @@ template< class EOT > void peoSyncIslandMig< EOT > :: pack() lock (); - pack( coop_em.front()->getKey() ); - pack( em.front() ); + ::pack( coop_em.front()->getKey() ); + ::pack( em.front() ); coop_em.pop(); em.pop(); @@ -245,7 +245,7 @@ template< class EOT > void peoSyncIslandMig< EOT > :: unpack() lock (); eoPop< EOT > mig; - unpack( mig ); + ::unpack( mig ); imm.push( mig ); unlock(); diff --git a/trunk/paradiseo-peo/src/peoSynchronousMultiStart.h b/trunk/paradiseo-peo/src/peoSynchronousMultiStart.h index aa2a00b1d..1011dc0a5 100644 --- a/trunk/paradiseo-peo/src/peoSynchronousMultiStart.h +++ b/trunk/paradiseo-peo/src/peoSynchronousMultiStart.h @@ -43,192 +43,227 @@ template < typename EntityType > class peoSynchronousMultiStart : public Service +{ + + public: + + template < typename AlgorithmType > peoSynchronousMultiStart( AlgorithmType& externalAlgorithm ) { - public: + singularAlgorithm = new Algorithm< AlgorithmType >( externalAlgorithm ); + algorithms.push_back( singularAlgorithm ); - template < typename AlgorithmType > peoSynchronousMultiStart( AlgorithmType& externalAlgorithm ) - { + aggregationFunction = new NoAggregationFunction(); + } - singularAlgorithm = new Algorithm< AlgorithmType >( externalAlgorithm ); - algorithms.push_back( singularAlgorithm ); + template < typename AlgorithmReturnType, typename AlgorithmDataType > peoSynchronousMultiStart( AlgorithmReturnType (*externalAlgorithm)( AlgorithmDataType& ) ) + { - aggregationFunction = new NoAggregationFunction(); - } + singularAlgorithm = new FunctionAlgorithm< AlgorithmReturnType, AlgorithmDataType >( externalAlgorithm ); + algorithms.push_back( singularAlgorithm ); - template < typename AlgorithmType, typename AggregationFunctionType > peoSynchronousMultiStart( std::vector< AlgorithmType* >& externalAlgorithms, AggregationFunctionType& externalAggregationFunction ) - { + aggregationFunction = new NoAggregationFunction(); + } - for ( unsigned int index = 0; index < externalAlgorithms; index++ ) - { + template < typename AlgorithmType, typename AggregationFunctionType > peoSynchronousMultiStart( std::vector< AlgorithmType* >& externalAlgorithms, AggregationFunctionType& externalAggregationFunction ) + { - algorithms.push_back( new Algorithm< AlgorithmType >( *externalAlgorithms[ index ] ) ); - } - - aggregationFunction = new Algorithm< AggregationFunctionType >( externalAggregationFunction ); - } - - - ~peoSynchronousMultiStart() - { - - for ( unsigned int index = 0; index < data.size(); index++ ) delete data[ index ]; - for ( unsigned int index = 0; index < algorithms.size(); index++ ) delete algorithms[ index ]; - - delete aggregationFunction; - } - - - template < typename Type > void operator()( Type& externalData ) - { - - for ( typename Type::iterator externalDataIterator = externalData.begin(); externalDataIterator != externalData.end(); externalDataIterator++ ) - { - - data.push_back( new DataType< EntityType >( *externalDataIterator ) ); - } - - functionIndex = dataIndex = idx = num_term = 0; - requestResourceRequest( data.size() * algorithms.size() ); - stop(); - } - - - template < typename Type > void operator()( const Type& externalDataBegin, const Type& externalDataEnd ) - { - - for ( Type externalDataIterator = externalDataBegin; externalDataIterator != externalDataEnd; externalDataIterator++ ) - { - - data.push_back( new DataType< EntityType >( *externalDataIterator ) ); - } - - functionIndex = dataIndex = idx = num_term = 0; - requestResourceRequest( data.size() * algorithms.size() ); - stop(); - } - - - void packData(); - - void unpackData(); - - void execute(); - - void packResult(); - - void unpackResult(); - - void notifySendingData(); - - void notifySendingAllResourceRequests(); - - - private: - - template < typename Type > struct DataType; - - struct AbstractDataType + for ( unsigned int index = 0; index < externalAlgorithms.size(); index++ ) { - virtual ~AbstractDataType() - { } + algorithms.push_back( new Algorithm< AlgorithmType >( *externalAlgorithms[ index ] ) ); + } - template < typename Type > operator Type& () - { + aggregationFunction = new AggregationAlgorithm< AggregationFunctionType >( externalAggregationFunction ); + } - return ( dynamic_cast< DataType< Type >& >( *this ) ).data; - } - }; + template < typename AlgorithmReturnType, typename AlgorithmDataType, typename AggregationFunctionType > + peoSynchronousMultiStart( std::vector< AlgorithmReturnType (*)( AlgorithmDataType& ) >& externalAlgorithms, + AggregationFunctionType& externalAggregationFunction ) + { + + for ( unsigned int index = 0; index < externalAlgorithms.size(); index++ ) + { + + algorithms.push_back( new FunctionAlgorithm< AlgorithmReturnType, AlgorithmDataType >( externalAlgorithms[ index ] ) ); + } + + aggregationFunction = new AggregationAlgorithm< AggregationFunctionType >( externalAggregationFunction ); + } + + ~peoSynchronousMultiStart() + { + + for ( unsigned int index = 0; index < data.size(); index++ ) delete data[ index ]; + for ( unsigned int index = 0; index < algorithms.size(); index++ ) delete algorithms[ index ]; + + delete aggregationFunction; + } + + + template < typename Type > void operator()( Type& externalData ) + { + + for ( typename Type::iterator externalDataIterator = externalData.begin(); externalDataIterator != externalData.end(); externalDataIterator++ ) + { + + data.push_back( new DataType< EntityType >( *externalDataIterator ) ); + } + + functionIndex = dataIndex = idx = num_term = 0; + requestResourceRequest( data.size() * algorithms.size() ); + stop(); + } + + + template < typename Type > void operator()( const Type& externalDataBegin, const Type& externalDataEnd ) + { + + for ( Type externalDataIterator = externalDataBegin; externalDataIterator != externalDataEnd; externalDataIterator++ ) + { + + data.push_back( new DataType< EntityType >( *externalDataIterator ) ); + } + + functionIndex = dataIndex = idx = num_term = 0; + requestResourceRequest( data.size() * algorithms.size() ); + stop(); + } + + + void packData(); + + void unpackData(); + + void execute(); + + void packResult(); + + void unpackResult(); + + void notifySendingData(); + + void notifySendingAllResourceRequests(); + + + private: + + template < typename Type > struct DataType; + + struct AbstractDataType + { + + virtual ~AbstractDataType() + { } + + template < typename Type > operator Type& () + { + + return ( dynamic_cast< DataType< Type >& >( *this ) ).data; + } + }; template < typename Type > struct DataType : public AbstractDataType - { + { - DataType( Type& externalData ) : data( externalData ) - { } + DataType( Type& externalData ) : data( externalData ) + { } - Type& data; - }; + Type& data; + }; - struct AbstractAlgorithm - { + struct AbstractAlgorithm + { - virtual ~AbstractAlgorithm() - { } + virtual ~AbstractAlgorithm() + { } - virtual void operator()( AbstractDataType& dataTypeInstance ) - {} - }; + virtual void operator()( AbstractDataType& dataTypeInstance ) + {} + }; template < typename AlgorithmType > struct Algorithm : public AbstractAlgorithm - { + { - Algorithm( AlgorithmType& externalAlgorithm ) : algorithm( externalAlgorithm ) - { } + Algorithm( AlgorithmType& externalAlgorithm ) : algorithm( externalAlgorithm ) + { } - void operator()( AbstractDataType& dataTypeInstance ) - { - algorithm( dataTypeInstance ); - } + void operator()( AbstractDataType& dataTypeInstance ) + { + algorithm( dataTypeInstance ); + } - AlgorithmType& algorithm; - }; + AlgorithmType& algorithm; + }; + template < typename AlgorithmReturnType, typename AlgorithmDataType > struct FunctionAlgorithm : public AbstractAlgorithm + { - struct AbstractAggregationAlgorithm - { + FunctionAlgorithm( AlgorithmReturnType (*externalAlgorithm)( AlgorithmDataType& ) ) : algorithm( externalAlgorithm ) + { } - virtual ~AbstractAggregationAlgorithm() - { } + void operator()( AbstractDataType& dataTypeInstance ) + { + algorithm( dataTypeInstance ); + } - virtual void operator()( AbstractDataType& dataTypeInstanceA, AbstractDataType& dataTypeInstanceB ) - {}; - }; + AlgorithmReturnType (*algorithm)( AlgorithmDataType& ); + }; + + struct AbstractAggregationAlgorithm + { + + virtual ~AbstractAggregationAlgorithm() + { } + + virtual void operator()( AbstractDataType& dataTypeInstanceA, AbstractDataType& dataTypeInstanceB ) + {}; + }; template < typename AggregationAlgorithmType > struct AggregationAlgorithm : public AbstractAggregationAlgorithm - { + { - AggregationAlgorithm( AggregationAlgorithmType& externalAggregationAlgorithm ) : aggregationAlgorithm( externalAggregationAlgorithm ) - { } + AggregationAlgorithm( AggregationAlgorithmType& externalAggregationAlgorithm ) : aggregationAlgorithm( externalAggregationAlgorithm ) + { } - void operator()( AbstractDataType& dataTypeInstanceA, AbstractDataType& dataTypeInstanceB ) - { + void operator()( AbstractDataType& dataTypeInstanceA, AbstractDataType& dataTypeInstanceB ) + { - aggregationAlgorithm( dataTypeInstanceA, dataTypeInstanceB ); - } + aggregationAlgorithm( dataTypeInstanceA, dataTypeInstanceB ); + } - AggregationAlgorithmType& aggregationAlgorithm; - }; + AggregationAlgorithmType& aggregationAlgorithm; + }; struct NoAggregationFunction : public AbstractAggregationAlgorithm - { + { - void operator()( AbstractDataType& dataTypeInstanceA, AbstractDataType& dataTypeInstanceB ) - { + void operator()( AbstractDataType& dataTypeInstanceA, AbstractDataType& dataTypeInstanceB ) + { - static_cast< EntityType& >( dataTypeInstanceA ) = static_cast< EntityType& >( dataTypeInstanceB ); - } - }; - - - - AbstractAlgorithm* singularAlgorithm; - - std::vector< AbstractAlgorithm* > algorithms; - AbstractAggregationAlgorithm* aggregationFunction; - - - EntityType entityTypeInstance; - std::vector< AbstractDataType* > data; - - unsigned idx; - unsigned num_term; - unsigned dataIndex; - unsigned functionIndex; + static_cast< EntityType& >( dataTypeInstanceA ) = static_cast< EntityType& >( dataTypeInstanceB ); + } }; + + AbstractAlgorithm* singularAlgorithm; + + std::vector< AbstractAlgorithm* > algorithms; + AbstractAggregationAlgorithm* aggregationFunction; + + + EntityType entityTypeInstance; + std::vector< AbstractDataType* > data; + + unsigned idx; + unsigned num_term; + unsigned dataIndex; + unsigned functionIndex; +}; + + template < typename EntityType > void peoSynchronousMultiStart< EntityType >::packData() { @@ -239,11 +274,11 @@ template < typename EntityType > void peoSynchronousMultiStart< EntityType >::pa // done with functionIndex for the entire data set - moving to another // function/algorithm starting all over with the entire data set ( idx is set to 0 ) if ( idx == data.size() ) - { + { - ++functionIndex; - idx = 0; - } + ++functionIndex; + idx = 0; + } } template < typename EntityType > void peoSynchronousMultiStart< EntityType >::unpackData() @@ -287,11 +322,11 @@ template < typename EntityType > void peoSynchronousMultiStart< EntityType >::un num_term++; if ( num_term == data.size() * algorithms.size() ) - { + { - getOwner()->setActive(); - resume(); - } + getOwner()->setActive(); + resume(); + } } template < typename EntityType > void peoSynchronousMultiStart< EntityType >::notifySendingData() diff --git a/trunk/paradiseo-peo/src/rmc/mpi/comm.cpp b/trunk/paradiseo-peo/src/rmc/mpi/comm.cpp index 23ab6c539..3b26de542 100644 --- a/trunk/paradiseo-peo/src/rmc/mpi/comm.cpp +++ b/trunk/paradiseo-peo/src/rmc/mpi/comm.cpp @@ -55,7 +55,8 @@ Communicator :: Communicator (int * __argc, char * * * __argv) { the_thread = this; initNode (__argc, __argv); - loadRMCParameters (* __argc, * __argv); + loadRMCParameters (* __argc, * __argv); + sem_post (& sem_comm_init); } @@ -68,15 +69,18 @@ void Communicator :: start () { sendMessages (); - if (! atLeastOneActiveRunner ()) + if (theEnd() || ! atLeastOneActiveRunner ()) break; receiveMessages (); } - waitBuffers (); + waitBuffers (); + sem_destroy(& sem_comm_init); + printDebugMessage ("finalizing"); - MPI_Finalize (); + + synchronizeNodes (); } void initCommunication () { diff --git a/trunk/paradiseo-peo/src/rmc/mpi/mess.cpp b/trunk/paradiseo-peo/src/rmc/mpi/mess.cpp index 669eaae4d..a55cf4a29 100644 --- a/trunk/paradiseo-peo/src/rmc/mpi/mess.cpp +++ b/trunk/paradiseo-peo/src/rmc/mpi/mess.cpp @@ -54,18 +54,18 @@ static std :: vector act_req; /* Active requests */ void cleanBuffers () { for (unsigned i = 0; i < act_req.size ();) { - + MPI_Status stat ; int flag ; MPI_Test (act_req [i], & flag, & stat) ; if (flag) { - + delete[] act_buf [i] ; delete act_req [i] ; act_buf [i] = act_buf.back () ; act_buf.pop_back () ; - + act_req [i] = act_req.back () ; act_req.pop_back () ; } diff --git a/trunk/paradiseo-peo/src/rmc/mpi/mess.h b/trunk/paradiseo-peo/src/rmc/mpi/mess.h index 83ae3998d..0b8264b9b 100644 --- a/trunk/paradiseo-peo/src/rmc/mpi/mess.h +++ b/trunk/paradiseo-peo/src/rmc/mpi/mess.h @@ -58,4 +58,3 @@ extern void waitMessage (); extern void synchronizeNodes (); #endif - diff --git a/trunk/paradiseo-peo/src/rmc/mpi/node.cpp b/trunk/paradiseo-peo/src/rmc/mpi/node.cpp index dcd156781..b989d228d 100644 --- a/trunk/paradiseo-peo/src/rmc/mpi/node.cpp +++ b/trunk/paradiseo-peo/src/rmc/mpi/node.cpp @@ -40,6 +40,52 @@ #include #include + +class MPIThreadedEnv { + +public: + + static void init ( int * __argc, char * * * __argv ) { + + static MPIThreadedEnv mpiThreadedEnv( __argc, __argv ); + } + + static void finalize () { + + static bool finalizedEnvironment = false; + + if (! finalizedEnvironment ) { + + MPI_Finalize (); + finalizedEnvironment = true; + } + } + +private: + + /* No instance of this class can be created outside its domain */ + MPIThreadedEnv ( int * __argc, char * * * __argv ) { + + static bool MPIThreadedEnvInitialized = false; + int provided = 1; + + if (! MPIThreadedEnvInitialized) { + + MPI_Init_thread (__argc, __argv, MPI_THREAD_FUNNELED, & provided); + assert (provided == MPI_THREAD_FUNNELED); /* The MPI implementation must be multi-threaded. + Yet, only one thread performs the comm. + operations */ + MPIThreadedEnvInitialized = true; + } + } + + ~MPIThreadedEnv() { + + finalize (); + } +}; + + static int rk, sz; /* Rank & size */ static std :: map name_to_rk; @@ -57,17 +103,19 @@ int getNumberOfNodes () { } int getRankFromName (const std :: string & __name) { - + return atoi (__name.c_str ()); } void initNode (int * __argc, char * * * __argv) { - - int provided; - MPI_Init_thread (__argc, __argv, MPI_THREAD_FUNNELED, & provided); - assert (provided == MPI_THREAD_FUNNELED); /* The MPI implementation must be multi-threaded. - Yet, only one thread performs the comm. - operations */ + + rk_to_name.clear (); + name_to_rk.clear (); + + + MPIThreadedEnv :: init ( __argc, __argv ); + + MPI_Comm_rank (MPI_COMM_WORLD, & rk); /* Who ? */ MPI_Comm_size (MPI_COMM_WORLD, & sz); /* How many ? */ @@ -77,10 +125,9 @@ void initNode (int * __argc, char * * * __argv) { /* Processor names */ MPI_Get_processor_name (names [0], & len); /* Me */ MPI_Allgather (names, MPI_MAX_PROCESSOR_NAME, MPI_CHAR, names, MPI_MAX_PROCESSOR_NAME, MPI_CHAR, MPI_COMM_WORLD); /* Broadcast */ - + for (int i = 0; i < sz; i ++) { rk_to_name.push_back (names [i]); name_to_rk [names [i]] = i; } } - diff --git a/trunk/paradiseo-peo/src/rmc/mpi/rmc.cpp b/trunk/paradiseo-peo/src/rmc/mpi/rmc.cpp index 21473a057..9d846dd47 100644 --- a/trunk/paradiseo-peo/src/rmc/mpi/rmc.cpp +++ b/trunk/paradiseo-peo/src/rmc/mpi/rmc.cpp @@ -42,12 +42,16 @@ #include "../../core/peo_debug.h" static std :: vector ll_threads; /* Low level threads */ +static std :: vector worker_threads; /* Worker threads */ +static Communicator* communicator_thread = NULL; /* Communicator thread */ void runRMC () { /* Worker(s) ? */ - for (unsigned i = 0; i < my_node -> num_workers; i ++) - addThread (new Worker, ll_threads); + for (unsigned i = 0; i < my_node -> num_workers; i ++) { + worker_threads.push_back (new Worker); + addThread (worker_threads.back(), ll_threads); + } wakeUpCommunicator (); } @@ -56,20 +60,26 @@ void initRMC (int & __argc, char * * & __argv) { /* Communication */ initCommunication (); - addThread (new Communicator (& __argc, & __argv), ll_threads); + communicator_thread = new Communicator (& __argc, & __argv); + addThread (communicator_thread, ll_threads); waitNodeInitialization (); initSending (); /* Scheduler */ if (isScheduleNode ()) initScheduler (); - - /// } void finalizeRMC () { printDebugMessage ("before join threads RMC"); + joinThreads (ll_threads); + for (unsigned i = 0; i < worker_threads.size(); i++ ) { + delete worker_threads [i]; + } + worker_threads.clear (); + delete communicator_thread; + printDebugMessage ("after join threads RMC"); } diff --git a/trunk/paradiseo-peo/src/rmc/mpi/scheduler.cpp b/trunk/paradiseo-peo/src/rmc/mpi/scheduler.cpp index aef662050..6f7837b3e 100644 --- a/trunk/paradiseo-peo/src/rmc/mpi/scheduler.cpp +++ b/trunk/paradiseo-peo/src/rmc/mpi/scheduler.cpp @@ -48,11 +48,15 @@ static std :: queue requests; /* Requests */ static unsigned initNumberOfRes = 0; void initScheduler () { - + + resources = std :: queue (); + requests = std :: queue (); + initNumberOfRes = 0; + for (unsigned i = 0; i < the_schema.size (); i ++) { - + const Node & node = the_schema [i]; - + if (node.rk_sched == my_node -> rk) for (unsigned j = 0; j < node.num_workers; j ++) resources.push (std :: pair (i, j + 1)); diff --git a/trunk/paradiseo-peo/src/rmc/mpi/schema.cpp b/trunk/paradiseo-peo/src/rmc/mpi/schema.cpp index 3fc02840e..f7f98d47b 100644 --- a/trunk/paradiseo-peo/src/rmc/mpi/schema.cpp +++ b/trunk/paradiseo-peo/src/rmc/mpi/schema.cpp @@ -34,9 +34,8 @@ * */ -#include #include -#include +#include #include "schema.h" #include "xml_parser.h" @@ -131,9 +130,9 @@ void loadSchema (const char * __filename) { name = getNextNode (); assert (name == "schema"); + the_schema.clear(); maxSpecifiedRunnerID = 0; - while (true) { /* TAG: | */ diff --git a/trunk/paradiseo-peo/src/rmc/mpi/schema.h b/trunk/paradiseo-peo/src/rmc/mpi/schema.h index 57604e876..6feabadc7 100644 --- a/trunk/paradiseo-peo/src/rmc/mpi/schema.h +++ b/trunk/paradiseo-peo/src/rmc/mpi/schema.h @@ -43,6 +43,7 @@ #include "../../core/runner.h" + typedef int RANK_ID; struct Node { diff --git a/trunk/paradiseo-peo/src/rmc/mpi/send.cpp b/trunk/paradiseo-peo/src/rmc/mpi/send.cpp index 303953430..03bc31c19 100644 --- a/trunk/paradiseo-peo/src/rmc/mpi/send.cpp +++ b/trunk/paradiseo-peo/src/rmc/mpi/send.cpp @@ -61,9 +61,24 @@ static std :: queue mess; static sem_t sem_send; +static bool contextInitialized = false; + void initSending () { - sem_init (& sem_send, 0, 1); + static bool initializedSem = false; + + mess = std :: queue (); + + if (! initializedSem) { + sem_init (& sem_send, 0, 1); + initializedSem = true; + } + else { + sem_destroy(& sem_send); + sem_init (& sem_send, 0, 1); + } + + contextInitialized = false; } void send (Communicable * __comm, int __to, int __tag) { @@ -90,17 +105,15 @@ void sendMessages () { sem_wait (& sem_send); - static bool contextInitialized = false; - if (! contextInitialized) { contextInitialized = true; initializeContext(); } while (! mess.empty ()) { - + SEND_REQUEST req = mess.front (); - + Communicable * comm = req.comm; initMessage (); @@ -131,12 +144,12 @@ void sendMessages () { dynamic_cast (comm) -> packTaskDone (); dynamic_cast (comm) -> notifySendingTaskDone (); break; - + default : break; }; - + if (req.to == TO_ALL) sendMessageToAll (req.tag); else diff --git a/trunk/paradiseo-peo/src/rmc/mpi/service.cpp b/trunk/paradiseo-peo/src/rmc/mpi/service.cpp index f26f70b3d..df2556426 100644 --- a/trunk/paradiseo-peo/src/rmc/mpi/service.cpp +++ b/trunk/paradiseo-peo/src/rmc/mpi/service.cpp @@ -53,6 +53,6 @@ void Service :: packResourceRequest () { SCHED_REQUEST req; req.first = getNodeRank (); req.second = getKey (); - // printf ("demande de ressource pour %d\n", req.second); + :: pack (req); } diff --git a/trunk/paradiseo-peo/src/rmc/mpi/tags.h b/trunk/paradiseo-peo/src/rmc/mpi/tags.h index 4325e6a51..92d5ac1a2 100644 --- a/trunk/paradiseo-peo/src/rmc/mpi/tags.h +++ b/trunk/paradiseo-peo/src/rmc/mpi/tags.h @@ -37,18 +37,17 @@ #ifndef __tags_h #define __tags_h -#define EXECUTION_CONTEXT_TAG 1000 - #define RUNNER_STOP_TAG 13 #define COOP_TAG 14 #define SCHED_REQUEST_TAG 16 - #define SCHED_RESULT_TAG 17 -#define TASK_DATA_TAG 18 +#define TASK_DATA_TAG 18 #define TASK_RESULT_TAG 19 #define TASK_DONE_TAG 20 +#define EXECUTION_CONTEXT_TAG 1000 + #endif diff --git a/trunk/paradiseo-peo/src/rmc/mpi/worker.cpp b/trunk/paradiseo-peo/src/rmc/mpi/worker.cpp index 53ad72925..0dc242389 100644 --- a/trunk/paradiseo-peo/src/rmc/mpi/worker.cpp +++ b/trunk/paradiseo-peo/src/rmc/mpi/worker.cpp @@ -53,7 +53,7 @@ Worker * getWorker (WORKER_ID __key) { Worker :: Worker () { - toto = false; + recvAndCompleted = false; id = key_to_worker.size (); key_to_worker.push_back (this); } @@ -84,7 +84,7 @@ void Worker :: packTaskDone () { void Worker :: notifySendingResult () { /* Notifying the scheduler of the termination */ - toto = true; + recvAndCompleted = true; wakeUp (); } @@ -101,15 +101,15 @@ void Worker :: setSource (int __rank) { void Worker :: start () { while (true) { - + sleep (); if (! atLeastOneActiveRunner ()) break; - - if (toto) { + + if (recvAndCompleted) { send (this, my_node -> rk_sched, TASK_DONE_TAG); - toto = false; + recvAndCompleted = false; } else { @@ -119,3 +119,8 @@ void Worker :: start () { } } } + +void initWorkersEnv () { + + key_to_worker.resize (1); +} diff --git a/trunk/paradiseo-peo/src/rmc/mpi/worker.h b/trunk/paradiseo-peo/src/rmc/mpi/worker.h index 43c4ec62c..709a10584 100644 --- a/trunk/paradiseo-peo/src/rmc/mpi/worker.h +++ b/trunk/paradiseo-peo/src/rmc/mpi/worker.h @@ -41,6 +41,7 @@ #include "../../core/reac_thread.h" #include "../../core/service.h" + typedef unsigned WORKER_ID; class Worker : public Communicable, public ReactiveThread { @@ -70,9 +71,11 @@ private : Service * serv; int src; - bool toto; + bool recvAndCompleted; }; +extern void initWorkersEnv (); + extern Worker * getWorker (WORKER_ID __key); #endif diff --git a/trunk/paradiseo-peo/src/rmc/mpi/xml_parser.cpp b/trunk/paradiseo-peo/src/rmc/mpi/xml_parser.cpp index 1f044ba90..706468cba 100644 --- a/trunk/paradiseo-peo/src/rmc/mpi/xml_parser.cpp +++ b/trunk/paradiseo-peo/src/rmc/mpi/xml_parser.cpp @@ -83,7 +83,6 @@ std :: string getNextNode () { xmlTextReaderRead (reader); name = xmlTextReaderName (reader); value = xmlTextReaderValue (reader); - // printf ("value = %s\n", value); } while (! strcmp ((char *) name, "#text") && isSep (value)); std :: string str;