Putting everything in namespace eo::mpi

This commit is contained in:
Benjamin Bouvier 2012-06-25 14:11:44 +02:00
commit b291e56e03
12 changed files with 492 additions and 466 deletions

View file

@ -13,7 +13,7 @@ SET(EOMPI_LIB_OUTPUT_PATH ${EO_BINARY_DIR}/lib)
SET(LIBRARY_OUTPUT_PATH ${EOMPI_LIB_OUTPUT_PATH})
SET(EOMPI_SOURCES
eompi.cpp
eoMpi.cpp
)
ADD_LIBRARY(eompi STATIC ${EOMPI_SOURCES})

View file

@ -1,25 +0,0 @@
# ifndef __MPI_NODE_H__
# define __MPI_NODE_H__
# include <boost/mpi.hpp>
namespace mpi = boost::mpi;
class MpiNode
{
public:
static void init( int argc, char** argv )
{
static mpi::environment env( argc, argv );
}
static mpi::communicator& comm()
{
return _comm;
}
protected:
static mpi::communicator _comm;
};
# endif // __MPI_NODE_H__

View file

@ -1,207 +0,0 @@
# ifndef __ASSIGNMENT_ALGORITHM_H__
# define __ASSIGNMENT_ALGORITHM_H__
# include <vector>
# include "MpiNode.h"
namespace eo
{
const int REST_OF_THE_WORLD = -1;
}
struct AssignmentAlgorithm
{
virtual int get( ) = 0;
virtual int availableWorkers( ) = 0;
virtual void confirm( int wrkRank ) = 0;
virtual std::vector<int> idles( ) = 0;
};
struct DynamicAssignmentAlgorithm : public AssignmentAlgorithm
{
public:
DynamicAssignmentAlgorithm( )
{
for(int i = 1; i < MpiNode::comm().size(); ++i)
{
availableWrk.push_back( i );
}
}
DynamicAssignmentAlgorithm( int unique )
{
availableWrk.push_back( unique );
}
DynamicAssignmentAlgorithm( const std::vector<int> & workers )
{
availableWrk = workers;
}
DynamicAssignmentAlgorithm( int first, int last )
{
if( last == eo::REST_OF_THE_WORLD )
{
last = MpiNode::comm().size() - 1;
}
for( int i = first; i <= last; ++i)
{
availableWrk.push_back( i );
}
}
virtual int get( )
{
int assignee = -1;
if (! availableWrk.empty() )
{
assignee = availableWrk.back();
availableWrk.pop_back();
}
return assignee;
}
int availableWorkers()
{
return availableWrk.size();
}
void confirm( int rank )
{
availableWrk.push_back( rank );
}
std::vector<int> idles( )
{
return availableWrk;
}
protected:
std::vector< int > availableWrk;
};
struct StaticAssignmentAlgorithm : public AssignmentAlgorithm
{
public:
StaticAssignmentAlgorithm( std::vector<int>& workers, int runs )
{
init( workers, runs );
}
StaticAssignmentAlgorithm( int first, int last, int runs )
{
std::vector<int> workers;
if( last == eo::REST_OF_THE_WORLD )
{
last = MpiNode::comm().size() - 1;
}
for(int i = first; i <= last; ++i)
{
workers.push_back( i );
}
init( workers, runs );
}
StaticAssignmentAlgorithm( int runs )
{
std::vector<int> workers;
for(int i = 1; i < MpiNode::comm().size(); ++i)
{
workers.push_back( i );
}
init( workers, runs );
}
StaticAssignmentAlgorithm( int unique, int runs )
{
std::vector<int> workers;
workers.push_back( unique );
init( workers, runs );
}
private:
void init( std::vector<int> & workers, int runs )
{
unsigned int nbWorkers = workers.size();
freeWorkers = nbWorkers;
attributions.reserve( nbWorkers );
busy.resize( nbWorkers, false );
// Let be the euclidean division of runs by nbWorkers :
// runs == q * nbWorkers + r, 0 <= r < nbWorkers
// This one liner affects q requests to each worker
for (unsigned int i = 0; i < nbWorkers; attributions[i++] = runs / nbWorkers) ;
// The first line computes r and the one liner affects the remaining
// r requests to workers, in ascending order
unsigned int diff = runs - (runs / nbWorkers) * nbWorkers;
for (unsigned int i = 0; i < diff; ++attributions[i++]);
realRank = workers;
}
public:
int get( )
{
int assignee = -1;
for( unsigned i = 0; i < busy.size(); ++i )
{
if( !busy[i] && attributions[i] > 0 )
{
busy[i] = true;
--freeWorkers;
assignee = realRank[ i ];
break;
}
}
return assignee;
}
int availableWorkers( )
{
return freeWorkers;
}
std::vector<int> idles()
{
std::vector<int> ret;
for(unsigned int i = 0; i < busy.size(); ++i)
{
if( !busy[i] )
{
eo::log << "Idle : " << realRank[i] <<
" / attributions : " << attributions[i] << std::endl;
ret.push_back( realRank[i] );
}
}
return ret;
}
void confirm( int rank )
{
int i = -1;
for( int j = 0; j < realRank.size(); ++j )
{
if( realRank[j] == rank )
{
i = j;
break;
}
}
--attributions[ i ];
busy[ i ] = false;
++freeWorkers;
}
private:
std::vector<int> attributions;
std::vector<int> realRank;
std::vector<bool> busy;
unsigned int freeWorkers;
};
# endif // __ASSIGNMENT_ALGORITHM_H__

