From f4d8b43f7d15e923acdc59eb7e64590757e2a421 Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Mon, 25 Jun 2012 13:41:48 +0200 Subject: [PATCH] ParallelApply can now handle many data at a time. --- eo/src/mpi/eoParallelApply.h | 64 ++++++++++++++++++++++++++++------- eo/test/mpi/multipleRoles.cpp | 4 +-- eo/test/mpi/parallelApply.cpp | 5 +-- 3 files changed, 57 insertions(+), 16 deletions(-) diff --git a/eo/src/mpi/eoParallelApply.h b/eo/src/mpi/eoParallelApply.h index 0c4f1e6e..e401fda6 100644 --- a/eo/src/mpi/eoParallelApply.h +++ b/eo/src/mpi/eoParallelApply.h @@ -6,39 +6,76 @@ # include # include + template< typename EOT > class ParallelApply : public MpiJob { + private: + struct ParallelApplyAssignment + { + int index; + int size; + }; public: - ParallelApply( eoUF & _proc, std::vector& _pop, AssignmentAlgorithm & algo, int _masterRank ) : + ParallelApply( + eoUF & _proc, + std::vector& _pop, + AssignmentAlgorithm & algo, + int _masterRank, + int _packetSize = 1 + ) : MpiJob( algo, _masterRank ), func( _proc ), index( 0 ), size( _pop.size() ), - data( _pop ) + data( _pop ), + packetSize( _packetSize ) { - // empty + tempArray = new EOT[ packetSize ]; + } + + ~ParallelApply() + { + delete [] tempArray; } virtual void sendTask( int wrkRank ) { - assignedTasks[ wrkRank ] = index; - comm.send( wrkRank, 1, data[ index ] ); - ++index; + int futureIndex; + + if( index + packetSize < size ) + { + futureIndex = index + packetSize; + } else { + futureIndex = size; + } + + int sentSize = futureIndex - index ; + comm.send( wrkRank, 1, sentSize ); + + assignedTasks[ wrkRank ].index = index; + assignedTasks[ wrkRank ].size = sentSize; + + comm.send( wrkRank, 1, &data[ index ] , sentSize ); + index = futureIndex; } virtual void handleResponse( int wrkRank ) { - comm.recv( wrkRank, 1, data[ assignedTasks[ wrkRank ] ] ); + comm.recv( wrkRank, 1, &data[ assignedTasks[wrkRank].index ], assignedTasks[wrkRank].size ); } virtual void processTask( ) { - EOT ind; - comm.recv( masterRank, 1, ind ); - func( ind ); - comm.send( masterRank, 1, ind ); + int recvSize; + comm.recv( masterRank, 1, recvSize ); + comm.recv( masterRank, 1, tempArray, recvSize ); + for( int i = 0; i < recvSize ; ++i ) + { + func( tempArray[ i ] ); + } + comm.send( masterRank, 1, tempArray, recvSize ); } bool isFinished() @@ -51,7 +88,10 @@ class ParallelApply : public MpiJob eoUF& func; int index; int size; - std::map< int /* worker rank */, int /* index in vector */> assignedTasks; + std::map< int /* worker rank */, ParallelApplyAssignment /* min indexes in vector */> assignedTasks; + + int packetSize; + EOT* tempArray; }; # endif // __EO_PARALLEL_APPLY_H__ diff --git a/eo/test/mpi/multipleRoles.cpp b/eo/test/mpi/multipleRoles.cpp index d855c998..ab802073 100644 --- a/eo/test/mpi/multipleRoles.cpp +++ b/eo/test/mpi/multipleRoles.cpp @@ -10,8 +10,8 @@ using namespace std; // Role map // 0 : general master -// 1 : worker of general job, master of subjob -// 2 and more : workers of subjob +// 1, 2 : worker of general job, master of subjob +// 3 to 7 : workers of subjob struct SubWork: public eoUF< int&, void > { diff --git a/eo/test/mpi/parallelApply.cpp b/eo/test/mpi/parallelApply.cpp index d9577121..69b01e48 100644 --- a/eo/test/mpi/parallelApply.cpp +++ b/eo/test/mpi/parallelApply.cpp @@ -24,8 +24,9 @@ struct Test // These tests require at least 3 processes to be launched. int main(int argc, char** argv) { - eo::log << eo::setlevel( eo::debug ); - bool launchOnlyOne = true; + // eo::log << eo::setlevel( eo::debug ); + bool launchOnlyOne = false; // Set this to true if you wanna launch only the first test. + MpiNode::init( argc, argv ); vector v;