diff --git a/trunk/paradiseo-peo/src/core/cooperative.h b/trunk/paradiseo-peo/src/core/cooperative.h index efdcfebe4..65862d4b4 100644 --- a/trunk/paradiseo-peo/src/core/cooperative.h +++ b/trunk/paradiseo-peo/src/core/cooperative.h @@ -37,6 +37,7 @@ #ifndef __cooperative_h #define __cooperative_h +#include #include "communicable.h" #include "runner.h" @@ -54,10 +55,20 @@ public : virtual void unpack () = 0; - void send (Cooperative * __coop); + virtual void packSynchronizeReq () = 0; + + void send (Cooperative * __coop); + + void synchronizeCoopEx (); virtual void notifySending (); + virtual void notifyReceiving (); + + virtual void notifySendingSyncReq (); + + virtual void notifySynchronized (); + private : Runner * owner; diff --git a/trunk/paradiseo-peo/src/core/peo_init.cpp b/trunk/paradiseo-peo/src/core/peo_init.cpp index 31af40b19..52c35bcf5 100644 --- a/trunk/paradiseo-peo/src/core/peo_init.cpp +++ b/trunk/paradiseo-peo/src/core/peo_init.cpp @@ -54,6 +54,7 @@ extern void initRunnersEnv (); extern void initWorkersEnv (); extern void initScheduler (); +extern void initSynchron (); static void initExecutionEnv() { @@ -61,6 +62,7 @@ static void initExecutionEnv() { initCommunicableEnv (); initBuffers (); initScheduler(); + initSynchron (); initThreadsEnv (); initReactiveThreadsEnv (); diff --git a/trunk/paradiseo-peo/src/core/runner.cpp b/trunk/paradiseo-peo/src/core/runner.cpp index 624dd8578..4f421b32f 100644 --- a/trunk/paradiseo-peo/src/core/runner.cpp +++ b/trunk/paradiseo-peo/src/core/runner.cpp @@ -63,6 +63,8 @@ extern int getNodeRank (); extern int getNumberOfNodes (); +extern void wakeUpCommunicator (); + Runner :: Runner () { @@ -188,6 +190,8 @@ void unpackTerminationOfRunner () { stopReactiveThreads (); printDebugMessage ("Reactive threads stopped!"); } + + wakeUpCommunicator (); } void initRunnersEnv () { diff --git a/trunk/paradiseo-peo/src/core/topology.cpp b/trunk/paradiseo-peo/src/core/topology.cpp index 6e6ed0779..465210b93 100644 --- a/trunk/paradiseo-peo/src/core/topology.cpp +++ b/trunk/paradiseo-peo/src/core/topology.cpp @@ -45,3 +45,8 @@ void Topology :: add (Cooperative & __mig) { mig.push_back (& __mig) ; } + +Topology :: operator std :: vector & () { + + return mig; +} diff --git a/trunk/paradiseo-peo/src/core/topology.h b/trunk/paradiseo-peo/src/core/topology.h index 1addbf842..8c818561f 100644 --- a/trunk/paradiseo-peo/src/core/topology.h +++ b/trunk/paradiseo-peo/src/core/topology.h @@ -53,9 +53,11 @@ public: std :: vector & __from, std :: vector & __to) = 0; + operator std :: vector & (); + protected: - std :: vector mig ; + std :: vector mig; }; #endif diff --git a/trunk/paradiseo-peo/src/peoAsyncIslandMig.h b/trunk/paradiseo-peo/src/peoAsyncIslandMig.h index 266c32b0c..96a4db782 100644 --- a/trunk/paradiseo-peo/src/peoAsyncIslandMig.h +++ b/trunk/paradiseo-peo/src/peoAsyncIslandMig.h @@ -174,6 +174,8 @@ template< class EOT > class peoAsyncIslandMig : public Cooperative, public eoUpd void pack(); //! Auxiliary function dealing with receiving immigrant individuals. There is no need to explicitly call the function. void unpack(); + //! Auxiliary function dealing with the packing of synchronization requests - not the case. + void packSynchronizeReq(); private: @@ -243,6 +245,8 @@ template< class EOT > void peoAsyncIslandMig< EOT > :: unpack() unlock(); } +template< class EOT > void peoAsyncIslandMig< EOT > :: packSynchronizeReq() { +} template< class EOT > void peoAsyncIslandMig< EOT > :: emigrate() { diff --git a/trunk/paradiseo-peo/src/peoSyncIslandMig.h b/trunk/paradiseo-peo/src/peoSyncIslandMig.h index 6cce0c2a5..149c28bf8 100644 --- a/trunk/paradiseo-peo/src/peoSyncIslandMig.h +++ b/trunk/paradiseo-peo/src/peoSyncIslandMig.h @@ -59,6 +59,8 @@ #include "core/cooperative.h" #include "core/peo_debug.h" +#include "rmc/mpi/synchron.h" + //! Class providing the basis for the synchronous island migration model. @@ -176,10 +178,21 @@ public: void pack(); //! Auxiliary function dealing with receiving immigrant individuals. There is no need to explicitly call the function. void unpack(); + //! Auxiliary function dealing with the packing of synchronization requests. There is no need to explicitly call the function. + void packSynchronizeReq(); //! Auxiliary function dealing with migration notifications. There is no need to explicitly call the function. void notifySending(); + //! Auxiliary function dealing with migration notifications. There is no need to explicitly call the function. + void notifyReceiving(); + + //! Auxiliary function dealing with synchronizing runners for migrations. There is no need to explicitly call the function. + void notifySendingSyncReq(); + + //! Auxiliary function for notifying the synchronization of the runners involved in migration. + void notifySynchronized(); + private: @@ -205,6 +218,11 @@ private: std :: queue< Cooperative* > coop_em; sem_t sync; + + bool explicitPassive; + bool standbyMigration; + + std :: vector< Cooperative* > in, out, all; }; @@ -227,39 +245,28 @@ template< class EOT > peoSyncIslandMig< EOT > :: peoSyncIslandMig( template< class EOT > void peoSyncIslandMig< EOT > :: pack() { - - lock (); - ::pack( coop_em.front()->getKey() ); ::pack( em.front() ); coop_em.pop(); em.pop(); - - unlock(); } - template< class EOT > void peoSyncIslandMig< EOT > :: unpack() { - - lock (); - eoPop< EOT > mig; ::unpack( mig ); imm.push( mig ); - - unlock(); - - sem_post( &sync ); + explicitPassive = true; } +template< class EOT > void peoSyncIslandMig< EOT > :: packSynchronizeReq() { + + packSynchronRequest( all ); +} template< class EOT > void peoSyncIslandMig< EOT > :: emigrate() { - std :: vector< Cooperative* > in, out; - topology.setNeighbors( this, in, out ); - for ( unsigned i = 0; i < out.size(); i ++ ) { @@ -272,57 +279,55 @@ template< class EOT > void peoSyncIslandMig< EOT > :: emigrate() } } - template< class EOT > void peoSyncIslandMig< EOT > :: immigrate() { - - lock (); - { - assert( imm.size() ); replace( destination, imm.front() ) ; imm.pop(); printDebugMessage( "peoSyncIslandMig: receiving some immigrants." ); - } - unlock(); } - template< class EOT > void peoSyncIslandMig< EOT > :: operator()() { if ( !cont( source ) ) { + explicitPassive = standbyMigration = false; + topology.setNeighbors( this, in, out ); all = topology; + + synchronizeCoopEx(); stop(); // sending emigrants emigrate(); - stop(); - // synchronizing sem_wait( &sync ); - getOwner()->setActive(); - // receiving immigrants immigrate(); + + synchronizeCoopEx(); stop(); } } - template< class EOT > void peoSyncIslandMig< EOT > :: notifySending() { - - lock (); - { - - if ( imm.empty() ) - { - - printDebugMessage( "peoSyncIslandMig: entering pasive mode\n" ); - getOwner()->setPassive(); - } + if ( !explicitPassive ) getOwner()->setPassive(); } -unlock(); +template< class EOT > void peoSyncIslandMig< EOT > :: notifyReceiving() +{ + if ( standbyMigration ) getOwner()->setActive(); + sem_post( &sync ); +} + +template< class EOT > void peoSyncIslandMig< EOT > :: notifySendingSyncReq () { + + getOwner()->setPassive(); +} + +template< class EOT > void peoSyncIslandMig< EOT > :: notifySynchronized () { + + standbyMigration = true; + getOwner()->setActive(); resume(); } diff --git a/trunk/paradiseo-peo/src/rmc/mpi/CMakeLists.txt b/trunk/paradiseo-peo/src/rmc/mpi/CMakeLists.txt index cfdf54c78..5ab0ccd94 100644 --- a/trunk/paradiseo-peo/src/rmc/mpi/CMakeLists.txt +++ b/trunk/paradiseo-peo/src/rmc/mpi/CMakeLists.txt @@ -1,3 +1,9 @@ +SET(CMAKE_DEFAULT_BUILD_TYPE Debug CACHE STRING "Variable that stores the default CMake build type" FORCE) +SET(CMAKE_CXX_FLAGS "-O0 -g") +SET(CMAKE_CXX_FLAGS_DEBUG "-O0 -g") +SET(CMAKE_CXX_FLAGS_RELEASE "-O0 -g") +SET(CMAKE_CXX_FLAGS_MINSIZEREL "-O0 -g") +SET(CMAKE_CXX_FLAGS_RELWITHDEBINFO "-O0 -g") ###################################################################################### ### 0) Set the compiler @@ -29,10 +35,11 @@ SET(LIBRARY_OUTPUT_PATH ${RMC_MPI_LIB_OUTPUT_PATH}) SET (RMC_MPI_SOURCES node.cpp param.cpp comm.cpp - coop.cpp + cooperative.cpp mess.cpp rmc.cpp scheduler.cpp + synchron.cpp worker.cpp send.cpp recv.cpp diff --git a/trunk/paradiseo-peo/src/rmc/mpi/coop.cpp b/trunk/paradiseo-peo/src/rmc/mpi/coop.cpp index 541677dd8..8d750f4aa 100644 --- a/trunk/paradiseo-peo/src/rmc/mpi/coop.cpp +++ b/trunk/paradiseo-peo/src/rmc/mpi/coop.cpp @@ -57,6 +57,10 @@ void Cooperative :: send (Cooperative * __coop) { // stop (); } +void Cooperative :: synchronizeCoopEx () { + :: send (this, my_node -> rk_sched, SYNCHRONIZE_REQ_TAG); +} + Cooperative * getCooperative (COOP_ID __key) { return dynamic_cast (getCommunicable (__key)); @@ -67,3 +71,12 @@ void Cooperative :: notifySending () { //getOwner -> setPassive (); // resume (); } + +void Cooperative :: notifyReceiving () { +} + +void Cooperative :: notifySendingSyncReq () { +} + +void Cooperative :: notifySynchronized () { +} diff --git a/trunk/paradiseo-peo/src/rmc/mpi/node.h b/trunk/paradiseo-peo/src/rmc/mpi/node.h index 66610c91f..92d289bc1 100644 --- a/trunk/paradiseo-peo/src/rmc/mpi/node.h +++ b/trunk/paradiseo-peo/src/rmc/mpi/node.h @@ -40,8 +40,28 @@ #include #include +#include "../../core/runner.h" + +typedef int RANK_ID; + +struct Node { + + RANK_ID rk; /* Rank */ + std :: string name; /* Host name */ + unsigned num_workers; /* Number of parallel workers */ + int rk_sched; /* rank of the scheduler */ + std :: vector id_run; /* List of runner def. IDs */ + std :: vector execution_id_run; /* List of runtime execution runner IDs */ +}; + +extern Node * my_node; + +extern bool isScheduleNode (); + extern int getNodeRank (); /* It gives the rank of the calling process */ +extern RANK_ID getRankOfRunner (RUNNER_ID __key); + extern int getNumberOfNodes (); /* It gives the size of the environment (Total number of nodes) */ extern void collectiveCountOfRunners ( unsigned int* num_local_exec_runners, unsigned int* num_exec_runners ); diff --git a/trunk/paradiseo-peo/src/rmc/mpi/recv.cpp b/trunk/paradiseo-peo/src/rmc/mpi/recv.cpp index 9ea9cf7fd..4562cd8dd 100644 --- a/trunk/paradiseo-peo/src/rmc/mpi/recv.cpp +++ b/trunk/paradiseo-peo/src/rmc/mpi/recv.cpp @@ -38,13 +38,13 @@ #include "tags.h" #include "worker.h" #include "scheduler.h" +#include "synchron.h" #include "mess.h" #include "node.h" #include "../../core/runner.h" #include "../../core/cooperative.h" #include "../../core/peo_debug.h" - void receiveMessages () { cleanBuffers (); @@ -66,14 +66,30 @@ void receiveMessages () { switch (tag) { case RUNNER_STOP_TAG: - unpackTerminationOfRunner (); - wakeUpCommunicator (); + unpackTerminationOfRunner (); break; + case SYNCHRONIZE_REQ_TAG: + unpackSynchronRequest (); + break; + + case SYNCHRONIZED_TAG: + { + RUNNER_ID runner_id; + unpack (runner_id); + + COOP_ID coop_id; + unpack (coop_id); + + getCooperative (coop_id) -> notifySynchronized (); + break; + } + case COOP_TAG: COOP_ID coop_id; unpack (coop_id); getCooperative (coop_id) -> unpack (); + getCooperative (coop_id) -> notifyReceiving (); break; case SCHED_REQUEST_TAG: diff --git a/trunk/paradiseo-peo/src/rmc/mpi/schema.h b/trunk/paradiseo-peo/src/rmc/mpi/schema.h index 0381bd5cd..30e061c2c 100644 --- a/trunk/paradiseo-peo/src/rmc/mpi/schema.h +++ b/trunk/paradiseo-peo/src/rmc/mpi/schema.h @@ -41,29 +41,18 @@ #include #include +#include "node.h" #include "../../core/runner.h" -typedef int RANK_ID; - -struct Node { - - RANK_ID rk; /* Rank */ - std :: string name; /* Host name */ - unsigned num_workers; /* Number of parallel workers */ - int rk_sched; /* rank of the scheduler */ - std :: vector id_run; /* List of runner def. IDs */ - std :: vector execution_id_run; /* List of runtime execution runner IDs */ -}; - -extern std :: vector the_schema; - extern Node * my_node; -extern void loadSchema (const char * __filename); - -extern RANK_ID getRankOfRunner (RUNNER_ID __key); - extern bool isScheduleNode (); +extern RANK_ID getRankOfRunner (RUNNER_ID __key); + +extern std :: vector the_schema; + +extern void loadSchema (const char * __filename); + #endif diff --git a/trunk/paradiseo-peo/src/rmc/mpi/send.cpp b/trunk/paradiseo-peo/src/rmc/mpi/send.cpp index d36dc0d88..cf10b9b6e 100644 --- a/trunk/paradiseo-peo/src/rmc/mpi/send.cpp +++ b/trunk/paradiseo-peo/src/rmc/mpi/send.cpp @@ -131,6 +131,11 @@ void sendMessages () { dynamic_cast (comm) -> notifySending (); break; + case SYNCHRONIZE_REQ_TAG: + dynamic_cast (comm) -> packSynchronizeReq (); + dynamic_cast (comm) -> notifySendingSyncReq (); + break; + case SCHED_REQUEST_TAG: dynamic_cast (comm) -> packResourceRequest (); dynamic_cast (comm) -> notifySendingResourceRequest (); @@ -155,6 +160,7 @@ void sendMessages () { sendMessageToAll (req.tag); else sendMessage (req.to, req.tag); + mess.pop (); } diff --git a/trunk/paradiseo-peo/src/rmc/mpi/synchron.cpp b/trunk/paradiseo-peo/src/rmc/mpi/synchron.cpp new file mode 100644 index 000000000..12468c226 --- /dev/null +++ b/trunk/paradiseo-peo/src/rmc/mpi/synchron.cpp @@ -0,0 +1,123 @@ +/* +* +* Copyright (C) DOLPHIN Project-Team, INRIA Futurs, 2006-2007 +* (C) OPAC Team, LIFL, 2002-2007 +* +* Sebastien Cahon, Alexandru-Adrian Tantar +* +* This software is governed by the CeCILL license under French law and +* abiding by the rules of distribution of free software. You can use, +* modify and/ or redistribute the software under the terms of the CeCILL +* license as circulated by CEA, CNRS and INRIA at the following URL +* "http://www.cecill.info". +* +* As a counterpart to the access to the source code and rights to copy, +* modify and redistribute granted by the license, users are provided only +* with a limited warranty and the software's author, the holder of the +* economic rights, and the successive licensors have only limited liability. +* +* In this respect, the user's attention is drawn to the risks associated +* with loading, using, modifying and/or developing or reproducing the +* software by the user in light of its specific status of free software, +* that may mean that it is complicated to manipulate, and that also +* therefore means that it is reserved for developers and experienced +* professionals having in-depth computer knowledge. Users are therefore +* encouraged to load and test the software's suitability as regards their +* requirements in conditions enabling the security of their systems and/or +* data to be ensured and, more generally, to use and operate it in the +* same conditions as regards security. +* The fact that you are presently reading this means that you have had +* knowledge of the CeCILL license and that you accept its terms. +* +* ParadisEO WebSite : http://paradiseo.gforge.inria.fr +* Contact: paradiseo-help@lists.gforge.inria.fr +* +*/ + +#include +#include "synchron.h" +#include "../../core/messaging.h" +#include "node.h" +#include "tags.h" +#include "mess.h" + + + +static SYNC syncRunners; /* Runners to be synchronized */ + +extern void wakeUpCommunicator(); + +extern RANK_ID getRankOfRunner (RUNNER_ID __key); + +/* Initializing the list of runners to be synchronized */ +void initSynchron () { + + syncRunners = SYNC(); +} + +/* packing a synchronization request from a service */ +void packSynchronRequest ( const std :: vector & coops ) { + + /* Number of coops to synchronize */ + pack( (unsigned)( coops.size() ) ); + + /* Coops to synchronize */ + for (unsigned i = 0; i < coops.size(); i ++) { + pack( coops[ i ]->getOwner()->getDefinitionID() ); + pack( coops[ i ]->getKey() ); + } +} + +/* Processing a synchronization request from a service */ +void unpackSynchronRequest () { + + unsigned req_num_entries; + unpack (req_num_entries); + + /* Creating a sync vector + adding the created entry */ + std::pair< SYNC_RUNNERS, unsigned > req_sync; + + /* Adding entries for each of the runners to be synchronized */ + SyncEntry req_entry; + for (unsigned i = 0; i < req_num_entries; i ++) { + + unpack (req_entry.runner); + unpack (req_entry.coop); + + req_sync.first.push_back (req_entry); + } + + /* Looking for the sync vector */ + SYNC::iterator sync_it = syncRunners.find (req_sync); + + /* The vector does not exist - insert a new sync */ + if (sync_it == syncRunners.end ()) { + req_sync.second = 1; + syncRunners.insert (req_sync); + } + else { + + /* The vector exists - updating the entry */ + std::pair< SYNC_RUNNERS, unsigned >& sync_req_entry = const_cast< std::pair< SYNC_RUNNERS, unsigned >& > (*sync_it); + sync_req_entry.second ++; + + /* All the runners to be synchronized sent the SYNC_REQUEST signal */ + if (sync_req_entry.second == sync_req_entry.first.size()) { + + /* Remove the entry */ + syncRunners.erase (sync_it); + + /* Send SYNCHRONIZED signals to all the coop objects */ + for (unsigned i = 0; i < req_sync.first.size(); i ++) { + + initMessage (); + + pack (req_sync.first [i].runner); + pack (req_sync.first [i].coop); + + RANK_ID dest_rank = getRankOfRunner (req_sync.first [i].runner); + sendMessage (dest_rank, SYNCHRONIZED_TAG); + } + } + } +} diff --git a/trunk/paradiseo-peo/src/rmc/mpi/synchron.h b/trunk/paradiseo-peo/src/rmc/mpi/synchron.h new file mode 100644 index 000000000..b9ac7e6b7 --- /dev/null +++ b/trunk/paradiseo-peo/src/rmc/mpi/synchron.h @@ -0,0 +1,85 @@ +/* +* +* Copyright (C) DOLPHIN Project-Team, INRIA Futurs, 2006-2007 +* (C) OPAC Team, LIFL, 2002-2007 +* +* Sebastien Cahon, Alexandru-Adrian Tantar +* +* This software is governed by the CeCILL license under French law and +* abiding by the rules of distribution of free software. You can use, +* modify and/ or redistribute the software under the terms of the CeCILL +* license as circulated by CEA, CNRS and INRIA at the following URL +* "http://www.cecill.info". +* +* As a counterpart to the access to the source code and rights to copy, +* modify and redistribute granted by the license, users are provided only +* with a limited warranty and the software's author, the holder of the +* economic rights, and the successive licensors have only limited liability. +* +* In this respect, the user's attention is drawn to the risks associated +* with loading, using, modifying and/or developing or reproducing the +* software by the user in light of its specific status of free software, +* that may mean that it is complicated to manipulate, and that also +* therefore means that it is reserved for developers and experienced +* professionals having in-depth computer knowledge. Users are therefore +* encouraged to load and test the software's suitability as regards their +* requirements in conditions enabling the security of their systems and/or +* data to be ensured and, more generally, to use and operate it in the +* same conditions as regards security. +* The fact that you are presently reading this means that you have had +* knowledge of the CeCILL license and that you accept its terms. +* +* ParadisEO WebSite : http://paradiseo.gforge.inria.fr +* Contact: paradiseo-help@lists.gforge.inria.fr +* +*/ + +#ifndef __synchron_h +#define __synchron_h + +#include +#include +#include + +#include "../../core/runner.h" +#include "../../core/cooperative.h" + +struct SyncEntry { + + RUNNER_ID runner; + COOP_ID coop; +}; + +struct SyncCompare { + + bool operator()( const std::pair< std::vector< SyncEntry >, unsigned >& A, const std::pair< std::vector< SyncEntry >, unsigned >& B ) { + + const std::vector< SyncEntry >& syncA = A.first; + const std::vector< SyncEntry >& syncB = B.first; + + if ( syncA.size() == syncB.size() ) { + std::vector< SyncEntry >::const_iterator itA = syncA.begin(); + std::vector< SyncEntry >::const_iterator itB = syncB.begin(); + + while ( (*itA).runner < (*itB).runner && itA != syncA.end() ) { itA++; itB++; } + + return itA == syncA.end(); + } + + return syncA.size() < syncB.size(); + } +}; + +typedef std::vector< SyncEntry > SYNC_RUNNERS; +typedef std::set< std::pair< SYNC_RUNNERS, unsigned >, SyncCompare > SYNC; + +/* Initializing the list of runners to be synchronized */ +extern void initSynchron (); + +/* packing a synchronization request from a service */ +extern void packSynchronRequest ( const std :: vector & coops ); + +/* Processing a synchronization request from a service */ +extern void unpackSynchronRequest (); + +#endif diff --git a/trunk/paradiseo-peo/src/rmc/mpi/tags.h b/trunk/paradiseo-peo/src/rmc/mpi/tags.h index 8fb7f8a84..67a7a71be 100644 --- a/trunk/paradiseo-peo/src/rmc/mpi/tags.h +++ b/trunk/paradiseo-peo/src/rmc/mpi/tags.h @@ -48,4 +48,7 @@ #define TASK_RESULT_TAG 19 #define TASK_DONE_TAG 20 +#define SYNCHRONIZE_REQ_TAG 1000 +#define SYNCHRONIZED_TAG 1001 + #endif