From b291e56e03c60dd04d16808fbbf513cd4c9916af Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Mon, 25 Jun 2012 14:11:44 +0200 Subject: [PATCH] Putting everything in namespace eo::mpi --- eo/src/mpi/CMakeLists.txt | 2 +- eo/src/mpi/MpiNode.h | 25 ---- eo/src/mpi/assignmentAlgorithm.h | 207 -------------------------- eo/src/mpi/eoMpi.cpp | 11 ++ eo/src/mpi/eoMpi.h | 132 ++++++++++++++++ eo/src/mpi/eoMpiAssignmentAlgorithm.h | 207 ++++++++++++++++++++++++++ eo/src/mpi/eoMpiNode.h | 31 ++++ eo/src/mpi/eoParallelApply.h | 183 ++++++++++++----------- eo/src/mpi/eompi.cpp | 5 - eo/src/mpi/eompi.h | 127 ---------------- eo/test/mpi/multipleRoles.cpp | 12 +- eo/test/mpi/parallelApply.cpp | 16 +- 12 files changed, 492 insertions(+), 466 deletions(-) delete mode 100644 eo/src/mpi/MpiNode.h delete mode 100644 eo/src/mpi/assignmentAlgorithm.h create mode 100644 eo/src/mpi/eoMpi.cpp create mode 100644 eo/src/mpi/eoMpi.h create mode 100644 eo/src/mpi/eoMpiAssignmentAlgorithm.h create mode 100644 eo/src/mpi/eoMpiNode.h delete mode 100644 eo/src/mpi/eompi.cpp delete mode 100644 eo/src/mpi/eompi.h diff --git a/eo/src/mpi/CMakeLists.txt b/eo/src/mpi/CMakeLists.txt index c770acd13..0e22e47e0 100644 --- a/eo/src/mpi/CMakeLists.txt +++ b/eo/src/mpi/CMakeLists.txt @@ -13,7 +13,7 @@ SET(EOMPI_LIB_OUTPUT_PATH ${EO_BINARY_DIR}/lib) SET(LIBRARY_OUTPUT_PATH ${EOMPI_LIB_OUTPUT_PATH}) SET(EOMPI_SOURCES - eompi.cpp + eoMpi.cpp ) ADD_LIBRARY(eompi STATIC ${EOMPI_SOURCES}) diff --git a/eo/src/mpi/MpiNode.h b/eo/src/mpi/MpiNode.h deleted file mode 100644 index 10804ce01..000000000 --- a/eo/src/mpi/MpiNode.h +++ /dev/null @@ -1,25 +0,0 @@ -# ifndef __MPI_NODE_H__ -# define __MPI_NODE_H__ - -# include -namespace mpi = boost::mpi; - -class MpiNode -{ - public: - - static void init( int argc, char** argv ) - { - static mpi::environment env( argc, argv ); - } - - static mpi::communicator& comm() - { - return _comm; - } - - protected: - static mpi::communicator _comm; -}; - -# endif // __MPI_NODE_H__ diff --git a/eo/src/mpi/assignmentAlgorithm.h b/eo/src/mpi/assignmentAlgorithm.h deleted file mode 100644 index a27e8289a..000000000 --- a/eo/src/mpi/assignmentAlgorithm.h +++ /dev/null @@ -1,207 +0,0 @@ -# ifndef __ASSIGNMENT_ALGORITHM_H__ -# define __ASSIGNMENT_ALGORITHM_H__ - -# include -# include "MpiNode.h" - -namespace eo -{ - const int REST_OF_THE_WORLD = -1; -} - -struct AssignmentAlgorithm -{ - virtual int get( ) = 0; - virtual int availableWorkers( ) = 0; - virtual void confirm( int wrkRank ) = 0; - virtual std::vector idles( ) = 0; -}; - -struct DynamicAssignmentAlgorithm : public AssignmentAlgorithm -{ - public: - - DynamicAssignmentAlgorithm( ) - { - for(int i = 1; i < MpiNode::comm().size(); ++i) - { - availableWrk.push_back( i ); - } - } - - DynamicAssignmentAlgorithm( int unique ) - { - availableWrk.push_back( unique ); - } - - DynamicAssignmentAlgorithm( const std::vector & workers ) - { - availableWrk = workers; - } - - DynamicAssignmentAlgorithm( int first, int last ) - { - if( last == eo::REST_OF_THE_WORLD ) - { - last = MpiNode::comm().size() - 1; - } - - for( int i = first; i <= last; ++i) - { - availableWrk.push_back( i ); - } - } - - virtual int get( ) - { - int assignee = -1; - if (! availableWrk.empty() ) - { - assignee = availableWrk.back(); - availableWrk.pop_back(); - } - return assignee; - } - - int availableWorkers() - { - return availableWrk.size(); - } - - void confirm( int rank ) - { - availableWrk.push_back( rank ); - } - - std::vector idles( ) - { - return availableWrk; - } - - protected: - std::vector< int > availableWrk; -}; - -struct StaticAssignmentAlgorithm : public AssignmentAlgorithm -{ - public: - StaticAssignmentAlgorithm( std::vector& workers, int runs ) - { - init( workers, runs ); - } - - StaticAssignmentAlgorithm( int first, int last, int runs ) - { - std::vector workers; - - if( last == eo::REST_OF_THE_WORLD ) - { - last = MpiNode::comm().size() - 1; - } - - for(int i = first; i <= last; ++i) - { - workers.push_back( i ); - } - init( workers, runs ); - } - - StaticAssignmentAlgorithm( int runs ) - { - std::vector workers; - for(int i = 1; i < MpiNode::comm().size(); ++i) - { - workers.push_back( i ); - } - init( workers, runs ); - } - - StaticAssignmentAlgorithm( int unique, int runs ) - { - std::vector workers; - workers.push_back( unique ); - init( workers, runs ); - } - -private: - void init( std::vector & workers, int runs ) - { - unsigned int nbWorkers = workers.size(); - freeWorkers = nbWorkers; - attributions.reserve( nbWorkers ); - busy.resize( nbWorkers, false ); - - // Let be the euclidean division of runs by nbWorkers : - // runs == q * nbWorkers + r, 0 <= r < nbWorkers - // This one liner affects q requests to each worker - for (unsigned int i = 0; i < nbWorkers; attributions[i++] = runs / nbWorkers) ; - // The first line computes r and the one liner affects the remaining - // r requests to workers, in ascending order - unsigned int diff = runs - (runs / nbWorkers) * nbWorkers; - for (unsigned int i = 0; i < diff; ++attributions[i++]); - - realRank = workers; - } - -public: - int get( ) - { - int assignee = -1; - for( unsigned i = 0; i < busy.size(); ++i ) - { - if( !busy[i] && attributions[i] > 0 ) - { - busy[i] = true; - --freeWorkers; - assignee = realRank[ i ]; - break; - } - } - return assignee; - } - - int availableWorkers( ) - { - return freeWorkers; - } - - std::vector idles() - { - std::vector ret; - for(unsigned int i = 0; i < busy.size(); ++i) - { - if( !busy[i] ) - { - eo::log << "Idle : " << realRank[i] << - " / attributions : " << attributions[i] << std::endl; - ret.push_back( realRank[i] ); - } - } - return ret; - } - - void confirm( int rank ) - { - int i = -1; - for( int j = 0; j < realRank.size(); ++j ) - { - if( realRank[j] == rank ) - { - i = j; - break; - } - } - - --attributions[ i ]; - busy[ i ] = false; - ++freeWorkers; - } - - private: - std::vector attributions; - std::vector realRank; - std::vector busy; - unsigned int freeWorkers; -}; - -# endif // __ASSIGNMENT_ALGORITHM_H__ diff --git a/eo/src/mpi/eoMpi.cpp b/eo/src/mpi/eoMpi.cpp new file mode 100644 index 000000000..f6e3b3c52 --- /dev/null +++ b/eo/src/mpi/eoMpi.cpp @@ -0,0 +1,11 @@ +# include "eoMpi.h" + +// MpiNode* MpiNodeStore::singleton; +namespace eo +{ + namespace mpi + { + bmpi::communicator Node::_comm; + } +} + diff --git a/eo/src/mpi/eoMpi.h b/eo/src/mpi/eoMpi.h new file mode 100644 index 000000000..86ad55ab3 --- /dev/null +++ b/eo/src/mpi/eoMpi.h @@ -0,0 +1,132 @@ +# ifndef __EO_MPI_H__ +# define __EO_MPI_H__ + +# include +# include +# include + +# include "eoMpiNode.h" +# include "eoMpiAssignmentAlgorithm.h" +// TODO TODOB comment! + +namespace eo +{ + namespace mpi + { + namespace Channel + { + const int Commands = 0; + } + + namespace Message + { + const int Continue = 0; + const int Finish = 1; + } + + class Job + { + public: + + Job( AssignmentAlgorithm& _algo, int _masterRank ) : + assignmentAlgo( _algo ), + comm( Node::comm() ), + masterRank( _masterRank ) + { + _isMaster = Node::comm().rank() == _masterRank; + } + + // master + virtual bool isFinished() = 0; + virtual void sendTask( int wrkRank ) = 0; + virtual void handleResponse( int wrkRank ) = 0; + // worker + virtual void processTask( ) = 0; + + void master( ) + { + int totalWorkers = assignmentAlgo.availableWorkers(); + eo::log << eo::debug; + eo::log << "[M" << comm.rank() << "] Have " << totalWorkers << " workers." << std::endl; + + while( ! isFinished() ) + { + int assignee = assignmentAlgo.get( ); + while( assignee <= 0 ) + { + eo::log << "[M" << comm.rank() << "] Waitin' for node..." << std::endl; + bmpi::status status = comm.probe( bmpi::any_source, bmpi::any_tag ); + int wrkRank = status.source(); + eo::log << "[M" << comm.rank() << "] Node " << wrkRank << " just terminated." << std::endl; + handleResponse( wrkRank ); + assignmentAlgo.confirm( wrkRank ); + assignee = assignmentAlgo.get( ); + } + eo::log << "[M" << comm.rank() << "] Assignee : " << assignee << std::endl; + comm.send( assignee, Channel::Commands, Message::Continue ); + sendTask( assignee ); + } + + eo::log << "[M" << comm.rank() << "] Frees all the idle." << std::endl; + // frees all the idle workers + std::vector idles = assignmentAlgo.idles(); + for(unsigned int i = 0; i < idles.size(); ++i) + { + comm.send( idles[i], Channel::Commands, Message::Finish ); + } + + eo::log << "[M" << comm.rank() << "] Waits for all responses." << std::endl; + // wait for all responses + while( assignmentAlgo.availableWorkers() != totalWorkers ) + { + bmpi::status status = comm.probe( bmpi::any_source, bmpi::any_tag ); + int wrkRank = status.source(); + handleResponse( wrkRank ); + comm.send( wrkRank, Channel::Commands, Message::Finish ); + assignmentAlgo.confirm( wrkRank ); + } + + eo::log << "[M" << comm.rank() << "] Leaving master task." << std::endl; + } + + void worker( ) + { + int order; + eo::log << eo::debug; + while( true ) + { + eo::log << "[W" << comm.rank() << "] Waiting for an order..." << std::endl; + comm.recv( masterRank, Channel::Commands, order ); + if ( order == Message::Finish ) + { + eo::log << "[W" << comm.rank() << "] Leaving worker task." << std::endl; + return; + } else + { + eo::log << "[W" << comm.rank() << "] Processing task..." << std::endl; + processTask( ); + } + } + } + + void run( ) + { + ( _isMaster ) ? master( ) : worker( ); + } + + bool isMaster( ) + { + return _isMaster; + } + + protected: + AssignmentAlgorithm& assignmentAlgo; + bmpi::communicator& comm; + int masterRank; + bool _isMaster; + }; + } +} + +# endif // __EO_MPI_H__ + diff --git a/eo/src/mpi/eoMpiAssignmentAlgorithm.h b/eo/src/mpi/eoMpiAssignmentAlgorithm.h new file mode 100644 index 000000000..aa162d951 --- /dev/null +++ b/eo/src/mpi/eoMpiAssignmentAlgorithm.h @@ -0,0 +1,207 @@ +# ifndef __MPI_ASSIGNMENT_ALGORITHM_H__ +# define __MPI_ASSIGNMENT_ALGORITHM_H__ + +# include +# include "eoMpiNode.h" + +namespace eo +{ + namespace mpi + { + const int REST_OF_THE_WORLD = -1; + + struct AssignmentAlgorithm + { + virtual int get( ) = 0; + virtual int availableWorkers( ) = 0; + virtual void confirm( int wrkRank ) = 0; + virtual std::vector idles( ) = 0; + }; + + struct DynamicAssignmentAlgorithm : public AssignmentAlgorithm + { + public: + + DynamicAssignmentAlgorithm( ) + { + for(int i = 1; i < Node::comm().size(); ++i) + { + availableWrk.push_back( i ); + } + } + + DynamicAssignmentAlgorithm( int unique ) + { + availableWrk.push_back( unique ); + } + + DynamicAssignmentAlgorithm( const std::vector & workers ) + { + availableWrk = workers; + } + + DynamicAssignmentAlgorithm( int first, int last ) + { + if( last == REST_OF_THE_WORLD ) + { + last = Node::comm().size() - 1; + } + + for( int i = first; i <= last; ++i) + { + availableWrk.push_back( i ); + } + } + + virtual int get( ) + { + int assignee = -1; + if (! availableWrk.empty() ) + { + assignee = availableWrk.back(); + availableWrk.pop_back(); + } + return assignee; + } + + int availableWorkers() + { + return availableWrk.size(); + } + + void confirm( int rank ) + { + availableWrk.push_back( rank ); + } + + std::vector idles( ) + { + return availableWrk; + } + + protected: + std::vector< int > availableWrk; + }; + + struct StaticAssignmentAlgorithm : public AssignmentAlgorithm + { + public: + StaticAssignmentAlgorithm( std::vector& workers, int runs ) + { + init( workers, runs ); + } + + StaticAssignmentAlgorithm( int first, int last, int runs ) + { + std::vector workers; + + if( last == REST_OF_THE_WORLD ) + { + last = Node::comm().size() - 1; + } + + for(int i = first; i <= last; ++i) + { + workers.push_back( i ); + } + init( workers, runs ); + } + + StaticAssignmentAlgorithm( int runs ) + { + std::vector workers; + for(int i = 1; i < Node::comm().size(); ++i) + { + workers.push_back( i ); + } + init( workers, runs ); + } + + StaticAssignmentAlgorithm( int unique, int runs ) + { + std::vector workers; + workers.push_back( unique ); + init( workers, runs ); + } + + private: + void init( std::vector & workers, int runs ) + { + unsigned int nbWorkers = workers.size(); + freeWorkers = nbWorkers; + attributions.reserve( nbWorkers ); + busy.resize( nbWorkers, false ); + + // Let be the euclidean division of runs by nbWorkers : + // runs == q * nbWorkers + r, 0 <= r < nbWorkers + // This one liner affects q requests to each worker + for (unsigned int i = 0; i < nbWorkers; attributions[i++] = runs / nbWorkers) ; + // The first line computes r and the one liner affects the remaining + // r requests to workers, in ascending order + unsigned int diff = runs - (runs / nbWorkers) * nbWorkers; + for (unsigned int i = 0; i < diff; ++attributions[i++]); + + realRank = workers; + } + + public: + int get( ) + { + int assignee = -1; + for( unsigned i = 0; i < busy.size(); ++i ) + { + if( !busy[i] && attributions[i] > 0 ) + { + busy[i] = true; + --freeWorkers; + assignee = realRank[ i ]; + break; + } + } + return assignee; + } + + int availableWorkers( ) + { + return freeWorkers; + } + + std::vector idles() + { + std::vector ret; + for(unsigned int i = 0; i < busy.size(); ++i) + { + if( !busy[i] ) + { + ret.push_back( realRank[i] ); + } + } + return ret; + } + + void confirm( int rank ) + { + int i = -1; + for( unsigned int j = 0; j < realRank.size(); ++j ) + { + if( realRank[j] == rank ) + { + i = j; + break; + } + } + + --attributions[ i ]; + busy[ i ] = false; + ++freeWorkers; + } + + private: + std::vector attributions; + std::vector realRank; + std::vector busy; + unsigned int freeWorkers; + }; + } +} +# endif // __MPI_ASSIGNMENT_ALGORITHM_H__ diff --git a/eo/src/mpi/eoMpiNode.h b/eo/src/mpi/eoMpiNode.h new file mode 100644 index 000000000..9f1ea7b53 --- /dev/null +++ b/eo/src/mpi/eoMpiNode.h @@ -0,0 +1,31 @@ +# ifndef __MPI_NODE_H__ +# define __MPI_NODE_H__ + +# include +namespace bmpi = boost::mpi; + +namespace eo +{ + namespace mpi + { + class Node + { + public: + + static void init( int argc, char** argv ) + { + static bmpi::environment env( argc, argv ); + } + + static bmpi::communicator& comm() + { + return _comm; + } + + protected: + static bmpi::communicator _comm; + }; + } +} +# endif // __MPI_NODE_H__ + diff --git a/eo/src/mpi/eoParallelApply.h b/eo/src/mpi/eoParallelApply.h index 064396daa..1e9d4c1de 100644 --- a/eo/src/mpi/eoParallelApply.h +++ b/eo/src/mpi/eoParallelApply.h @@ -1,102 +1,107 @@ # ifndef __EO_PARALLEL_APPLY_H__ # define __EO_PARALLEL_APPLY_H__ -# include "eompi.h" +# include "eoMpi.h" # include # include -template< typename EOT > -class ParallelApply : public MpiJob +namespace eo { - private: - struct ParallelApplyAssignment + namespace mpi + { + template< typename EOT > + class ParallelApply : public Job { - int index; - int size; + private: + struct ParallelApplyAssignment + { + int index; + int size; + }; + public: + + ParallelApply( + eoUF & _proc, + std::vector& _pop, + AssignmentAlgorithm & algo, + int _masterRank, + int _packetSize = 1 + ) : + Job( algo, _masterRank ), + func( _proc ), + index( 0 ), + size( _pop.size() ), + data( _pop ), + packetSize( _packetSize ) + { + if ( _packetSize <= 0 ) + { + throw std::runtime_error("Packet size should not be negative."); + } + tempArray = new EOT[ packetSize ]; + } + + ~ParallelApply() + { + delete [] tempArray; + } + + virtual void sendTask( int wrkRank ) + { + int futureIndex; + + if( index + packetSize < size ) + { + futureIndex = index + packetSize; + } else { + futureIndex = size; + } + + int sentSize = futureIndex - index ; + comm.send( wrkRank, 1, sentSize ); + + assignedTasks[ wrkRank ].index = index; + assignedTasks[ wrkRank ].size = sentSize; + + comm.send( wrkRank, 1, &data[ index ] , sentSize ); + index = futureIndex; + } + + virtual void handleResponse( int wrkRank ) + { + comm.recv( wrkRank, 1, &data[ assignedTasks[wrkRank].index ], assignedTasks[wrkRank].size ); + } + + virtual void processTask( ) + { + int recvSize; + comm.recv( masterRank, 1, recvSize ); + comm.recv( masterRank, 1, tempArray, recvSize ); + for( int i = 0; i < recvSize ; ++i ) + { + func( tempArray[ i ] ); + } + comm.send( masterRank, 1, tempArray, recvSize ); + } + + bool isFinished() + { + return index == size; + } + + protected: + std::vector & data; + eoUF& func; + int index; + int size; + std::map< int /* worker rank */, ParallelApplyAssignment /* min indexes in vector */> assignedTasks; + + int packetSize; + EOT* tempArray; }; - public: - - ParallelApply( - eoUF & _proc, - std::vector& _pop, - AssignmentAlgorithm & algo, - int _masterRank, - int _packetSize = 1 - ) : - MpiJob( algo, _masterRank ), - func( _proc ), - index( 0 ), - size( _pop.size() ), - data( _pop ), - packetSize( _packetSize ) - { - if ( _packetSize <= 0 ) - { - throw std::runtime_error("Packet size should not be negative."); - } - tempArray = new EOT[ packetSize ]; - } - - ~ParallelApply() - { - delete [] tempArray; - } - - virtual void sendTask( int wrkRank ) - { - int futureIndex; - - if( index + packetSize < size ) - { - futureIndex = index + packetSize; - } else { - futureIndex = size; - } - - int sentSize = futureIndex - index ; - comm.send( wrkRank, 1, sentSize ); - - assignedTasks[ wrkRank ].index = index; - assignedTasks[ wrkRank ].size = sentSize; - - comm.send( wrkRank, 1, &data[ index ] , sentSize ); - index = futureIndex; - } - - virtual void handleResponse( int wrkRank ) - { - comm.recv( wrkRank, 1, &data[ assignedTasks[wrkRank].index ], assignedTasks[wrkRank].size ); - } - - virtual void processTask( ) - { - int recvSize; - comm.recv( masterRank, 1, recvSize ); - comm.recv( masterRank, 1, tempArray, recvSize ); - for( int i = 0; i < recvSize ; ++i ) - { - func( tempArray[ i ] ); - } - comm.send( masterRank, 1, tempArray, recvSize ); - } - - bool isFinished() - { - return index == size; - } - - protected: - std::vector & data; - eoUF& func; - int index; - int size; - std::map< int /* worker rank */, ParallelApplyAssignment /* min indexes in vector */> assignedTasks; - - int packetSize; - EOT* tempArray; -}; - + } +} # endif // __EO_PARALLEL_APPLY_H__ diff --git a/eo/src/mpi/eompi.cpp b/eo/src/mpi/eompi.cpp deleted file mode 100644 index 6429a31c5..000000000 --- a/eo/src/mpi/eompi.cpp +++ /dev/null @@ -1,5 +0,0 @@ -# include "eompi.h" - -// MpiNode* MpiNodeStore::singleton; -mpi::communicator MpiNode::_comm; - diff --git a/eo/src/mpi/eompi.h b/eo/src/mpi/eompi.h deleted file mode 100644 index 3b51e7a3a..000000000 --- a/eo/src/mpi/eompi.h +++ /dev/null @@ -1,127 +0,0 @@ -# ifndef __EO_MPI_H__ -# define __EO_MPI_H__ - -# include -# include -# include - -# include "MpiNode.h" -# include "assignmentAlgorithm.h" -// TODO TODOB comment! - -namespace EoMpi -{ - namespace Channel - { - const int Commands = 0; - } - - namespace Message - { - const int Continue = 0; - const int Finish = 1; - } -} -class MpiJob -{ - public: - - MpiJob( AssignmentAlgorithm& _algo, int _masterRank ) : - assignmentAlgo( _algo ), - comm( MpiNode::comm() ), - masterRank( _masterRank ) - { - _isMaster = MpiNode::comm().rank() == _masterRank; - } - - // master - virtual bool isFinished() = 0; - virtual void sendTask( int wrkRank ) = 0; - virtual void handleResponse( int wrkRank ) = 0; - // worker - virtual void processTask( ) = 0; - - void master( ) - { - int totalWorkers = assignmentAlgo.availableWorkers(); - eo::log << eo::debug; - eo::log << "[M] Have " << totalWorkers << " workers." << std::endl; - - while( ! isFinished() ) - { - int assignee = assignmentAlgo.get( ); - while( assignee <= 0 ) - { - eo::log << "[M] Waitin' for node..." << std::endl; - mpi::status status = comm.probe( mpi::any_source, mpi::any_tag ); - int wrkRank = status.source(); - eo::log << "[M] Node " << wrkRank << " just terminated." << std::endl; - handleResponse( wrkRank ); - assignmentAlgo.confirm( wrkRank ); - assignee = assignmentAlgo.get( ); - } - eo::log << "[M] Assignee : " << assignee << std::endl; - comm.send( assignee, EoMpi::Channel::Commands, EoMpi::Message::Continue ); - sendTask( assignee ); - } - - eo::log << "[M] Frees all the idle." << std::endl; - // frees all the idle workers - std::vector idles = assignmentAlgo.idles(); - for(unsigned int i = 0; i < idles.size(); ++i) - { - comm.send( idles[i], EoMpi::Channel::Commands, EoMpi::Message::Finish ); - } - - eo::log << "[M] Waits for all responses." << std::endl; - // wait for all responses - while( assignmentAlgo.availableWorkers() != totalWorkers ) - { - mpi::status status = comm.probe( mpi::any_source, mpi::any_tag ); - int wrkRank = status.source(); - handleResponse( wrkRank ); - comm.send( wrkRank, EoMpi::Channel::Commands, EoMpi::Message::Finish ); - assignmentAlgo.confirm( wrkRank ); - } - - eo::log << "[M] Leaving master task." << std::endl; - } - - void worker( ) - { - int order; - eo::log << eo::debug; - while( true ) - { - eo::log << "[W" << comm.rank() << "] Waiting for an order..." << std::endl; - comm.recv( masterRank, EoMpi::Channel::Commands, order ); - if ( order == EoMpi::Message::Finish ) - { - eo::log << "[W" << comm.rank() << "] Leaving worker task." << std::endl; - return; - } else - { - eo::log << "[W" << comm.rank() << "] Processing task..." << std::endl; - processTask( ); - } - } - } - - void run( ) - { - ( _isMaster ) ? master( ) : worker( ); - } - - bool isMaster( ) - { - return _isMaster; - } - -protected: - AssignmentAlgorithm& assignmentAlgo; - mpi::communicator& comm; - int masterRank; - bool _isMaster; -}; -# endif // __EO_MPI_H__ - diff --git a/eo/test/mpi/multipleRoles.cpp b/eo/test/mpi/multipleRoles.cpp index ab8020732..5b829c890 100644 --- a/eo/test/mpi/multipleRoles.cpp +++ b/eo/test/mpi/multipleRoles.cpp @@ -1,4 +1,4 @@ -# include +# include # include # include @@ -8,6 +8,8 @@ # include using namespace std; +using namespace eo::mpi; + // Role map // 0 : general master // 1, 2 : worker of general job, master of subjob @@ -38,7 +40,7 @@ struct Work: public eoUF< vector&, void > void operator() ( vector& v ) { cout << "Work phase..." << endl; - subtask( v, MpiNode::comm().rank() ); + subtask( v, Node::comm().rank() ); for( int i = 0; i < v.size(); ++i ) { v[i] *= 2; @@ -49,7 +51,7 @@ struct Work: public eoUF< vector&, void > int main(int argc, char** argv) { // eo::log << eo::setlevel( eo::debug ); - MpiNode::init( argc, argv ); + Node::init( argc, argv ); vector v; v.push_back(1); @@ -62,7 +64,7 @@ int main(int argc, char** argv) metaV.push_back( v ); metaV.push_back( v ); - switch( MpiNode::comm().rank() ) + switch( Node::comm().rank() ) { case 0: case 1: @@ -88,7 +90,7 @@ int main(int argc, char** argv) default: { // all the other nodes are sub workers - int rank = MpiNode::comm().rank(); + int rank = Node::comm().rank(); if ( rank == 3 or rank == 5 ) { subtask( v, 1 ); diff --git a/eo/test/mpi/parallelApply.cpp b/eo/test/mpi/parallelApply.cpp index 46130b80a..82f4e0e70 100644 --- a/eo/test/mpi/parallelApply.cpp +++ b/eo/test/mpi/parallelApply.cpp @@ -1,4 +1,4 @@ -# include +# include # include # include @@ -6,6 +6,8 @@ # include using namespace std; +using namespace eo::mpi; + struct plusOne : public eoUF< int&, void > { void operator() ( int & x ) @@ -27,7 +29,7 @@ int main(int argc, char** argv) // eo::log << eo::setlevel( eo::debug ); bool launchOnlyOne = false; // Set this to true if you wanna launch only the first test. - MpiNode::init( argc, argv ); + Node::init( argc, argv ); srand( time(0) ); vector v; @@ -43,10 +45,10 @@ int main(int argc, char** argv) vector< Test > tests; - const int ALL = MpiNode::comm().size(); + const int ALL = Node::comm().size(); Test tIntervalStatic; - tIntervalStatic.assign = new StaticAssignmentAlgorithm( 1, eo::REST_OF_THE_WORLD, v.size() ); + tIntervalStatic.assign = new StaticAssignmentAlgorithm( 1, REST_OF_THE_WORLD, v.size() ); tIntervalStatic.description = "Correct static assignment with interval."; tIntervalStatic.requiredNodesNumber = ALL; tests.push_back( tIntervalStatic ); @@ -81,7 +83,7 @@ int main(int argc, char** argv) tests.push_back( tVectorStatic ); Test tIntervalDynamic; - tIntervalDynamic.assign = new DynamicAssignmentAlgorithm( 1, eo::REST_OF_THE_WORLD ); + tIntervalDynamic.assign = new DynamicAssignmentAlgorithm( 1, REST_OF_THE_WORLD ); tIntervalDynamic.description = "Dynamic assignment with interval."; tIntervalDynamic.requiredNodesNumber = ALL; tests.push_back( tIntervalDynamic ); @@ -114,7 +116,7 @@ int main(int argc, char** argv) cout << "Test : " << tests[i].description << endl; } - if( MpiNode::comm().rank() < tests[i].requiredNodesNumber ) + if( Node::comm().rank() < tests[i].requiredNodesNumber ) { job.run(); } @@ -134,7 +136,7 @@ int main(int argc, char** argv) cout << endl; } - MpiNode::comm().barrier(); + Node::comm().barrier(); delete tests[i].assign; }