diff --git a/eo/src/mpi/eoMpi.h b/eo/src/mpi/eoMpi.h index a516b442a..6d22a1bf2 100644 --- a/eo/src/mpi/eoMpi.h +++ b/eo/src/mpi/eoMpi.h @@ -8,6 +8,7 @@ # include # include +# include # include # include "eoMpiNode.h" @@ -32,28 +33,84 @@ namespace eo const int Finish = 1; } + template + struct SharedDataFunction + { + void data( Data & _d ) { d = _d; } + + protected: + Data d; + }; + + template + struct SendTaskFunction : public eoUF, public SharedDataFunction + { + virtual ~SendTaskFunction() {} + }; + + template + struct HandleResponseFunction : public eoUF, public SharedDataFunction + { + virtual ~HandleResponseFunction() {} + }; + + template + struct ProcessTaskFunction : public eoF, public SharedDataFunction + { + virtual ~ProcessTaskFunction() {} + }; + + template + struct IsFinishedFunction : public eoF, public SharedDataFunction + { + virtual ~IsFinishedFunction() {} + }; + + template + struct JobStore + { + virtual SendTaskFunction & sendTask() = 0; + virtual HandleResponseFunction & handleResponse() = 0; + virtual ProcessTaskFunction & processTask() = 0; + virtual IsFinishedFunction & isFinished() = 0; + }; + + template class Job { public: - Job( AssignmentAlgorithm& _algo, int _masterRank, long maxTime = 0 ) : + Job( AssignmentAlgorithm& _algo, int _masterRank, JobStore store ) : + // Job( AssignmentAlgorithm& _algo, int _masterRank, long maxTime = 0 ) : assignmentAlgo( _algo ), comm( Node::comm() ), + // _maxTime( maxTime ), masterRank( _masterRank ), - _maxTime( maxTime ) + // Functors + sendTask( store.sendTask() ), + handleResponse( store.handleResponse() ), + processTask( store.processTask() ), + isFinished( store.isFinished() ) { _isMaster = Node::comm().rank() == _masterRank; } + /* // master virtual bool isFinished() = 0; virtual void sendTask( int wrkRank ) = 0; virtual void handleResponse( int wrkRank ) = 0; // worker virtual void processTask( ) = 0; + */ protected: + SendTaskFunction & sendTask; + HandleResponseFunction & handleResponse; + ProcessTaskFunction & processTask; + IsFinishedFunction & isFinished; + void master( ) { int totalWorkers = assignmentAlgo.availableWorkers(); @@ -65,6 +122,7 @@ namespace eo while( ! isFinished() ) { // Time restrictions + /* getrusage( RUSAGE_SELF , &_usage ); _current = _usage.ru_utime.tv_sec + _usage.ru_stime.tv_sec; if( _maxTime > 0 && _current > _maxTime ) @@ -72,6 +130,7 @@ namespace eo timeStopped = true; break; } + */ timerStat.start("master_wait_for_assignee"); int assignee = assignmentAlgo.get( ); @@ -130,10 +189,12 @@ namespace eo # ifndef NDEBUG eo::log << "[M" << comm.rank() << "] Leaving master task." << std::endl; # endif + /* if( timeStopped ) { throw eoMaxTimeException( _current ); } + */ } void worker( ) @@ -186,7 +247,7 @@ namespace eo struct rusage _usage; long _current; - const long _maxTime; + // const long _maxTime; }; } } diff --git a/eo/src/mpi/eoParallelApply.h b/eo/src/mpi/eoParallelApply.h index b89da73e4..6ec78c800 100644 --- a/eo/src/mpi/eoParallelApply.h +++ b/eo/src/mpi/eoParallelApply.h @@ -10,15 +10,177 @@ namespace eo { namespace mpi { - template< typename EOT > - class ParallelApply : public Job + struct ParallelApplyAssignment { - private: - struct ParallelApplyAssignment + int index; + int size; + }; + + template + struct ParallelApplyData + { + ParallelApplyData() {} + + ParallelApplyData( + eoUF & _proc, + std::vector& _pop, + int _masterRank, + int _packetSize + ) : + func( _proc ), + data( _pop ), + index( 0 ), + size( _pop.size() ), + packetSize( _packetSize ), + // job + masterRank( _masterRank ), + comm( Node::comm() ) + { + tempArray = new EOT[ _packetSize ]; + } + + ~ParallelApplyData() + { + delete [] tempArray; + } + + std::vector & data; + eoUF & func; + int index; + int size; + std::map< int /* worker rank */, ParallelApplyAssignment /* min indexes in vector */> assignedTasks; + + int packetSize; + EOT* tempArray; + + int masterRank; + bmpi::communicator& comm; + }; + + template + struct SendTaskParallelApply : public SendTaskFunction< Data > + { + SendTaskParallelApply( Data & _d ) + { + data( _d ); + } + + using SharedDataFunction< Data >::d; + + // futureIndex, index, packetSize, size, comm, assignedTasks, data + void operator()(int wrkRank) + { + int futureIndex; + + if( d.index + d.packetSize < d.size ) { - int index; - int size; - }; + futureIndex = d.index + d.packetSize; + } else { + futureIndex = d.size; + } + + int sentSize = futureIndex - d.index ; + + d.comm.send( wrkRank, 1, sentSize ); + + eo::log << eo::progress << "Evaluating individual " << d.index << std::endl; + + d.assignedTasks[ wrkRank ].index = d.index; + d.assignedTasks[ wrkRank ].size = sentSize; + + d.comm.send( wrkRank, 1, & (d.data[ index ]) , sentSize ); + d.index = futureIndex; + } + }; + + template + struct HandleResponseParallelApply : public HandleResponseFunction< Data > + { + HandleResponseParallelApply( Data & _d ) + { + data( _d ); + } + + using SharedDataFunction< Data >::d; + void operator()(int wrkRank) + { + d.comm.recv( wrkRank, 1, & (d.data[ d.assignedTasks[wrkRank].index ] ), d.assignedTasks[wrkRank].size ); + } + }; + + template + struct ProcessTaskParallelApply : public ProcessTaskFunction< Data > + { + ProcessTaskParallelApply( Data & _d ) + { + data( _d ); + } + + using SharedDataFunction< Data >::d; + void operator()() + { + int recvSize; + d.comm.recv( d.masterRank, 1, recvSize ); + d.comm.recv( d.masterRank, 1, d.tempArray, recvSize ); + timerStat.start("worker_processes"); + for( int i = 0; i < recvSize ; ++i ) + { + d.func( d.tempArray[ i ] ); + } + timerStat.stop("worker_processes"); + d.comm.send( d.masterRank, 1, d.tempArray, recvSize ); + } + }; + + template + struct IsFinishedParallelApply : public IsFinishedFunction< Data > + { + IsFinishedParallelApply( Data & _d ) + { + data( _d ); + } + + using SharedDataFunction< Data >::d; + bool operator()() + { + return d.index == d.size; + } + }; + + template< typename Data > + struct ParallelApplyStore : public JobStore< Data > + { + ParallelApplyStore( Data & data ) + { + stpa = new SendTaskParallelApply< Data >( data ); + hrpa = new HandleResponseParallelApply< Data >( data ); + ptpa = new ProcessTaskParallelApply< Data >( data ); + ispa = new IsFinishedParallelApply< Data >( data ); + } + + ~ParallelApplyStore() + { + delete stpa; + delete hrpa; + delete ptpa; + delete ispa; + } + + SendTaskFunction< Data > & sendTask() { return *stpa; } + HandleResponseFunction< Data > & handleResponse() { return *hrpa; } + ProcessTaskFunction< Data > & processTask() { return *ptpa; } + IsFinishedFunction< Data > & isFinished() { return *ispa; } + + protected: + SendTaskParallelApply< Data >* stpa; + HandleResponseParallelApply< Data >* hrpa; + ProcessTaskParallelApply< Data >* ptpa; + IsFinishedParallelApply< Data >* ispa; + }; + + template< typename EOT > + class ParallelApply : public Job< ParallelApplyData > + { public: ParallelApply( @@ -26,85 +188,23 @@ namespace eo std::vector& _pop, AssignmentAlgorithm & algo, int _masterRank, - int _packetSize = 1, - long _maxTime = 0 + // long _maxTime = 0, + int _packetSize = 1 ) : - Job( algo, _masterRank, _maxTime ), - func( _proc ), - index( 0 ), - size( _pop.size() ), - data( _pop ), - packetSize( _packetSize ) + + Job< ParallelApplyData >( algo, _masterRank, ParallelApplyStore< ParallelApplyData >( sharedData ) ), + // Job( algo, _masterRank, _maxTime ), + sharedData( _proc, _pop, _masterRank, _packetSize ) + + { + if ( _packetSize <= 0 ) { - if ( _packetSize <= 0 ) - { - throw std::runtime_error("Packet size should not be negative."); - } - tempArray = new EOT[ packetSize ]; - } - - virtual ~ParallelApply() - { - delete [] tempArray; - } - - virtual void sendTask( int wrkRank ) - { - int futureIndex; - - if( index + packetSize < size ) - { - futureIndex = index + packetSize; - } else { - futureIndex = size; - } - - int sentSize = futureIndex - index ; - - comm.send( wrkRank, 1, sentSize ); - - eo::log << eo::progress << "Evaluating individual " << index << std::endl; - - assignedTasks[ wrkRank ].index = index; - assignedTasks[ wrkRank ].size = sentSize; - - comm.send( wrkRank, 1, &data[ index ] , sentSize ); - index = futureIndex; - } - - virtual void handleResponse( int wrkRank ) - { - comm.recv( wrkRank, 1, &data[ assignedTasks[wrkRank].index ], assignedTasks[wrkRank].size ); - } - - virtual void processTask( ) - { - int recvSize; - comm.recv( masterRank, 1, recvSize ); - comm.recv( masterRank, 1, tempArray, recvSize ); - timerStat.start("worker_processes"); - for( int i = 0; i < recvSize ; ++i ) - { - func( tempArray[ i ] ); - } - timerStat.stop("worker_processes"); - comm.send( masterRank, 1, tempArray, recvSize ); - } - - virtual bool isFinished() - { - return index == size; + throw std::runtime_error("Packet size should not be negative."); } + } protected: - std::vector & data; - eoUF& func; - int index; - int size; - std::map< int /* worker rank */, ParallelApplyAssignment /* min indexes in vector */> assignedTasks; - - int packetSize; - EOT* tempArray; + ParallelApplyData sharedData; }; } }