Merged MpiJob and Role, using eoLogger instead of cout.

This commit is contained in:
Benjamin Bouvier 2012-06-22 14:24:23 +02:00
commit da9eb9ce7b
3 changed files with 38 additions and 65 deletions

View file

@ -36,18 +36,18 @@ class ParallelApply : public MpiJob
virtual void processTask( ) virtual void processTask( )
{ {
EOT ind; EOT ind;
comm.recv( _masterRank, 1, ind ); comm.recv( masterRank, 1, ind );
func( ind ); func( ind );
comm.send( _masterRank, 1, ind ); comm.send( masterRank, 1, ind );
} }
bool isFinished() bool isFinished()
{ {
return index = size; return index == size;
} }
protected: protected:
vector<EOT> & data; std::vector<EOT> & data;
eoUF<EOT&, void>& func; eoUF<EOT&, void>& func;
int index; int index;
int size; int size;

View file

@ -7,10 +7,9 @@
# include <boost/mpi.hpp> # include <boost/mpi.hpp>
namespace mpi = boost::mpi; namespace mpi = boost::mpi;
# include "assignmentAlgorithm.h" # include <utils/eoLogger.h>
# include <iostream> # include "assignmentAlgorithm.h"
using namespace std;
// TODO TODOB comment! // TODO TODOB comment!
namespace EoMpi namespace EoMpi
@ -49,12 +48,12 @@ class MpiJob
{ {
public: public:
MpiJob( AssignmentAlgorithm& algo, int masterRank ) : MpiJob( AssignmentAlgorithm& _algo, int _masterRank ) :
assignmentAlgo( algo ), assignmentAlgo( _algo ),
comm( MpiNode::comm() ), comm( MpiNode::comm() ),
_masterRank( masterRank ) masterRank( _masterRank )
{ {
// empty _isMaster = MpiNode::comm().rank() == _masterRank;
} }
// master // master
@ -67,18 +66,19 @@ class MpiJob
void master( ) void master( )
{ {
int totalWorkers = assignmentAlgo.size(); int totalWorkers = assignmentAlgo.size();
cout << "[M] Have " << totalWorkers << " workers." << endl; eo::log << eo::debug;
eo::log << "[M] Have " << totalWorkers << " workers." << std::endl;
while( ! isFinished() ) while( ! isFinished() )
{ {
int assignee = assignmentAlgo.get( ); int assignee = assignmentAlgo.get( );
cout << "[M] Assignee : " << assignee << endl; eo::log << "[M] Assignee : " << assignee << std::endl;
while( assignee <= 0 ) while( assignee <= 0 )
{ {
cout << "[M] Waitin' for node..." << endl; eo::log << "[M] Waitin' for node..." << std::endl;
mpi::status status = comm.probe( mpi::any_source, mpi::any_tag ); mpi::status status = comm.probe( mpi::any_source, mpi::any_tag );
int wrkRank = status.source(); int wrkRank = status.source();
cout << "[M] Node " << wrkRank << " just terminated." << endl; eo::log << "[M] Node " << wrkRank << " just terminated." << std::endl;
handleResponse( wrkRank ); handleResponse( wrkRank );
assignmentAlgo.confirm( wrkRank ); assignmentAlgo.confirm( wrkRank );
assignee = assignmentAlgo.get( ); assignee = assignmentAlgo.get( );
@ -87,10 +87,10 @@ class MpiJob
sendTask( assignee ); sendTask( assignee );
} }
cout << "[M] Frees all the idle." << endl; eo::log << "[M] Frees all the idle." << std::endl;
// frees all the idle workers // frees all the idle workers
int idle; int idle;
vector<int> idles; std::vector<int> idles;
while ( ( idle = assignmentAlgo.get( ) ) > 0 ) while ( ( idle = assignmentAlgo.get( ) ) > 0 )
{ {
comm.send( idle, EoMpi::Channel::Commands, EoMpi::Message::Finish ); comm.send( idle, EoMpi::Channel::Commands, EoMpi::Message::Finish );
@ -101,7 +101,7 @@ class MpiJob
assignmentAlgo.confirm( idles[i] ); assignmentAlgo.confirm( idles[i] );
} }
cout << "[M] Waits for all responses." << endl; eo::log << "[M] Waits for all responses." << std::endl;
// wait for all responses // wait for all responses
while( assignmentAlgo.size() != totalWorkers ) while( assignmentAlgo.size() != totalWorkers )
{ {
@ -112,70 +112,43 @@ class MpiJob
assignmentAlgo.confirm( wrkRank ); assignmentAlgo.confirm( wrkRank );
} }
cout << "[M] Leaving master task." << endl; eo::log << "[M] Leaving master task." << std::endl;
} }
void worker( ) void worker( )
{ {
int order; int order;
eo::log << eo::debug;
while( true ) while( true )
{ {
cout << "[W] Waiting for an order..." << std::endl; eo::log << "[W] Waiting for an order..." << std::endl;
comm.recv( _masterRank, EoMpi::Channel::Commands, order ); comm.recv( masterRank, EoMpi::Channel::Commands, order );
if ( order == EoMpi::Message::Finish ) if ( order == EoMpi::Message::Finish )
{ {
return; return;
} else } else
{ {
cout << "[W] Processing task..." << std::endl; eo::log << "[W] Processing task..." << std::endl;
processTask( ); processTask( );
} }
} }
} }
int masterRank() void run( )
{ {
return _masterRank; ( _isMaster ) ? master( ) : worker( );
}
bool isMaster( )
{
return _isMaster;
} }
protected: protected:
AssignmentAlgorithm& assignmentAlgo; AssignmentAlgorithm& assignmentAlgo;
mpi::communicator& comm; mpi::communicator& comm;
int _masterRank; int masterRank;
}; bool _isMaster;
class Role
{
public:
Role( MpiJob & job ) :
_job( job )
{
_master = job.masterRank() == MpiNode::comm().rank();
}
bool master()
{
return _master;
}
virtual void run( )
{
if( _master )
{
_job.master( );
} else
{
_job.worker( );
}
}
virtual ~Role()
{
// empty
}
protected:
MpiJob & _job;
bool _master;
}; };
# endif // __EO_MPI_H__ # endif // __EO_MPI_H__

View file

@ -16,9 +16,10 @@ struct plusOne : public eoUF< int&, void >
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
eo::log << eo::setlevel( eo::debug );
cout << "Appel à init... " << endl; cout << "Appel à init... " << endl;
MpiNode::init( argc, argv ); MpiNode::init( argc, argv );
DynamicAssignmentAlgorithm algo( 1, MpiNode::comm().size()-1 ); DynamicAssignmentAlgorithm assign( 1, MpiNode::comm().size()-1 );
cout << "Création des données... " << endl; cout << "Création des données... " << endl;
vector<int> v; vector<int> v;
@ -32,11 +33,10 @@ int main(int argc, char** argv)
plusOne plusOneInstance; plusOne plusOneInstance;
cout << "Création du job..." << endl; cout << "Création du job..." << endl;
ParallelApply<int> job( plusOneInstance, v, algo, 0 ); ParallelApply<int> job( plusOneInstance, v, assign, 0 );
Role node( job ); job.run();
node.run();
if( node.master() ) if( job.isMaster() )
{ {
for(int i = 0; i < v.size(); ++i) for(int i = 0; i < v.size(); ++i)
{ {