11
eo/src/mpi/eoMpi.cpp Normal file
View file

@ -0,0 +1,11 @@
# include "eoMpi.h"
// MpiNode* MpiNodeStore::singleton;
namespace eo
{
namespace mpi
{
bmpi::communicator Node::_comm;
}
}

132
eo/src/mpi/eoMpi.h Normal file
View file

@ -0,0 +1,132 @@
# ifndef __EO_MPI_H__
# define __EO_MPI_H__
# include <vector>
# include <map>
# include <utils/eoLogger.h>
# include "eoMpiNode.h"
# include "eoMpiAssignmentAlgorithm.h"
// TODO TODOB comment!
namespace eo
{
namespace mpi
{
namespace Channel
{
const int Commands = 0;
}
namespace Message
{
const int Continue = 0;
const int Finish = 1;
}
class Job
{
public:
Job( AssignmentAlgorithm& _algo, int _masterRank ) :
assignmentAlgo( _algo ),
comm( Node::comm() ),
masterRank( _masterRank )
{
_isMaster = Node::comm().rank() == _masterRank;
}
// master
virtual bool isFinished() = 0;
virtual void sendTask( int wrkRank ) = 0;
virtual void handleResponse( int wrkRank ) = 0;
// worker
virtual void processTask( ) = 0;
void master( )
{
int totalWorkers = assignmentAlgo.availableWorkers();
eo::log << eo::debug;
eo::log << "[M" << comm.rank() << "] Have " << totalWorkers << " workers." << std::endl;
while( ! isFinished() )
{
int assignee = assignmentAlgo.get( );
while( assignee <= 0 )
{
eo::log << "[M" << comm.rank() << "] Waitin' for node..." << std::endl;
bmpi::status status = comm.probe( bmpi::any_source, bmpi::any_tag );
int wrkRank = status.source();
eo::log << "[M" << comm.rank() << "] Node " << wrkRank << " just terminated." << std::endl;
handleResponse( wrkRank );
assignmentAlgo.confirm( wrkRank );
assignee = assignmentAlgo.get( );
}
eo::log << "[M" << comm.rank() << "] Assignee : " << assignee << std::endl;
comm.send( assignee, Channel::Commands, Message::Continue );
sendTask( assignee );
}
eo::log << "[M" << comm.rank() << "] Frees all the idle." << std::endl;
// frees all the idle workers
std::vector<int> idles = assignmentAlgo.idles();
for(unsigned int i = 0; i < idles.size(); ++i)
{
comm.send( idles[i], Channel::Commands, Message::Finish );
}
eo::log << "[M" << comm.rank() << "] Waits for all responses." << std::endl;
// wait for all responses
while( assignmentAlgo.availableWorkers() != totalWorkers )
{
bmpi::status status = comm.probe( bmpi::any_source, bmpi::any_tag );
int wrkRank = status.source();
handleResponse( wrkRank );
comm.send( wrkRank, Channel::Commands, Message::Finish );
assignmentAlgo.confirm( wrkRank );
}
eo::log << "[M" << comm.rank() << "] Leaving master task." << std::endl;
}
void worker( )
{
int order;
eo::log << eo::debug;
while( true )
{
eo::log << "[W" << comm.rank() << "] Waiting for an order..." << std::endl;
comm.recv( masterRank, Channel::Commands, order );
if ( order == Message::Finish )
{
eo::log << "[W" << comm.rank() << "] Leaving worker task." << std::endl;
return;
} else
{
eo::log << "[W" << comm.rank() << "] Processing task..." << std::endl;
processTask( );
}
}
}
void run( )
{
( _isMaster ) ? master( ) : worker( );
}
bool isMaster( )
{
return _isMaster;
}
protected:
AssignmentAlgorithm& assignmentAlgo;
bmpi::communicator& comm;
int masterRank;
bool _isMaster;
};
}
}
# endif // __EO_MPI_H__

