diff --git a/eo/src/mpi/eoMpi.h b/eo/src/mpi/eoMpi.h index 6d22a1bf..433e4c27 100644 --- a/eo/src/mpi/eoMpi.h +++ b/eo/src/mpi/eoMpi.h @@ -33,54 +33,42 @@ namespace eo const int Finish = 1; } - template - struct SharedDataFunction - { - void data( Data & _d ) { d = _d; } - - protected: - Data d; - }; - - template - struct SendTaskFunction : public eoUF, public SharedDataFunction + class SendTaskFunction : public eoUF { + public: virtual ~SendTaskFunction() {} }; - template - struct HandleResponseFunction : public eoUF, public SharedDataFunction + class HandleResponseFunction : public eoUF { + public: virtual ~HandleResponseFunction() {} }; - template - struct ProcessTaskFunction : public eoF, public SharedDataFunction + class ProcessTaskFunction : public eoF { + public: virtual ~ProcessTaskFunction() {} }; - template - struct IsFinishedFunction : public eoF, public SharedDataFunction + class IsFinishedFunction : public eoF { + public: virtual ~IsFinishedFunction() {} }; - template struct JobStore { - virtual SendTaskFunction & sendTask() = 0; - virtual HandleResponseFunction & handleResponse() = 0; - virtual ProcessTaskFunction & processTask() = 0; - virtual IsFinishedFunction & isFinished() = 0; + virtual SendTaskFunction & sendTask() const = 0; + virtual HandleResponseFunction & handleResponse() const = 0; + virtual ProcessTaskFunction & processTask() const = 0; + virtual IsFinishedFunction & isFinished() const = 0; }; - template class Job { public: - - Job( AssignmentAlgorithm& _algo, int _masterRank, JobStore store ) : + Job( AssignmentAlgorithm& _algo, int _masterRank, const JobStore & store ) : // Job( AssignmentAlgorithm& _algo, int _masterRank, long maxTime = 0 ) : assignmentAlgo( _algo ), comm( Node::comm() ), @@ -106,10 +94,10 @@ namespace eo protected: - SendTaskFunction & sendTask; - HandleResponseFunction & handleResponse; - ProcessTaskFunction & processTask; - IsFinishedFunction & isFinished; + SendTaskFunction & sendTask; + HandleResponseFunction & handleResponse; + ProcessTaskFunction & processTask; + IsFinishedFunction & isFinished; void master( ) { diff --git a/eo/src/mpi/eoParallelApply.h b/eo/src/mpi/eoParallelApply.h index 6ec78c80..9cc7bb5f 100644 --- a/eo/src/mpi/eoParallelApply.h +++ b/eo/src/mpi/eoParallelApply.h @@ -17,145 +17,178 @@ namespace eo }; template - struct ParallelApplyData - { - ParallelApplyData() {} + class SendTaskParallelApply; - ParallelApplyData( - eoUF & _proc, - std::vector& _pop, - int _masterRank, - int _packetSize - ) : - func( _proc ), - data( _pop ), - index( 0 ), - size( _pop.size() ), - packetSize( _packetSize ), - // job - masterRank( _masterRank ), - comm( Node::comm() ) + template + class HandleResponseParallelApply; + + template + class ProcessTaskParallelApply; + + template + class IsFinishedParallelApply; + + template + class ParallelApply; + + template< class EOT > + class BaseParallelApply + { + public: + void owner(ParallelApply * job) { - tempArray = new EOT[ _packetSize ]; + j = job; } - ~ParallelApplyData() - { - delete [] tempArray; - } + protected: + ParallelApply * j; + }; + + template< typename EOT > + class ParallelApply : public Job + { + friend class SendTaskParallelApply; + friend class HandleResponseParallelApply; + friend class ProcessTaskParallelApply; + friend class IsFinishedParallelApply; + + public: + + ParallelApply( + eoUF & _proc, + std::vector& _pop, + AssignmentAlgorithm & algo, + int _masterRank, + const JobStore& store, + // long _maxTime = 0, + int _packetSize = 1 + ) : + Job( algo, _masterRank, store ), + // Job( algo, _masterRank, _maxTime ), + func( _proc ), + data( _pop ), + packetSize( _packetSize ), + index( 0 ), + size( _pop.size() ) + { + if ( _packetSize <= 0 ) + { + throw std::runtime_error("Packet size should not be negative."); + } + tempArray = new EOT [ _packetSize ]; + + dynamic_cast< BaseParallelApply& >( sendTask ).owner( this ); + dynamic_cast< BaseParallelApply& >( handleResponse ).owner( this ); + dynamic_cast< BaseParallelApply& >( processTask ).owner( this ); + dynamic_cast< BaseParallelApply& >( isFinished ).owner( this ); + } + + ~ParallelApply() + { + delete [] tempArray; + } + + protected: std::vector & data; eoUF & func; int index; int size; std::map< int /* worker rank */, ParallelApplyAssignment /* min indexes in vector */> assignedTasks; - int packetSize; EOT* tempArray; - int masterRank; - bmpi::communicator& comm; + // bmpi::communicator& comm; }; - template - struct SendTaskParallelApply : public SendTaskFunction< Data > + template< class EOT > + class SendTaskParallelApply : public SendTaskFunction, public BaseParallelApply { - SendTaskParallelApply( Data & _d ) - { - data( _d ); - } - - using SharedDataFunction< Data >::d; + public: + using BaseParallelApply::j; // futureIndex, index, packetSize, size, comm, assignedTasks, data void operator()(int wrkRank) { int futureIndex; - if( d.index + d.packetSize < d.size ) + if( j->index + j->packetSize < j->size ) { - futureIndex = d.index + d.packetSize; + futureIndex = j->index + j->packetSize; } else { - futureIndex = d.size; + futureIndex = j->size; } - int sentSize = futureIndex - d.index ; + int sentSize = futureIndex - j->index ; - d.comm.send( wrkRank, 1, sentSize ); + j->comm.send( wrkRank, 1, sentSize ); - eo::log << eo::progress << "Evaluating individual " << d.index << std::endl; + eo::log << eo::progress << "Evaluating individual " << j->index << std::endl; - d.assignedTasks[ wrkRank ].index = d.index; - d.assignedTasks[ wrkRank ].size = sentSize; + j->assignedTasks[ wrkRank ].index = j->index; + j->assignedTasks[ wrkRank ].size = sentSize; - d.comm.send( wrkRank, 1, & (d.data[ index ]) , sentSize ); - d.index = futureIndex; + j->comm.send( wrkRank, 1, & ( (j->data)[ j->index ] ) , sentSize ); + j->index = futureIndex; } }; - template - struct HandleResponseParallelApply : public HandleResponseFunction< Data > + template< class EOT > + class HandleResponseParallelApply : public HandleResponseFunction, public BaseParallelApply { - HandleResponseParallelApply( Data & _d ) - { - data( _d ); - } + public: + using BaseParallelApply::j; - using SharedDataFunction< Data >::d; void operator()(int wrkRank) { - d.comm.recv( wrkRank, 1, & (d.data[ d.assignedTasks[wrkRank].index ] ), d.assignedTasks[wrkRank].size ); + j->comm.recv( wrkRank, 1, & (j->data[ j->assignedTasks[wrkRank].index ] ), j->assignedTasks[wrkRank].size ); } }; - template - struct ProcessTaskParallelApply : public ProcessTaskFunction< Data > + template< class EOT > + class ProcessTaskParallelApply : public ProcessTaskFunction, public BaseParallelApply { - ProcessTaskParallelApply( Data & _d ) - { - data( _d ); - } + public: + using BaseParallelApply::j; - using SharedDataFunction< Data >::d; void operator()() { int recvSize; - d.comm.recv( d.masterRank, 1, recvSize ); - d.comm.recv( d.masterRank, 1, d.tempArray, recvSize ); + + j->comm.recv( j->masterRank, 1, recvSize ); + j->comm.recv( j->masterRank, 1, j->tempArray, recvSize ); timerStat.start("worker_processes"); for( int i = 0; i < recvSize ; ++i ) { - d.func( d.tempArray[ i ] ); + j->func( j->tempArray[ i ] ); } timerStat.stop("worker_processes"); - d.comm.send( d.masterRank, 1, d.tempArray, recvSize ); + j->comm.send( j->masterRank, 1, j->tempArray, recvSize ); } }; - template - struct IsFinishedParallelApply : public IsFinishedFunction< Data > + template< class EOT > + class IsFinishedParallelApply : public IsFinishedFunction, public BaseParallelApply { - IsFinishedParallelApply( Data & _d ) - { - data( _d ); - } + public: + + using BaseParallelApply::j; - using SharedDataFunction< Data >::d; bool operator()() { - return d.index == d.size; + return j->index == j->size; } }; - template< typename Data > - struct ParallelApplyStore : public JobStore< Data > + template< class EOT > + struct ParallelApplyStore : public JobStore { - ParallelApplyStore( Data & data ) + ParallelApplyStore() { - stpa = new SendTaskParallelApply< Data >( data ); - hrpa = new HandleResponseParallelApply< Data >( data ); - ptpa = new ProcessTaskParallelApply< Data >( data ); - ispa = new IsFinishedParallelApply< Data >( data ); + stpa = new SendTaskParallelApply; + hrpa = new HandleResponseParallelApply; + ptpa = new ProcessTaskParallelApply; + ispa = new IsFinishedParallelApply; } ~ParallelApplyStore() @@ -166,45 +199,16 @@ namespace eo delete ispa; } - SendTaskFunction< Data > & sendTask() { return *stpa; } - HandleResponseFunction< Data > & handleResponse() { return *hrpa; } - ProcessTaskFunction< Data > & processTask() { return *ptpa; } - IsFinishedFunction< Data > & isFinished() { return *ispa; } + SendTaskFunction& sendTask() const { return *stpa; } + HandleResponseFunction& handleResponse() const { return *hrpa; } + ProcessTaskFunction& processTask() const { return *ptpa; } + IsFinishedFunction& isFinished() const { return *ispa; } protected: - SendTaskParallelApply< Data >* stpa; - HandleResponseParallelApply< Data >* hrpa; - ProcessTaskParallelApply< Data >* ptpa; - IsFinishedParallelApply< Data >* ispa; - }; - - template< typename EOT > - class ParallelApply : public Job< ParallelApplyData > - { - public: - - ParallelApply( - eoUF & _proc, - std::vector& _pop, - AssignmentAlgorithm & algo, - int _masterRank, - // long _maxTime = 0, - int _packetSize = 1 - ) : - - Job< ParallelApplyData >( algo, _masterRank, ParallelApplyStore< ParallelApplyData >( sharedData ) ), - // Job( algo, _masterRank, _maxTime ), - sharedData( _proc, _pop, _masterRank, _packetSize ) - - { - if ( _packetSize <= 0 ) - { - throw std::runtime_error("Packet size should not be negative."); - } - } - - protected: - ParallelApplyData sharedData; + SendTaskParallelApply* stpa; + HandleResponseParallelApply* hrpa; + ProcessTaskParallelApply* ptpa; + IsFinishedParallelApply* ispa; }; } } diff --git a/eo/test/mpi/parallelApply.cpp b/eo/test/mpi/parallelApply.cpp index 82f4e0e7..18aadd54 100644 --- a/eo/test/mpi/parallelApply.cpp +++ b/eo/test/mpi/parallelApply.cpp @@ -27,7 +27,8 @@ struct Test int main(int argc, char** argv) { // eo::log << eo::setlevel( eo::debug ); - bool launchOnlyOne = false; // Set this to true if you wanna launch only the first test. + eo::log << eo::setlevel( eo::quiet ); + bool launchOnlyOne = false ; // Set this to true if you wanna launch only the first test. Node::init( argc, argv ); @@ -44,6 +45,8 @@ int main(int argc, char** argv) plusOne plusOneInstance; vector< Test > tests; + + ParallelApplyStore store; const int ALL = Node::comm().size(); @@ -109,7 +112,7 @@ int main(int argc, char** argv) for( unsigned int i = 0; i < tests.size(); ++i ) { - ParallelApply job( plusOneInstance, v, *(tests[i].assign), 0, 3 ); + ParallelApply job( plusOneInstance, v, *(tests[i].assign), 0, store, 3 ); if( job.isMaster() ) {