MpiJob now just deal with loop logic, not with handled data. Handled data is now handled by the MpiJob subclasses. Tests updated.

This commit is contained in:
Benjamin Bouvier 2012-06-21 18:26:56 +02:00
commit 28ab2004ea
4 changed files with 84 additions and 28 deletions

View file

@ -7,28 +7,53 @@
# include <vector> # include <vector>
template< typename EOT > template< typename EOT >
class ParallelApply : public MpiJob< EOT > struct ParallelApplyContinuator : public BaseContinuator
{
ParallelApplyContinuator( int index, int size )
{
_index = index;
_size = size;
}
void index( int i ) { _index = i; }
bool operator()()
{
return _index < _size;
}
private:
int _index;
int _size;
};
template< typename EOT >
class ParallelApply : public MpiJob
{ {
public: public:
using MpiJob<EOT>::comm;
using MpiJob<EOT>::data;
using MpiJob<EOT>::_masterRank;
ParallelApply( eoUF<EOT&, void> & _proc, std::vector<EOT>& _pop, AssignmentAlgorithm & algo, int _masterRank ) : ParallelApply( eoUF<EOT&, void> & _proc, std::vector<EOT>& _pop, AssignmentAlgorithm & algo, int _masterRank ) :
MpiJob<EOT>( _pop, algo, _masterRank ), MpiJob( algo,
func( _proc ) new ParallelApplyContinuator<EOT>( 0, _pop.size() ),
_masterRank ),
func( _proc ),
index( 0 ),
data( _pop )
{ {
// empty pa_continuator = static_cast<ParallelApplyContinuator<EOT>*>( _continuator );
} }
virtual void sendTask( int wrkRank, int index ) virtual void sendTask( int wrkRank )
{ {
assignedTasks[ wrkRank ] = index;
comm.send( wrkRank, 1, data[ index ] ); comm.send( wrkRank, 1, data[ index ] );
++index;
pa_continuator->index( index );
} }
virtual void handleResponse( int wrkRank, int index ) virtual void handleResponse( int wrkRank )
{ {
comm.recv( wrkRank, 1, data[ index ] ); comm.recv( wrkRank, 1, data[ assignedTasks[ wrkRank ] ] );
} }
virtual void processTask( ) virtual void processTask( )
@ -40,7 +65,11 @@ class ParallelApply : public MpiJob< EOT >
} }
protected: protected:
vector<EOT> & data;
eoUF<EOT&, void>& func; eoUF<EOT&, void>& func;
int index;
ParallelApplyContinuator<EOT> * pa_continuator;
std::map< int /* worker rank */, int /* index in vector */> assignedTasks;
}; };
# endif // __EO_PARALLEL_APPLY_H__ # endif // __EO_PARALLEL_APPLY_H__

View file