View file

@ -0,0 +1,207 @@
# ifndef __MPI_ASSIGNMENT_ALGORITHM_H__
# define __MPI_ASSIGNMENT_ALGORITHM_H__
# include <vector>
# include "eoMpiNode.h"
namespace eo
{
namespace mpi
{
const int REST_OF_THE_WORLD = -1;
struct AssignmentAlgorithm
{
virtual int get( ) = 0;
virtual int availableWorkers( ) = 0;
virtual void confirm( int wrkRank ) = 0;
virtual std::vector<int> idles( ) = 0;
};
struct DynamicAssignmentAlgorithm : public AssignmentAlgorithm
{
public:
DynamicAssignmentAlgorithm( )
{
for(int i = 1; i < Node::comm().size(); ++i)
{
availableWrk.push_back( i );
}
}
DynamicAssignmentAlgorithm( int unique )
{
availableWrk.push_back( unique );
}
DynamicAssignmentAlgorithm( const std::vector<int> & workers )
{
availableWrk = workers;
}
DynamicAssignmentAlgorithm( int first, int last )
{
if( last == REST_OF_THE_WORLD )
{
last = Node::comm().size() - 1;
}
for( int i = first; i <= last; ++i)
{
availableWrk.push_back( i );
}
}
virtual int get( )
{
int assignee = -1;
if (! availableWrk.empty() )
{
assignee = availableWrk.back();
availableWrk.pop_back();
}
return assignee;
}
int availableWorkers()
{
return availableWrk.size();
}
void confirm( int rank )
{
availableWrk.push_back( rank );
}
std::vector<int> idles( )
{
return availableWrk;
}
protected:
std::vector< int > availableWrk;
};
struct StaticAssignmentAlgorithm : public AssignmentAlgorithm
{
public:
StaticAssignmentAlgorithm( std::vector<int>& workers, int runs )
{
init( workers, runs );
}
StaticAssignmentAlgorithm( int first, int last, int runs )
{
std::vector<int> workers;
if( last == REST_OF_THE_WORLD )
{
last = Node::comm().size() - 1;
}
for(int i = first; i <= last; ++i)
{
workers.push_back( i );
}
init( workers, runs );
}
StaticAssignmentAlgorithm( int runs )
{
std::vector<int> workers;
for(int i = 1; i < Node::comm().size(); ++i)
{
workers.push_back( i );
}
init( workers, runs );
}
StaticAssignmentAlgorithm( int unique, int runs )
{
std::vector<int> workers;
workers.push_back( unique );
init( workers, runs );
}
private:
void init( std::vector<int> & workers, int runs )
{
unsigned int nbWorkers = workers.size();
freeWorkers = nbWorkers;
attributions.reserve( nbWorkers );
busy.resize( nbWorkers, false );
// Let be the euclidean division of runs by nbWorkers :
// runs == q * nbWorkers + r, 0 <= r < nbWorkers
// This one liner affects q requests to each worker
for (unsigned int i = 0; i < nbWorkers; attributions[i++] = runs / nbWorkers) ;
// The first line computes r and the one liner affects the remaining
// r requests to workers, in ascending order
unsigned int diff = runs - (runs / nbWorkers) * nbWorkers;
for (unsigned int i = 0; i < diff; ++attributions[i++]);
realRank = workers;
}
public:
int get( )
{
int assignee = -1;
for( unsigned i = 0; i < busy.size(); ++i )
{
if( !busy[i] && attributions[i] > 0 )
{
busy[i] = true;
--freeWorkers;
assignee = realRank[ i ];
break;
}
}
return assignee;
}
int availableWorkers( )
{
return freeWorkers;
}
std::vector<int> idles()
{
std::vector<int> ret;
for(unsigned int i = 0; i < busy.size(); ++i)
{
if( !busy[i] )
{
ret.push_back( realRank[i] );
}
}
return ret;
}
void confirm( int rank )
{
int i = -1;
for( unsigned int j = 0; j < realRank.size(); ++j )
{
if( realRank[j] == rank )
{
i = j;
break;
}
}
--attributions[ i ];
busy[ i ] = false;
++freeWorkers;
}
private:
std::vector<int> attributions;
std::vector<int> realRank;
std::vector<bool> busy;
unsigned int freeWorkers;
};
}
}
# endif // __MPI_ASSIGNMENT_ALGORITHM_H__

