From 2aa312e43d168f6d25cde3fe3ea1a4135d4ff55a Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Thu, 21 Jun 2012 17:21:13 +0200 Subject: [PATCH] Allowing more than one master. --- eo/src/mpi/assignmentAlgorithm.h | 2 +- eo/src/mpi/eoParallelApply.h | 15 ++++++----- eo/src/mpi/eompi.h | 46 +++++++++++++++++++++----------- 3 files changed, 40 insertions(+), 23 deletions(-) diff --git a/eo/src/mpi/assignmentAlgorithm.h b/eo/src/mpi/assignmentAlgorithm.h index bd635c23..89b97c06 100644 --- a/eo/src/mpi/assignmentAlgorithm.h +++ b/eo/src/mpi/assignmentAlgorithm.h @@ -13,7 +13,7 @@ struct DynamicAssignmentAlgorithm : public AssignmentAlgorithm public: DynamicAssignmentAlgorithm( int offset, int size ) { - for( int i = 0; offset + i < size; ++i) + for( int i = 0; offset + i <= size; ++i) { availableWrk.push_back( offset + i ); } diff --git a/eo/src/mpi/eoParallelApply.h b/eo/src/mpi/eoParallelApply.h index 80958e41..990f8fad 100644 --- a/eo/src/mpi/eoParallelApply.h +++ b/eo/src/mpi/eoParallelApply.h @@ -10,9 +10,12 @@ template< typename EOT > class ParallelApply : public MpiJob< EOT > { public: + using MpiJob::comm; + using MpiJob::data; + using MpiJob::_masterRank; - ParallelApply( eoUF & _proc, std::vector& _pop, AssignmentAlgorithm & algo ) : - MpiJob( _pop, algo ), + ParallelApply( eoUF & _proc, std::vector& _pop, AssignmentAlgorithm & algo, int _masterRank ) : + MpiJob( _pop, algo, _masterRank ), func( _proc ) { // empty @@ -20,20 +23,20 @@ class ParallelApply : public MpiJob< EOT > virtual void sendTask( int wrkRank, int index ) { - MpiJob::comm.send( wrkRank, 1, MpiJob::data[ index ] ); + comm.send( wrkRank, 1, data[ index ] ); } virtual void handleResponse( int wrkRank, int index ) { - MpiJob::comm.recv( wrkRank, 1, MpiJob::data[ index ] ); + comm.recv( wrkRank, 1, data[ index ] ); } virtual void processTask( ) { EOT ind; - MpiJob::comm.recv( 0, 1, ind ); + comm.recv( _masterRank, 1, ind ); func( ind ); - MpiJob::comm.send( 0, 1, ind ); + comm.send( _masterRank, 1, ind ); } protected: diff --git a/eo/src/mpi/eompi.h b/eo/src/mpi/eompi.h index 3b04441d..df041db5 100644 --- a/eo/src/mpi/eompi.h +++ b/eo/src/mpi/eompi.h @@ -50,10 +50,11 @@ class MpiJob { public: - MpiJob( std::vector< EOT > & _data, AssignmentAlgorithm& algo ) : + MpiJob( std::vector< EOT > & _data, AssignmentAlgorithm& algo, int masterRank ) : data( _data ), comm( MpiNode::comm() ), - assignmentAlgo( algo ) + assignmentAlgo( algo ), + _masterRank( masterRank ) { // empty } @@ -66,29 +67,33 @@ class MpiJob void master( ) { + int totalWorkers = assignmentAlgo.size(); + cout << "[M] Have " << totalWorkers << " workers." << endl; + for( int i = 0, size = data.size(); i < size; ++i) { - cout << "Beginning loop for i = " << i << endl; + cout << "[M] Beginning loop for i = " << i << endl; int assignee = assignmentAlgo.get( ); - cout << "Assignee : " << assignee << endl; + cout << "[M] Assignee : " << assignee << endl; while( assignee <= 0 ) { - cout << "Waitin' for node..." << endl; + cout << "[M] Waitin' for node..." << endl; mpi::status status = comm.probe( mpi::any_source, mpi::any_tag ); int wrkRank = status.source(); - cout << "Node " << wrkRank << " just terminated." << endl; + cout << "[M] Node " << wrkRank << " just terminated." << endl; handleResponse( wrkRank, assignedTasks[ wrkRank ] ); assignmentAlgo.confirm( wrkRank ); assignee = assignmentAlgo.get( ); } - cout << "Assignee found : " << assignee << endl; + cout << "[M] Assignee found : " << assignee << endl; assignedTasks[ assignee ] = i; comm.send( assignee, EoMpi::Channel::Commands, EoMpi::Message::Continue ); sendTask( assignee, i ); } + cout << "[M] Frees all the idle." << endl; // frees all the idle workers int idle; vector idles; @@ -102,9 +107,9 @@ class MpiJob assignmentAlgo.confirm( idles[i] ); } + cout << "[M] Waits for all responses." << endl; // wait for all responses - int wrkNb = comm.size() - 1; - while( assignmentAlgo.size() != wrkNb ) + while( assignmentAlgo.size() != totalWorkers ) { mpi::status status = comm.probe( mpi::any_source, mpi::any_tag ); int wrkRank = status.source(); @@ -112,6 +117,8 @@ class MpiJob comm.send( wrkRank, EoMpi::Channel::Commands, EoMpi::Message::Finish ); assignmentAlgo.confirm( wrkRank ); } + + cout << "[M] Leaving master task." << endl; } void worker( ) @@ -119,34 +126,41 @@ class MpiJob int order; while( true ) { - comm.recv( 0, EoMpi::Channel::Commands, order ); + cout << "[W] Waiting for an order..." << std::endl; + comm.recv( _masterRank, EoMpi::Channel::Commands, order ); if ( order == EoMpi::Message::Finish ) { return; } else { + cout << "[W] Processing task..." << std::endl; processTask( ); } } } + int masterRank() + { + return _masterRank; + } + protected: std::vector & data; std::map< int /* worker rank */, int /* index in vector */> assignedTasks; AssignmentAlgorithm& assignmentAlgo; mpi::communicator& comm; + int _masterRank; }; template< class EOT > class Role { public: - Role( MpiJob & job, bool master ) : - _job( job ), - _master( master ) + Role( MpiJob & job ) : + _job( job ) { - // empty + _master = job.masterRank() == MpiNode::comm().rank(); } bool master() @@ -156,7 +170,7 @@ class Role virtual void run( ) { - if( _master ) + if( MpiNode::comm().rank() == _job.masterRank() ) { _job.master( ); } else @@ -171,7 +185,7 @@ class Role } protected: - bool _master; MpiJob & _job; + bool _master; }; # endif // __EO_MPI_H__