diff --git a/eo/src/mpi/eoParallelApply.h b/eo/src/mpi/eoParallelApply.h index 990f8fad..86e2b04a 100644 --- a/eo/src/mpi/eoParallelApply.h +++ b/eo/src/mpi/eoParallelApply.h @@ -7,28 +7,53 @@ # include template< typename EOT > -class ParallelApply : public MpiJob< EOT > +struct ParallelApplyContinuator : public BaseContinuator +{ + ParallelApplyContinuator( int index, int size ) + { + _index = index; + _size = size; + } + + void index( int i ) { _index = i; } + + bool operator()() + { + return _index < _size; + } + +private: + int _index; + int _size; +}; + +template< typename EOT > +class ParallelApply : public MpiJob { public: - using MpiJob::comm; - using MpiJob::data; - using MpiJob::_masterRank; ParallelApply( eoUF & _proc, std::vector& _pop, AssignmentAlgorithm & algo, int _masterRank ) : - MpiJob( _pop, algo, _masterRank ), - func( _proc ) + MpiJob( algo, + new ParallelApplyContinuator( 0, _pop.size() ), + _masterRank ), + func( _proc ), + index( 0 ), + data( _pop ) { - // empty + pa_continuator = static_cast*>( _continuator ); } - virtual void sendTask( int wrkRank, int index ) + virtual void sendTask( int wrkRank ) { + assignedTasks[ wrkRank ] = index; comm.send( wrkRank, 1, data[ index ] ); + ++index; + pa_continuator->index( index ); } - virtual void handleResponse( int wrkRank, int index ) + virtual void handleResponse( int wrkRank ) { - comm.recv( wrkRank, 1, data[ index ] ); + comm.recv( wrkRank, 1, data[ assignedTasks[ wrkRank ] ] ); } virtual void processTask( ) @@ -40,7 +65,11 @@ class ParallelApply : public MpiJob< EOT > } protected: + vector & data; eoUF& func; + int index; + ParallelApplyContinuator * pa_continuator; + std::map< int /* worker rank */, int /* index in vector */> assignedTasks; }; # endif // __EO_PARALLEL_APPLY_H__ diff --git a/eo/src/mpi/eompi.h b/eo/src/mpi/eompi.h index cbb66a46..8bc25d31 100644 --- a/eo/src/mpi/eompi.h +++ b/eo/src/mpi/eompi.h @@ -45,23 +45,33 @@ class MpiNode static mpi::communicator _comm; }; -template< typename EOT > +struct BaseContinuator +{ + virtual bool operator()() = 0; +}; + +// template< typename EOT > class MpiJob { public: - MpiJob( std::vector< EOT > & _data, AssignmentAlgorithm& algo, int masterRank ) : - data( _data ), - comm( MpiNode::comm() ), + MpiJob( AssignmentAlgorithm& algo, BaseContinuator* c, int masterRank ) : assignmentAlgo( algo ), - _masterRank( masterRank ) + comm( MpiNode::comm() ), + _masterRank( masterRank ), + _continuator( c ) { // empty } + ~MpiJob() + { + delete _continuator; + } + // master - virtual void sendTask( int wrkRank, int index ) = 0; - virtual void handleResponse( int wrkRank, int index ) = 0; + virtual void sendTask( int wrkRank ) = 0; + virtual void handleResponse( int wrkRank ) = 0; // worker virtual void processTask( ) = 0; @@ -70,6 +80,25 @@ class MpiJob int totalWorkers = assignmentAlgo.size(); cout << "[M] Have " << totalWorkers << " workers." << endl; + while( (*_continuator)() ) + { + int assignee = assignmentAlgo.get( ); + cout << "[M] Assignee : " << assignee << endl; + while( assignee <= 0 ) + { + cout << "[M] Waitin' for node..." << endl; + mpi::status status = comm.probe( mpi::any_source, mpi::any_tag ); + int wrkRank = status.source(); + cout << "[M] Node " << wrkRank << " just terminated." << endl; + handleResponse( wrkRank ); + assignmentAlgo.confirm( wrkRank ); + assignee = assignmentAlgo.get( ); + } + comm.send( assignee, EoMpi::Channel::Commands, EoMpi::Message::Continue ); + sendTask( assignee ); + } + + /* for( int i = 0, size = data.size(); i < size; ++i) @@ -92,6 +121,7 @@ class MpiJob comm.send( assignee, EoMpi::Channel::Commands, EoMpi::Message::Continue ); sendTask( assignee, i ); } + */ cout << "[M] Frees all the idle." << endl; // frees all the idle workers @@ -102,7 +132,7 @@ class MpiJob comm.send( idle, EoMpi::Channel::Commands, EoMpi::Message::Finish ); idles.push_back( idle ); } - for (int i = 0; i < idles.size(); ++i) + for (unsigned int i = 0; i < idles.size(); ++i) { assignmentAlgo.confirm( idles[i] ); } @@ -113,7 +143,7 @@ class MpiJob { mpi::status status = comm.probe( mpi::any_source, mpi::any_tag ); int wrkRank = status.source(); - handleResponse( wrkRank, assignedTasks[ wrkRank ] ); + handleResponse( wrkRank ); comm.send( wrkRank, EoMpi::Channel::Commands, EoMpi::Message::Finish ); assignmentAlgo.confirm( wrkRank ); } @@ -145,19 +175,16 @@ class MpiJob } protected: - - std::vector & data; - std::map< int /* worker rank */, int /* index in vector */> assignedTasks; AssignmentAlgorithm& assignmentAlgo; + BaseContinuator* _continuator; mpi::communicator& comm; int _masterRank; }; -template< class EOT > class Role { public: - Role( MpiJob & job ) : + Role( MpiJob & job ) : _job( job ) { _master = job.masterRank() == MpiNode::comm().rank(); @@ -185,7 +212,7 @@ class Role } protected: - MpiJob & _job; + MpiJob & _job; bool _master; }; # endif // __EO_MPI_H__ diff --git a/eo/test/mpi/multipleRoles.cpp b/eo/test/mpi/multipleRoles.cpp index bd91245b..c300fdbb 100644 --- a/eo/test/mpi/multipleRoles.cpp +++ b/eo/test/mpi/multipleRoles.cpp @@ -27,7 +27,7 @@ void subtask( vector& v ) DynamicAssignmentAlgorithm algo( 2, MpiNode::comm().size()-1 ); plusOne plusOneInstance; ParallelApply job( plusOneInstance, v, algo, 1 ); - Role node( job ); + Role node( job ); node.run(); } @@ -64,7 +64,7 @@ int main(int argc, char** argv) // only one node is assigned to subjob mastering DynamicAssignmentAlgorithm algo( 1, 1 ); ParallelApply< vector > job( transmitInstance, metaV, algo, 0 ); - Role< vector > node( job ); + Role node( job ); node.run(); if( node.master() ) { diff --git a/eo/test/mpi/parallelApply.cpp b/eo/test/mpi/parallelApply.cpp index 605f3911..dd11f7b3 100644 --- a/eo/test/mpi/parallelApply.cpp +++ b/eo/test/mpi/parallelApply.cpp @@ -33,7 +33,7 @@ int main(int argc, char** argv) cout << "Création du job..." << endl; ParallelApply job( plusOneInstance, v, algo, 0 ); - Role node( job ); + Role node( job ); node.run(); if( node.master() )