eoMpi API is simpler and allows to have multiple roles for a given node.

This commit is contained in:
Benjamin Bouvier 2012-06-21 16:10:51 +02:00
commit 122d0debf0
5 changed files with 111 additions and 158 deletions

View file

@ -0,0 +1,49 @@
# ifndef __ASSIGNMENT_ALGORITHM_H__
# define __ASSIGNMENT_ALGORITHM_H__
struct AssignmentAlgorithm
{
virtual int get( ) = 0;
virtual int size( ) = 0;
virtual void confirm( int wrkRank ) = 0;
};
struct DynamicAssignmentAlgorithm : public AssignmentAlgorithm
{
public:
DynamicAssignmentAlgorithm( int offset, int size )
{
for( int i = 0; offset + i < size; ++i)
{
availableWrk.push_back( offset + i );
}
}
virtual int get( )
{
int assignee = -1;
if (! availableWrk.empty() )
{
assignee = availableWrk.back();
availableWrk.pop_back();
}
return assignee;
}
int size()
{
return availableWrk.size();
}
void confirm( int rank )
{
availableWrk.push_back( rank );
}
protected:
std::vector< int > availableWrk;
};
# endif // __ASSIGNMENT_ALGORITHM_H__

View file

@ -11,8 +11,8 @@ class ParallelApply : public MpiJob< EOT >
{ {
public: public:
ParallelApply( eoUF<EOT&, void> & _proc, std::vector<EOT>& _pop ) : ParallelApply( eoUF<EOT&, void> & _proc, std::vector<EOT>& _pop, AssignmentAlgorithm & algo ) :
MpiJob<EOT>( _pop ), MpiJob<EOT>( _pop, algo ),
func( _proc ) func( _proc )
{ {
// empty // empty

View file

@ -1,4 +1,5 @@
# include "eompi.h" # include "eompi.h"
MpiNode* MpiNodeStore::singleton; // MpiNode* MpiNodeStore::singleton;
mpi::communicator MpiNode::_comm;

View file

@ -7,6 +7,8 @@
# include <boost/mpi.hpp> # include <boost/mpi.hpp>
namespace mpi = boost::mpi; namespace mpi = boost::mpi;
# include "assignmentAlgorithm.h"
# include <iostream> # include <iostream>
using namespace std; using namespace std;
// TODO TODOB comment! // TODO TODOB comment!
@ -25,102 +27,22 @@ namespace EoMpi
} }
} }
class MpiNode; class MpiNode
class MpiNodeStore
{ {
public: public:
static void instance( MpiNode* _instance ) static void init( int argc, char** argv )
{ {
singleton = _instance; static mpi::environment env( argc, argv );
} }
static MpiNode* instance() static mpi::communicator& comm()
{
return singleton;
}
protected:
static MpiNode* singleton;
};
class MpiNode
{
protected:
mpi::environment& env;
mpi::communicator& _comm;
int rank;
int size;
int argc;
char** argv;
public:
MpiNode( mpi::environment& _env, mpi::communicator& __comm ) :
env(_env),
_comm(__comm),
rank(__comm.rank()),
size(__comm.size())
{
// empty
}
virtual ~MpiNode()
{
// empty
}
mpi::communicator& comm()
{ {
return _comm; return _comm;
} }
};
struct AssignmentAlgorithm
{
virtual int get( ) = 0;
virtual void size( int s ) = 0;
virtual int size( ) = 0;
virtual void confirm( int wrkRank ) = 0;
};
struct DynamicAssignmentAlgorithm : public AssignmentAlgorithm
{
public:
virtual int get( )
{
int assignee = -1;
if (! availableWrk.empty() )
{
assignee = availableWrk.back();
availableWrk.pop_back();
}
return assignee;
}
;
void size( int s )
{
for( int i = 1; i < s ; ++i )
{
availableWrk.push_back( i );
}
}
int size()
{
return availableWrk.size();
}
void confirm( int rank )
{
availableWrk.push_back( rank );
}
protected: protected:
std::vector< int > availableWrk; static mpi::communicator _comm;
}; };
template< typename EOT > template< typename EOT >
@ -128,9 +50,10 @@ class MpiJob
{ {
public: public:
MpiJob( std::vector< EOT > & _data ) : MpiJob( std::vector< EOT > & _data, AssignmentAlgorithm& algo ) :
data( _data ), data( _data ),
comm( MpiNodeStore::instance()->comm() ) comm( MpiNode::comm() ),
assignmentAlgo( algo )
{ {
// empty // empty
} }
@ -141,14 +64,14 @@ class MpiJob
// worker // worker
virtual void processTask( ) = 0; virtual void processTask( ) = 0;
void master( AssignmentAlgorithm & assignmentAlgorithm ) void master( )
{ {
for( int i = 0, size = data.size(); for( int i = 0, size = data.size();
i < size; i < size;
++i) ++i)
{ {
cout << "Beginning loop for i = " << i << endl; cout << "Beginning loop for i = " << i << endl;
int assignee = assignmentAlgorithm.get( ); int assignee = assignmentAlgo.get( );
cout << "Assignee : " << assignee << endl; cout << "Assignee : " << assignee << endl;
while( assignee <= 0 ) while( assignee <= 0 )
{ {
@ -157,8 +80,8 @@ class MpiJob
int wrkRank = status.source(); int wrkRank = status.source();
cout << "Node " << wrkRank << " just terminated." << endl; cout << "Node " << wrkRank << " just terminated." << endl;
handleResponse( wrkRank, assignedTasks[ wrkRank ] ); handleResponse( wrkRank, assignedTasks[ wrkRank ] );
assignmentAlgorithm.confirm( wrkRank ); assignmentAlgo.confirm( wrkRank );
assignee = assignmentAlgorithm.get( ); assignee = assignmentAlgo.get( );
} }
cout << "Assignee found : " << assignee << endl; cout << "Assignee found : " << assignee << endl;
assignedTasks[ assignee ] = i; assignedTasks[ assignee ] = i;
@ -169,25 +92,25 @@ class MpiJob
// frees all the idle workers // frees all the idle workers
int idle; int idle;
vector<int> idles; vector<int> idles;
while ( ( idle = assignmentAlgorithm.get( ) ) > 0 ) while ( ( idle = assignmentAlgo.get( ) ) > 0 )
{ {
comm.send( idle, EoMpi::Channel::Commands, EoMpi::Message::Finish ); comm.send( idle, EoMpi::Channel::Commands, EoMpi::Message::Finish );
idles.push_back( idle ); idles.push_back( idle );
} }
for (int i = 0; i < idles.size(); ++i) for (int i = 0; i < idles.size(); ++i)
{ {
assignmentAlgorithm.confirm( idles[i] ); assignmentAlgo.confirm( idles[i] );
} }
// wait for all responses // wait for all responses
int wrkNb = comm.size() - 1; int wrkNb = comm.size() - 1;
while( assignmentAlgorithm.size() != wrkNb ) while( assignmentAlgo.size() != wrkNb )
{ {
mpi::status status = comm.probe( mpi::any_source, mpi::any_tag ); mpi::status status = comm.probe( mpi::any_source, mpi::any_tag );
int wrkRank = status.source(); int wrkRank = status.source();
handleResponse( wrkRank, assignedTasks[ wrkRank ] ); handleResponse( wrkRank, assignedTasks[ wrkRank ] );
comm.send( wrkRank, EoMpi::Channel::Commands, EoMpi::Message::Finish ); comm.send( wrkRank, EoMpi::Channel::Commands, EoMpi::Message::Finish );
assignmentAlgorithm.confirm( wrkRank ); assignmentAlgo.confirm( wrkRank );
} }
} }
@ -211,77 +134,44 @@ protected:
std::vector<EOT> & data; std::vector<EOT> & data;
std::map< int /* worker rank */, int /* index in vector */> assignedTasks; std::map< int /* worker rank */, int /* index in vector */> assignedTasks;
AssignmentAlgorithm& assignmentAlgo;
mpi::communicator& comm; mpi::communicator& comm;
}; };
class MasterNode : public MpiNode template< class EOT >
{ class Role
public:
MasterNode( int _argc, char** _argv,
mpi::environment& _env,
mpi::communicator& _comm
) :
MpiNode(_env, _comm )
{
// empty
}
void setAssignmentAlgorithm( AssignmentAlgorithm* assignmentAlgo )
{
_assignmentAlgo = assignmentAlgo;
_assignmentAlgo->size( _comm.size() );
}
template< typename EOT >
void run( MpiJob< EOT > & job )
{
job.master( *_assignmentAlgo );
}
protected:
AssignmentAlgorithm* _assignmentAlgo;
};
class WorkerNode : public MpiNode
{ {
public: public:
Role( MpiJob<EOT> & job, bool master ) :
WorkerNode( _job( job ),
int _argc, char** _argv, _master( master )
mpi::environment& _env,
mpi::communicator& _comm ) :
MpiNode( _env, _comm )
{ {
// empty // empty
} }
template< typename EOT > bool master()
void run( MpiJob<EOT> & job )
{ {
job.worker( ); return _master;
} }
};
class MpiSingletonFactory virtual void run( )
{
public:
static void init( int argc, char** argv )
{
MpiNode* singleton;
//mpi::environment* env = new mpi::environment ( argc, argv );
//mpi::communicator* world = new mpi::communicator; // TODO clean
static mpi::environment env( argc, argv );
static mpi::communicator world;
if ( world.rank() == 0 )
{ {
singleton = new MasterNode( argc, argv, env, world ); if( _master )
} else {
{ _job.master( );
singleton = new WorkerNode( argc, argv, env, world ); } else
{
_job.worker( );
}
} }
MpiNodeStore::instance( singleton );
} virtual ~Role()
{
// empty
}
protected:
bool _master;
MpiJob<EOT> & _job;
}; };
# endif // __EO_MPI_H__ # endif // __EO_MPI_H__

View file

@ -1,5 +1,6 @@
# include <mpi/eompi.h> # include <mpi/eompi.h>
# include <mpi/eoParallelApply.h> # include <mpi/eoParallelApply.h>
# include <iostream> # include <iostream>
# include <vector> # include <vector>
@ -15,9 +16,9 @@ struct plusOne : public eoUF< int&, void >
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
DynamicAssignmentAlgorithm algo;
cout << "Appel à init... " << endl; cout << "Appel à init... " << endl;
MpiSingletonFactory::init( argc, argv ); MpiNode::init( argc, argv );
DynamicAssignmentAlgorithm algo( 1, MpiNode::comm().size() );
cout << "Création des données... " << endl; cout << "Création des données... " << endl;
vector<int> v; vector<int> v;
@ -31,8 +32,19 @@ int main(int argc, char** argv)
plusOne plusOneInstance; plusOne plusOneInstance;
cout << "Création du job..." << endl; cout << "Création du job..." << endl;
ParallelApply<int> job( plusOneInstance, v ); ParallelApply<int> job( plusOneInstance, v, algo );
Role<int> node( job, MpiNode::comm().rank() == 0 );
node.run();
if( node.master() )
{
for(int i = 0; i < v.size(); ++i)
{
cout << v[i] << ' ';
}
cout << endl;
}
/*
cout << "Création de l'instance..." << endl; cout << "Création de l'instance..." << endl;
MpiNode* instance = MpiNodeStore::instance(); MpiNode* instance = MpiNodeStore::instance();
if( dynamic_cast<MasterNode*>( instance ) != 0 ) if( dynamic_cast<MasterNode*>( instance ) != 0 )
@ -54,6 +66,7 @@ int main(int argc, char** argv)
{ {
cout << "Nothing to be done;" << endl; cout << "Nothing to be done;" << endl;
} }
*/
return 0; return 0;
} }