diff --git a/eo/src/mpi/eoParallelApply.h b/eo/src/mpi/eoParallelApply.h index 86e2b04a..cf964521 100644 --- a/eo/src/mpi/eoParallelApply.h +++ b/eo/src/mpi/eoParallelApply.h @@ -6,41 +6,19 @@ # include # include -template< typename EOT > -struct ParallelApplyContinuator : public BaseContinuator -{ - ParallelApplyContinuator( int index, int size ) - { - _index = index; - _size = size; - } - - void index( int i ) { _index = i; } - - bool operator()() - { - return _index < _size; - } - -private: - int _index; - int _size; -}; - template< typename EOT > class ParallelApply : public MpiJob { public: ParallelApply( eoUF & _proc, std::vector& _pop, AssignmentAlgorithm & algo, int _masterRank ) : - MpiJob( algo, - new ParallelApplyContinuator( 0, _pop.size() ), - _masterRank ), + MpiJob( algo, _masterRank ), func( _proc ), index( 0 ), + size( _pop.size() ), data( _pop ) { - pa_continuator = static_cast*>( _continuator ); + // empty } virtual void sendTask( int wrkRank ) @@ -48,7 +26,6 @@ class ParallelApply : public MpiJob assignedTasks[ wrkRank ] = index; comm.send( wrkRank, 1, data[ index ] ); ++index; - pa_continuator->index( index ); } virtual void handleResponse( int wrkRank ) @@ -64,11 +41,16 @@ class ParallelApply : public MpiJob comm.send( _masterRank, 1, ind ); } + bool isFinished() + { + return index = size; + } + protected: vector & data; eoUF& func; int index; - ParallelApplyContinuator * pa_continuator; + int size; std::map< int /* worker rank */, int /* index in vector */> assignedTasks; }; diff --git a/eo/src/mpi/eompi.h b/eo/src/mpi/eompi.h index 8bc25d31..f043c162 100644 --- a/eo/src/mpi/eompi.h +++ b/eo/src/mpi/eompi.h @@ -45,31 +45,20 @@ class MpiNode static mpi::communicator _comm; }; -struct BaseContinuator -{ - virtual bool operator()() = 0; -}; - -// template< typename EOT > class MpiJob { public: - MpiJob( AssignmentAlgorithm& algo, BaseContinuator* c, int masterRank ) : + MpiJob( AssignmentAlgorithm& algo, int masterRank ) : assignmentAlgo( algo ), comm( MpiNode::comm() ), - _masterRank( masterRank ), - _continuator( c ) + _masterRank( masterRank ) { // empty } - ~MpiJob() - { - delete _continuator; - } - // master + virtual bool isFinished() = 0; virtual void sendTask( int wrkRank ) = 0; virtual void handleResponse( int wrkRank ) = 0; // worker @@ -80,7 +69,7 @@ class MpiJob int totalWorkers = assignmentAlgo.size(); cout << "[M] Have " << totalWorkers << " workers." << endl; - while( (*_continuator)() ) + while( ! isFinished() ) { int assignee = assignmentAlgo.get( ); cout << "[M] Assignee : " << assignee << endl; @@ -98,31 +87,6 @@ class MpiJob sendTask( assignee ); } - /* - for( int i = 0, size = data.size(); - i < size; - ++i) - { - cout << "[M] Beginning loop for i = " << i << endl; - int assignee = assignmentAlgo.get( ); - cout << "[M] Assignee : " << assignee << endl; - while( assignee <= 0 ) - { - cout << "[M] Waitin' for node..." << endl; - mpi::status status = comm.probe( mpi::any_source, mpi::any_tag ); - int wrkRank = status.source(); - cout << "[M] Node " << wrkRank << " just terminated." << endl; - handleResponse( wrkRank, assignedTasks[ wrkRank ] ); - assignmentAlgo.confirm( wrkRank ); - assignee = assignmentAlgo.get( ); - } - cout << "[M] Assignee found : " << assignee << endl; - assignedTasks[ assignee ] = i; - comm.send( assignee, EoMpi::Channel::Commands, EoMpi::Message::Continue ); - sendTask( assignee, i ); - } - */ - cout << "[M] Frees all the idle." << endl; // frees all the idle workers int idle; @@ -176,7 +140,6 @@ class MpiJob protected: AssignmentAlgorithm& assignmentAlgo; - BaseContinuator* _continuator; mpi::communicator& comm; int _masterRank; };