From ff61676fb7ac5e17e625202732bde8773173ba48 Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Mon, 2 Jul 2012 17:53:02 +0200 Subject: [PATCH] Cleaner version of functional parallel job. --- eo/src/mpi/eoMpi.h | 80 +++++++++++++--- eo/src/mpi/eoParallelApply.h | 173 ++++++++++++++++------------------ eo/test/mpi/parallelApply.cpp | 8 +- 3 files changed, 154 insertions(+), 107 deletions(-) diff --git a/eo/src/mpi/eoMpi.h b/eo/src/mpi/eoMpi.h index 433e4c27..9ad2bd7b 100644 --- a/eo/src/mpi/eoMpi.h +++ b/eo/src/mpi/eoMpi.h @@ -33,42 +33,91 @@ namespace eo const int Finish = 1; } - class SendTaskFunction : public eoUF + template< typename JobData, typename Wrapped > + struct SharedDataFunction + { + SharedDataFunction( Wrapped * w ) + { + wrapped = w; + } + + void data( JobData* _d ) + { + d = _d; + } + + protected: + JobData* d; + Wrapped* wrapped; + }; + + template< typename JobData > + struct SendTaskFunction : public eoUF, public SharedDataFunction< JobData, SendTaskFunction > { public: + + SendTaskFunction( SendTaskFunction* w = 0 ) : SharedDataFunction >( w ) + { + // empty + } + virtual ~SendTaskFunction() {} }; - class HandleResponseFunction : public eoUF + template< typename JobData > + struct HandleResponseFunction : public eoUF, public SharedDataFunction< JobData, HandleResponseFunction > { public: + + HandleResponseFunction( HandleResponseFunction* w = 0 ) : SharedDataFunction >( w ) + { + // empty + } + virtual ~HandleResponseFunction() {} }; - class ProcessTaskFunction : public eoF + template< typename JobData > + struct ProcessTaskFunction : public eoF, public SharedDataFunction< JobData, ProcessTaskFunction > { public: + + ProcessTaskFunction( ProcessTaskFunction* w = 0 ) : SharedDataFunction >( w ) + { + // empty + } + virtual ~ProcessTaskFunction() {} }; - class IsFinishedFunction : public eoF + template< typename JobData > + struct IsFinishedFunction : public eoF, public SharedDataFunction< JobData, IsFinishedFunction > { public: + + IsFinishedFunction( IsFinishedFunction* w = 0 ) : SharedDataFunction >( w ) + { + // empty + } + virtual ~IsFinishedFunction() {} }; + template< typename JobData > struct JobStore { - virtual SendTaskFunction & sendTask() const = 0; - virtual HandleResponseFunction & handleResponse() const = 0; - virtual ProcessTaskFunction & processTask() const = 0; - virtual IsFinishedFunction & isFinished() const = 0; + virtual SendTaskFunction & sendTask() const = 0; + virtual HandleResponseFunction & handleResponse() const = 0; + virtual ProcessTaskFunction & processTask() const = 0; + virtual IsFinishedFunction & isFinished() const = 0; + virtual JobData* data() = 0; }; + template< class JobData > class Job { public: - Job( AssignmentAlgorithm& _algo, int _masterRank, const JobStore & store ) : + Job( AssignmentAlgorithm& _algo, int _masterRank, JobStore & store ) : // Job( AssignmentAlgorithm& _algo, int _masterRank, long maxTime = 0 ) : assignmentAlgo( _algo ), comm( Node::comm() ), @@ -81,6 +130,11 @@ namespace eo isFinished( store.isFinished() ) { _isMaster = Node::comm().rank() == _masterRank; + + sendTask.data( store.data() ); + handleResponse.data( store.data() ); + processTask.data( store.data() ); + isFinished.data( store.data() ); } /* @@ -94,10 +148,10 @@ namespace eo protected: - SendTaskFunction & sendTask; - HandleResponseFunction & handleResponse; - ProcessTaskFunction & processTask; - IsFinishedFunction & isFinished; + SendTaskFunction & sendTask; + HandleResponseFunction & handleResponse; + ProcessTaskFunction & processTask; + IsFinishedFunction & isFinished; void master( ) { diff --git a/eo/src/mpi/eoParallelApply.h b/eo/src/mpi/eoParallelApply.h index 9cc7bb5f..bc85d438 100644 --- a/eo/src/mpi/eoParallelApply.h +++ b/eo/src/mpi/eoParallelApply.h @@ -17,79 +17,29 @@ namespace eo }; template - class SendTaskParallelApply; - - template - class HandleResponseParallelApply; - - template - class ProcessTaskParallelApply; - - template - class IsFinishedParallelApply; - - template - class ParallelApply; - - template< class EOT > - class BaseParallelApply + struct JobData { - public: - void owner(ParallelApply * job) - { - j = job; - } - - protected: - ParallelApply * j; - }; - - template< typename EOT > - class ParallelApply : public Job - { - friend class SendTaskParallelApply; - friend class HandleResponseParallelApply; - friend class ProcessTaskParallelApply; - friend class IsFinishedParallelApply; - - public: - - ParallelApply( + JobData( eoUF & _proc, std::vector& _pop, - AssignmentAlgorithm & algo, int _masterRank, - const JobStore& store, // long _maxTime = 0, - int _packetSize = 1 - ) : - Job( algo, _masterRank, store ), - // Job( algo, _masterRank, _maxTime ), - func( _proc ), - data( _pop ), - packetSize( _packetSize ), - index( 0 ), - size( _pop.size() ) + int _packetSize + ) : + data( _pop ), func( _proc ), index( 0 ), size( _pop.size() ), packetSize( _packetSize ), masterRank( _masterRank ), comm( Node::comm() ) { if ( _packetSize <= 0 ) { throw std::runtime_error("Packet size should not be negative."); } - tempArray = new EOT [ _packetSize ]; - - dynamic_cast< BaseParallelApply& >( sendTask ).owner( this ); - dynamic_cast< BaseParallelApply& >( handleResponse ).owner( this ); - dynamic_cast< BaseParallelApply& >( processTask ).owner( this ); - dynamic_cast< BaseParallelApply& >( isFinished ).owner( this ); + tempArray = new EOT[ _packetSize ]; } - ~ParallelApply() + ~JobData() { delete [] tempArray; } - protected: - std::vector & data; eoUF & func; int index; @@ -98,92 +48,131 @@ namespace eo int packetSize; EOT* tempArray; - // bmpi::communicator& comm; + int masterRank; + bmpi::communicator& comm; }; - template< class EOT > - class SendTaskParallelApply : public SendTaskFunction, public BaseParallelApply + /* + template< typename EOT > + class ParallelApply : public Job< JobData > { public: - using BaseParallelApply::j; + + ParallelApply( + // eoUF & _proc, + // std::vector& _pop, + AssignmentAlgorithm & algo, + int _masterRank, + const JobStore< JobData >& store + // long _maxTime = 0, + // int _packetSize = 1 + ) : + Job( algo, _masterRank, store ) + // Job( algo, _masterRank, _maxTime ), + func( _proc ), + data( _pop ), + packetSize( _packetSize ), + index( 0 ), + size( _pop.size() ) + { + // empty + } + + protected: + + // bmpi::communicator& comm; + }; + */ + + template< class EOT > + class SendTaskParallelApply : public SendTaskFunction< JobData > + { + public: + using SendTaskFunction< JobData >::d; // futureIndex, index, packetSize, size, comm, assignedTasks, data void operator()(int wrkRank) { int futureIndex; - if( j->index + j->packetSize < j->size ) + if( d->index + d->packetSize < d->size ) { - futureIndex = j->index + j->packetSize; + futureIndex = d->index + d->packetSize; } else { - futureIndex = j->size; + futureIndex = d->size; } - int sentSize = futureIndex - j->index ; + int sentSize = futureIndex - d->index ; - j->comm.send( wrkRank, 1, sentSize ); + d->comm.send( wrkRank, 1, sentSize ); - eo::log << eo::progress << "Evaluating individual " << j->index << std::endl; + eo::log << eo::progress << "Evaluating individual " << d->index << std::endl; - j->assignedTasks[ wrkRank ].index = j->index; - j->assignedTasks[ wrkRank ].size = sentSize; + d->assignedTasks[ wrkRank ].index = d->index; + d->assignedTasks[ wrkRank ].size = sentSize; - j->comm.send( wrkRank, 1, & ( (j->data)[ j->index ] ) , sentSize ); - j->index = futureIndex; + d->comm.send( wrkRank, 1, & ( (d->data)[ d->index ] ) , sentSize ); + d->index = futureIndex; } }; template< class EOT > - class HandleResponseParallelApply : public HandleResponseFunction, public BaseParallelApply + class HandleResponseParallelApply : public HandleResponseFunction< JobData > { public: - using BaseParallelApply::j; + using HandleResponseFunction< JobData >::d; void operator()(int wrkRank) { - j->comm.recv( wrkRank, 1, & (j->data[ j->assignedTasks[wrkRank].index ] ), j->assignedTasks[wrkRank].size ); + d->comm.recv( wrkRank, 1, & (d->data[ d->assignedTasks[wrkRank].index ] ), d->assignedTasks[wrkRank].size ); } }; template< class EOT > - class ProcessTaskParallelApply : public ProcessTaskFunction, public BaseParallelApply + class ProcessTaskParallelApply : public ProcessTaskFunction< JobData > { public: - using BaseParallelApply::j; + using ProcessTaskFunction< JobData >::d; void operator()() { int recvSize; - j->comm.recv( j->masterRank, 1, recvSize ); - j->comm.recv( j->masterRank, 1, j->tempArray, recvSize ); + d->comm.recv( d->masterRank, 1, recvSize ); + d->comm.recv( d->masterRank, 1, d->tempArray, recvSize ); timerStat.start("worker_processes"); for( int i = 0; i < recvSize ; ++i ) { - j->func( j->tempArray[ i ] ); + d->func( d->tempArray[ i ] ); } timerStat.stop("worker_processes"); - j->comm.send( j->masterRank, 1, j->tempArray, recvSize ); + d->comm.send( d->masterRank, 1, d->tempArray, recvSize ); } }; template< class EOT > - class IsFinishedParallelApply : public IsFinishedFunction, public BaseParallelApply + class IsFinishedParallelApply : public IsFinishedFunction< JobData > { public: - - using BaseParallelApply::j; + using IsFinishedFunction< JobData >::d; bool operator()() { - return j->index == j->size; + return d->index == d->size; } }; template< class EOT > - struct ParallelApplyStore : public JobStore + struct ParallelApplyStore : public JobStore< JobData > { - ParallelApplyStore() + ParallelApplyStore( + eoUF & _proc, + std::vector& _pop, + int _masterRank, + // long _maxTime = 0, + int _packetSize = 1 + ) + : j( _proc, _pop, _masterRank, _packetSize ) { stpa = new SendTaskParallelApply; hrpa = new HandleResponseParallelApply; @@ -199,16 +188,20 @@ namespace eo delete ispa; } - SendTaskFunction& sendTask() const { return *stpa; } - HandleResponseFunction& handleResponse() const { return *hrpa; } - ProcessTaskFunction& processTask() const { return *ptpa; } - IsFinishedFunction& isFinished() const { return *ispa; } + SendTaskFunction< JobData >& sendTask() const { return *stpa; } + HandleResponseFunction< JobData >& handleResponse() const { return *hrpa; } + ProcessTaskFunction< JobData >& processTask() const { return *ptpa; } + IsFinishedFunction< JobData >& isFinished() const { return *ispa; } + + JobData* data() { return &j; } protected: SendTaskParallelApply* stpa; HandleResponseParallelApply* hrpa; ProcessTaskParallelApply* ptpa; IsFinishedParallelApply* ispa; + + JobData j; }; } } diff --git a/eo/test/mpi/parallelApply.cpp b/eo/test/mpi/parallelApply.cpp index 18aadd54..4e331334 100644 --- a/eo/test/mpi/parallelApply.cpp +++ b/eo/test/mpi/parallelApply.cpp @@ -38,7 +38,7 @@ int main(int argc, char** argv) { v.push_back( rand() ); } - + int offset = 0; vector originalV = v; @@ -46,8 +46,6 @@ int main(int argc, char** argv) vector< Test > tests; - ParallelApplyStore store; - const int ALL = Node::comm().size(); Test tIntervalStatic; @@ -112,7 +110,9 @@ int main(int argc, char** argv) for( unsigned int i = 0; i < tests.size(); ++i ) { - ParallelApply job( plusOneInstance, v, *(tests[i].assign), 0, store, 3 ); + // ParallelApply job( plusOneInstance, v, *(tests[i].assign), 0, store, 3 ); + ParallelApplyStore< int > store( plusOneInstance, v, 0, 3 ); + Job< JobData > job( *(tests[i].assign), 0, store ); if( job.isMaster() ) {