@ -45,23 +45,33 @@ class MpiNode
static mpi::communicator _comm; static mpi::communicator _comm;
}; };
template< typename EOT > struct BaseContinuator
{
virtual bool operator()() = 0;
};
// template< typename EOT >
class MpiJob class MpiJob
{ {
public: public:
MpiJob( std::vector< EOT > & _data, AssignmentAlgorithm& algo, int masterRank ) : MpiJob( AssignmentAlgorithm& algo, BaseContinuator* c, int masterRank ) :
data( _data ),
comm( MpiNode::comm() ),
assignmentAlgo( algo ), assignmentAlgo( algo ),
_masterRank( masterRank ) comm( MpiNode::comm() ),
_masterRank( masterRank ),
_continuator( c )
{ {
// empty // empty
} }
~MpiJob()
{
delete _continuator;
}
// master // master
virtual void sendTask( int wrkRank, int index ) = 0; virtual void sendTask( int wrkRank ) = 0;
virtual void handleResponse( int wrkRank, int index ) = 0; virtual void handleResponse( int wrkRank ) = 0;
// worker // worker
virtual void processTask( ) = 0; virtual void processTask( ) = 0;
@ -70,6 +80,25 @@ class MpiJob
int totalWorkers = assignmentAlgo.size(); int totalWorkers = assignmentAlgo.size();
cout << "[M] Have " << totalWorkers << " workers." << endl; cout << "[M] Have " << totalWorkers << " workers." << endl;
while( (*_continuator)() )
{
int assignee = assignmentAlgo.get( );
cout << "[M] Assignee : " << assignee << endl;
while( assignee <= 0 )
{
cout << "[M] Waitin' for node..." << endl;
mpi::status status = comm.probe( mpi::any_source, mpi::any_tag );
int wrkRank = status.source();
cout << "[M] Node " << wrkRank << " just terminated." << endl;
handleResponse( wrkRank );
assignmentAlgo.confirm( wrkRank );
assignee = assignmentAlgo.get( );
}
comm.send( assignee, EoMpi::Channel::Commands, EoMpi::Message::Continue );
sendTask( assignee );
}
/*
for( int i = 0, size = data.size(); for( int i = 0, size = data.size();
i < size; i < size;
++i) ++i)
@ -92,6 +121,7 @@ class MpiJob
comm.send( assignee, EoMpi::Channel::Commands, EoMpi::Message::Continue ); comm.send( assignee, EoMpi::Channel::Commands, EoMpi::Message::Continue );
sendTask( assignee, i ); sendTask( assignee, i );
} }
*/
cout << "[M] Frees all the idle." << endl; cout << "[M] Frees all the idle." << endl;
// frees all the idle workers // frees all the idle workers
@ -102,7 +132,7 @@ class MpiJob
comm.send( idle, EoMpi::Channel::Commands, EoMpi::Message::Finish ); comm.send( idle, EoMpi::Channel::Commands, EoMpi::Message::Finish );
idles.push_back( idle ); idles.push_back( idle );
} }
for (int i = 0; i < idles.size(); ++i) for (unsigned int i = 0; i < idles.size(); ++i)
{ {
assignmentAlgo.confirm( idles[i] ); assignmentAlgo.confirm( idles[i] );
} }
@ -113,7 +143,7 @@ class MpiJob
{ {
mpi::status status = comm.probe( mpi::any_source, mpi::any_tag ); mpi::status status = comm.probe( mpi::any_source, mpi::any_tag );
int wrkRank = status.source(); int wrkRank = status.source();
handleResponse( wrkRank, assignedTasks[ wrkRank ] ); handleResponse( wrkRank );
comm.send( wrkRank, EoMpi::Channel::Commands, EoMpi::Message::Finish ); comm.send( wrkRank, EoMpi::Channel::Commands, EoMpi::Message::Finish );
assignmentAlgo.confirm( wrkRank ); assignmentAlgo.confirm( wrkRank );
} }
@ -145,19 +175,16 @@ class MpiJob
} }
protected: protected:
std::vector<EOT> & data;
std::map< int /* worker rank */, int /* index in vector */> assignedTasks;
AssignmentAlgorithm& assignmentAlgo; AssignmentAlgorithm& assignmentAlgo;
BaseContinuator* _continuator;
mpi::communicator& comm; mpi::communicator& comm;
int _masterRank; int _masterRank;
}; };
template< class EOT >
class Role class Role
{ {
public: public:
Role( MpiJob<EOT> & job ) : Role( MpiJob & job ) :
_job( job ) _job( job )
{ {
_master = job.masterRank() == MpiNode::comm().rank(); _master = job.masterRank() == MpiNode::comm().rank();
@ -185,7 +212,7 @@ class Role
} }
protected: protected:
MpiJob<EOT> & _job; MpiJob & _job;
bool _master; bool _master;
}; };
# endif // __EO_MPI_H__ # endif // __EO_MPI_H__

View file

@ -27,7 +27,7 @@ void subtask( vector<int>& v )
DynamicAssignmentAlgorithm algo( 2, MpiNode::comm().size()-1 ); DynamicAssignmentAlgorithm algo( 2, MpiNode::comm().size()-1 );
plusOne plusOneInstance; plusOne plusOneInstance;
ParallelApply<int> job( plusOneInstance, v, algo, 1 ); ParallelApply<int> job( plusOneInstance, v, algo, 1 );
Role<int> node( job ); Role node( job );
node.run(); node.run();
} }
@ -64,7 +64,7 @@ int main(int argc, char** argv)
// only one node is assigned to subjob mastering // only one node is assigned to subjob mastering
DynamicAssignmentAlgorithm algo( 1, 1 ); DynamicAssignmentAlgorithm algo( 1, 1 );
ParallelApply< vector<int> > job( transmitInstance, metaV, algo, 0 ); ParallelApply< vector<int> > job( transmitInstance, metaV, algo, 0 );
Role< vector<int> > node( job ); Role node( job );
node.run(); node.run();
if( node.master() ) if( node.master() )
{ {

View file

@ -33,7 +33,7 @@ int main(int argc, char** argv)
cout << "Création du job..." << endl; cout << "Création du job..." << endl;
ParallelApply<int> job( plusOneInstance, v, algo, 0 ); ParallelApply<int> job( plusOneInstance, v, algo, 0 );
Role<int> node( job ); Role node( job );
node.run(); node.run();
if( node.master() ) if( node.master() )