31
eo/src/mpi/eoMpiNode.h Normal file
View file

@ -0,0 +1,31 @@
# ifndef __MPI_NODE_H__
# define __MPI_NODE_H__
# include <boost/mpi.hpp>
namespace bmpi = boost::mpi;
namespace eo
{
namespace mpi
{
class Node
{
public:
static void init( int argc, char** argv )
{
static bmpi::environment env( argc, argv );
}
static bmpi::communicator& comm()
{
return _comm;
}
protected:
static bmpi::communicator _comm;
};
}
}
# endif // __MPI_NODE_H__

View file

@ -1,102 +1,107 @@
# ifndef __EO_PARALLEL_APPLY_H__
# define __EO_PARALLEL_APPLY_H__
# include "eompi.h"
# include "eoMpi.h"
# include <eoFunctor.h>
# include <vector>
template< typename EOT >
class ParallelApply : public MpiJob
namespace eo
{
private:
struct ParallelApplyAssignment
namespace mpi
{
template< typename EOT >
class ParallelApply : public Job
{
int index;
int size;
private:
struct ParallelApplyAssignment
{
int index;
int size;
};
public:
ParallelApply(
eoUF<EOT&, void> & _proc,
std::vector<EOT>& _pop,
AssignmentAlgorithm & algo,
int _masterRank,
int _packetSize = 1
) :
Job( algo, _masterRank ),
func( _proc ),
index( 0 ),
size( _pop.size() ),
data( _pop ),
packetSize( _packetSize )
{
if ( _packetSize <= 0 )
{
throw std::runtime_error("Packet size should not be negative.");
}
tempArray = new EOT[ packetSize ];
}
~ParallelApply()
{
delete [] tempArray;
}
virtual void sendTask( int wrkRank )
{
int futureIndex;
if( index + packetSize < size )
{
futureIndex = index + packetSize;
} else {
futureIndex = size;
}
int sentSize = futureIndex - index ;
comm.send( wrkRank, 1, sentSize );
assignedTasks[ wrkRank ].index = index;
assignedTasks[ wrkRank ].size = sentSize;
comm.send( wrkRank, 1, &data[ index ] , sentSize );
index = futureIndex;
}
virtual void handleResponse( int wrkRank )
{
comm.recv( wrkRank, 1, &data[ assignedTasks[wrkRank].index ], assignedTasks[wrkRank].size );
}
virtual void processTask( )
{
int recvSize;
comm.recv( masterRank, 1, recvSize );
comm.recv( masterRank, 1, tempArray, recvSize );
for( int i = 0; i < recvSize ; ++i )
{
func( tempArray[ i ] );
}
comm.send( masterRank, 1, tempArray, recvSize );
}
bool isFinished()
{
return index == size;
}
protected:
std::vector<EOT> & data;
eoUF<EOT&, void>& func;
int index;
int size;
std::map< int /* worker rank */, ParallelApplyAssignment /* min indexes in vector */> assignedTasks;
int packetSize;
EOT* tempArray;
};
public:
ParallelApply(
eoUF<EOT&, void> & _proc,
std::vector<EOT>& _pop,
AssignmentAlgorithm & algo,
int _masterRank,
int _packetSize = 1
) :
MpiJob( algo, _masterRank ),
func( _proc ),
index( 0 ),
size( _pop.size() ),
data( _pop ),
packetSize( _packetSize )
{
if ( _packetSize <= 0 )
{
throw std::runtime_error("Packet size should not be negative.");
}
tempArray = new EOT[ packetSize ];
}
~ParallelApply()
{
delete [] tempArray;
}
virtual void sendTask( int wrkRank )
{
int futureIndex;
if( index + packetSize < size )
{
futureIndex = index + packetSize;
} else {
futureIndex = size;
}
int sentSize = futureIndex - index ;
comm.send( wrkRank, 1, sentSize );
assignedTasks[ wrkRank ].index = index;
assignedTasks[ wrkRank ].size = sentSize;
comm.send( wrkRank, 1, &data[ index ] , sentSize );
index = futureIndex;
}
virtual void handleResponse( int wrkRank )
{
comm.recv( wrkRank, 1, &data[ assignedTasks[wrkRank].index ], assignedTasks[wrkRank].size );
}
virtual void processTask( )
{
int recvSize;
comm.recv( masterRank, 1, recvSize );
comm.recv( masterRank, 1, tempArray, recvSize );
for( int i = 0; i < recvSize ; ++i )
{
func( tempArray[ i ] );
}
comm.send( masterRank, 1, tempArray, recvSize );
}
bool isFinished()
{
return index == size;
}
protected:
std::vector<EOT> & data;
eoUF<EOT&, void>& func;
int index;
int size;
std::map< int /* worker rank */, ParallelApplyAssignment /* min indexes in vector */> assignedTasks;
int packetSize;
EOT* tempArray;
};
}
}
# endif // __EO_PARALLEL_APPLY_H__

View file

@ -1,5 +0,0 @@
# include "eompi.h"
// MpiNode* MpiNodeStore::singleton;
mpi::communicator MpiNode::_comm;

View file

@ -1,127 +0,0 @@
# ifndef __EO_MPI_H__
# define __EO_MPI_H__
# include <vector>
# include <map>
# include <utils/eoLogger.h>
# include "MpiNode.h"
# include "assignmentAlgorithm.h"
// TODO TODOB comment!
namespace EoMpi
{
namespace Channel
{
const int Commands = 0;
}
namespace Message
{
const int Continue = 0;
const int Finish = 1;
}
}
class MpiJob
{
public:
MpiJob( AssignmentAlgorithm& _algo, int _masterRank ) :
assignmentAlgo( _algo ),
comm( MpiNode::comm() ),
masterRank( _masterRank )
{
_isMaster = MpiNode::comm().rank() == _masterRank;
}
// master
virtual bool isFinished() = 0;
virtual void sendTask( int wrkRank ) = 0;
virtual void handleResponse( int wrkRank ) = 0;
// worker
virtual void processTask( ) = 0;
void master( )
{
int totalWorkers = assignmentAlgo.availableWorkers();
eo::log << eo::debug;
eo::log << "[M] Have " << totalWorkers << " workers." << std::endl;
while( ! isFinished() )
{
int assignee = assignmentAlgo.get( );
while( assignee <= 0 )
{
eo::log << "[M] Waitin' for node..." << std::endl;
mpi::status status = comm.probe( mpi::any_source, mpi::any_tag );
int wrkRank = status.source();
eo::log << "[M] Node " << wrkRank << " just terminated." << std::endl;
handleResponse( wrkRank );
assignmentAlgo.confirm( wrkRank );
assignee = assignmentAlgo.get( );
}
eo::log << "[M] Assignee : " << assignee << std::endl;
comm.send( assignee, EoMpi::Channel::Commands, EoMpi::Message::Continue );
sendTask( assignee );
}
eo::log << "[M] Frees all the idle." << std::endl;
// frees all the idle workers
std::vector<int> idles = assignmentAlgo.idles();
for(unsigned int i = 0; i < idles.size(); ++i)
{
comm.send( idles[i], EoMpi::Channel::Commands, EoMpi::Message::Finish );
}
eo::log << "[M] Waits for all responses." << std::endl;
// wait for all responses
while( assignmentAlgo.availableWorkers() != totalWorkers )
{
mpi::status status = comm.probe( mpi::any_source, mpi::any_tag );
int wrkRank = status.source();
handleResponse( wrkRank );
comm.send( wrkRank, EoMpi::Channel::Commands, EoMpi::Message::Finish );
assignmentAlgo.confirm( wrkRank );
}
eo::log << "[M] Leaving master task." << std::endl;
}
void worker( )
{
int order;
eo::log << eo::debug;
while( true )
{
eo::log << "[W" << comm.rank() << "] Waiting for an order..." << std::endl;
comm.recv( masterRank, EoMpi::Channel::Commands, order );
if ( order == EoMpi::Message::Finish )
{
eo::log << "[W" << comm.rank() << "] Leaving worker task." << std::endl;
return;
} else
{
eo::log << "[W" << comm.rank() << "] Processing task..." << std::endl;
processTask( );
}
}
}
void run( )
{
( _isMaster ) ? master( ) : worker( );
}
bool isMaster( )
{
return _isMaster;
}
protected:
AssignmentAlgorithm& assignmentAlgo;
mpi::communicator& comm;
int masterRank;
bool _isMaster;
};
# endif // __EO_MPI_H__