From a7ce5c3ffbc2cb343b192aa370830a59db9ab073 Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Fri, 27 Jul 2012 15:09:59 +0200 Subject: [PATCH 1/9] eoMpi: memory leak when setting own job functors. Fixed --- eo/src/mpi/eoMpi.h | 51 ++++++++++++++++++++++++++++++++++++--- eo/src/mpi/eoMultiStart.h | 2 +- 2 files changed, 48 insertions(+), 5 deletions(-) diff --git a/eo/src/mpi/eoMpi.h b/eo/src/mpi/eoMpi.h index 92c2ba99b..ae004fa11 100644 --- a/eo/src/mpi/eoMpi.h +++ b/eo/src/mpi/eoMpi.h @@ -447,10 +447,53 @@ namespace eo IsFinishedFunction & isFinished() { return *_iff; } // Setters - void sendTask( SendTaskFunction* stf ) { _stf = stf; } - void handleResponse( HandleResponseFunction* hrf ) { _hrf = hrf; } - void processTask( ProcessTaskFunction* ptf ) { _ptf = ptf; } - void isFinished( IsFinishedFunction* iff ) { _iff = iff; } + void sendTask( SendTaskFunction* stf ) + { + if( !stf ) + return; + + if( _stf && _stf->needDelete() ) + { + delete _stf; + } + _stf = stf; + } + + void handleResponse( HandleResponseFunction* hrf ) + { + if( !hrf ) + return; + + if( _hrf && _hrf->needDelete() ) + { + delete _hrf; + } + _hrf = hrf; + } + + void processTask( ProcessTaskFunction* ptf ) + { + if( !ptf ) + return; + + if( _ptf && _ptf->needDelete() ) + { + delete _ptf; + } + _ptf = ptf; + } + + void isFinished( IsFinishedFunction* iff ) + { + if( !iff ) + return; + + if( _iff && _iff->needDelete() ) + { + delete _iff; + } + _iff = iff; + } /** * @brief Helpers for wrapping send task functor. 
diff --git a/eo/src/mpi/eoMultiStart.h b/eo/src/mpi/eoMultiStart.h index 619364942..b8455e680 100644 --- a/eo/src/mpi/eoMultiStart.h +++ b/eo/src/mpi/eoMultiStart.h @@ -397,7 +397,7 @@ namespace eo /************************************** * DEFAULT RESET ALGO IMPLEMENTATIONS * - ************************************** + **************************************/ /** * @brief For a Genetic Algorithm, reinits the population by copying the original one From b31c520eba7943c1464f324b554e8174f3c7ff95 Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Fri, 27 Jul 2012 15:22:23 +0200 Subject: [PATCH 2/9] Removed all remaining warnings at compilation --- eo/src/mpi/eoMultiStart.h | 16 ++++++++++------ eo/src/utils/eoParallel.h | 2 +- eo/test/mpi/t-mpi-multipleRoles.cpp | 4 ++-- eo/test/mpi/t-mpi-parallelApply.cpp | 2 +- eo/test/mpi/t-mpi-wrapper.cpp | 2 +- 5 files changed, 15 insertions(+), 11 deletions(-) diff --git a/eo/src/mpi/eoMultiStart.h b/eo/src/mpi/eoMultiStart.h index b8455e680..5be5fd68f 100644 --- a/eo/src/mpi/eoMultiStart.h +++ b/eo/src/mpi/eoMultiStart.h @@ -53,10 +53,14 @@ namespace eo { typedef eoUF< eoPop&, void> ResetAlgo; - MultiStartData( bmpi::communicator& _comm, eoAlgo& _algo, int _masterRank, ResetAlgo & _resetAlgo ) + MultiStartData( + bmpi::communicator& _comm, + eoAlgo& _algo, + int _masterRank, + ResetAlgo & _resetAlgo ) : - runs( 0 ), pop(), bests(), - comm( _comm ), algo( _algo ), masterRank( _masterRank ), resetAlgo( _resetAlgo ) + runs( 0 ), bests(), pop(), + comm( _comm ), algo( _algo ), resetAlgo( _resetAlgo ), masterRank( _masterRank ) { // empty } @@ -245,20 +249,20 @@ namespace eo { _data.runs = runs; - int nbWorkers = workers.size(); + unsigned nbWorkers = workers.size(); std::vector< int > seeds = _getSeeds( nbWorkers ); if( eo::mpi::Node::comm().rank() == _masterRank ) { if( seeds.size() < nbWorkers ) { // Random seeds - for( int i = seeds.size(); i < nbWorkers; ++i ) + for( unsigned i = seeds.size(); i < nbWorkers; ++i ) { 
seeds.push_back( eo::rng.rand() ); } } - for( int i = 0 ; i < nbWorkers ; ++i ) + for( unsigned i = 0 ; i < nbWorkers ; ++i ) { int wrkRank = workers[i]; eo::mpi::Node::comm().send( wrkRank, 1, seeds[ i ] ); diff --git a/eo/src/utils/eoParallel.h b/eo/src/utils/eoParallel.h index b812fecc9..c7a15a4f9 100644 --- a/eo/src/utils/eoParallel.h +++ b/eo/src/utils/eoParallel.h @@ -68,9 +68,9 @@ private: eoValueParam _isDynamic; eoValueParam _prefix; eoValueParam _nthreads; - eoValueParam _packetSize; eoValueParam _enableResults; eoValueParam _doMeasure; + eoValueParam _packetSize; double _t_start; }; diff --git a/eo/test/mpi/t-mpi-multipleRoles.cpp b/eo/test/mpi/t-mpi-multipleRoles.cpp index 07f65937e..3a525126b 100644 --- a/eo/test/mpi/t-mpi-multipleRoles.cpp +++ b/eo/test/mpi/t-mpi-multipleRoles.cpp @@ -118,7 +118,7 @@ struct Work: public eoUF< SerializableVector< SerializableBase >&, void > { cout << "Work phase..." << endl; subtask( v, Node::comm().rank() ); - for( int i = 0; i < v.size(); ++i ) + for( unsigned i = 0; i < v.size(); ++i ) { v[i] *= 2; } @@ -168,7 +168,7 @@ int main(int argc, char** argv) EmptyJob stop( algo, 0 ); v = metaV[0]; cout << "Results : " << endl; - for(int i = 0; i < v.size(); ++i) + for(unsigned i = 0; i < v.size(); ++i) { cout << v[i] << ' '; } diff --git a/eo/test/mpi/t-mpi-parallelApply.cpp b/eo/test/mpi/t-mpi-parallelApply.cpp index 6cda89907..559505bd1 100644 --- a/eo/test/mpi/t-mpi-parallelApply.cpp +++ b/eo/test/mpi/t-mpi-parallelApply.cpp @@ -198,7 +198,7 @@ int main(int argc, char** argv) // job. 
EmptyJob stop( *(tests[i].assign), eo::mpi::DEFAULT_MASTER ); ++offset; - for(int i = 0; i < v.size(); ++i) + for(unsigned i = 0; i < v.size(); ++i) { cout << v[i] << ' '; if( originalV[i] + offset != v[i] ) diff --git a/eo/test/mpi/t-mpi-wrapper.cpp b/eo/test/mpi/t-mpi-wrapper.cpp index cedc717fe..7e5fa4d2f 100644 --- a/eo/test/mpi/t-mpi-wrapper.cpp +++ b/eo/test/mpi/t-mpi-wrapper.cpp @@ -116,7 +116,7 @@ int main(int argc, char** argv) if( job.isMaster() ) { ++offset; - for(int i = 0; i < v.size(); ++i) + for(unsigned i = 0; i < v.size(); ++i) { cout << v[i] << ' '; if( originalV[i] + offset != v[i] ) From d48f045451460dbf25cb8d01f393bebe7627e66b Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Thu, 2 Aug 2012 16:46:00 +0200 Subject: [PATCH 3/9] MPI: Commands channel is now used only for job-level information, messages channels for data used in a job. --- eo/src/mpi/eoMpi.h | 4 ++-- eo/src/mpi/eoMultiStart.h | 8 ++++---- eo/src/mpi/eoParallelApply.h | 12 ++++++------ 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/eo/src/mpi/eoMpi.h b/eo/src/mpi/eoMpi.h index ae004fa11..1fe158229 100644 --- a/eo/src/mpi/eoMpi.h +++ b/eo/src/mpi/eoMpi.h @@ -668,7 +668,7 @@ namespace eo timerStat.start("master_wait_for_all_responses"); while( assignmentAlgo.availableWorkers() != totalWorkers ) { - bmpi::status status = comm.probe( bmpi::any_source, bmpi::any_tag ); + bmpi::status status = comm.probe( bmpi::any_source, eo::mpi::Channel::Messages ); int wrkRank = status.source(); that.handleResponse( wrkRank ); comm.send( wrkRank, Channel::Commands, Message::Finish ); @@ -716,7 +716,7 @@ namespace eo # ifndef NDEBUG eo::log << "[M" << comm.rank() << "] Waitin' for node..."
<< std::endl; # endif - bmpi::status status = comm.probe( bmpi::any_source, bmpi::any_tag ); + bmpi::status status = comm.probe( bmpi::any_source, eo::mpi::Channel::Messages ); int wrkRank = status.source(); # ifndef NDEBUG eo::log << "[M" << comm.rank() << "] Node " << wrkRank << " just terminated." << std::endl; diff --git a/eo/src/mpi/eoMultiStart.h b/eo/src/mpi/eoMultiStart.h index 5be5fd68f..e4665c517 100644 --- a/eo/src/mpi/eoMultiStart.h +++ b/eo/src/mpi/eoMultiStart.h @@ -138,7 +138,7 @@ namespace eo { EOT individual; MultiStartData< EOT >& d = *_data; - d.comm.recv( wrkRank, 1, individual ); + d.comm.recv( wrkRank, eo::mpi::Channel::Messages, individual ); d.bests.push_back( individual ); } }; @@ -159,7 +159,7 @@ namespace eo { _data->resetAlgo( _data->pop ); _data->algo( _data->pop ); - _data->comm.send( _data->masterRank, 1, _data->pop.best_element() ); + _data->comm.send( _data->masterRank, eo::mpi::Channel::Messages, _data->pop.best_element() ); } }; @@ -265,12 +265,12 @@ namespace eo for( unsigned i = 0 ; i < nbWorkers ; ++i ) { int wrkRank = workers[i]; - eo::mpi::Node::comm().send( wrkRank, 1, seeds[ i ] ); + eo::mpi::Node::comm().send( wrkRank, eo::mpi::Channel::Commands, seeds[ i ] ); } } else { int seed; - eo::mpi::Node::comm().recv( _masterRank, 1, seed ); + eo::mpi::Node::comm().recv( _masterRank, eo::mpi::Channel::Commands, seed ); eo::log << eo::debug << eo::mpi::Node::comm().rank() << "- Seed: " << seed << std::endl; eo::rng.reseed( seed ); } diff --git a/eo/src/mpi/eoParallelApply.h b/eo/src/mpi/eoParallelApply.h index 6b58aaf1b..8e379fce3 100644 --- a/eo/src/mpi/eoParallelApply.h +++ b/eo/src/mpi/eoParallelApply.h @@ -173,14 +173,14 @@ namespace eo int sentSize = futureIndex - _data->index ; - _data->comm.send( wrkRank, 1, sentSize ); + _data->comm.send( wrkRank, eo::mpi::Channel::Messages, sentSize ); eo::log << eo::progress << "Evaluating individual " << _data->index << std::endl; _data->assignedTasks[ wrkRank ].index = _data->index; 
_data->assignedTasks[ wrkRank ].size = sentSize; - _data->comm.send( wrkRank, 1, & ( (_data->table())[ _data->index ] ) , sentSize ); + _data->comm.send( wrkRank, eo::mpi::Channel::Messages, & ( (_data->table())[ _data->index ] ) , sentSize ); _data->index = futureIndex; } }; @@ -203,7 +203,7 @@ namespace eo void operator()(int wrkRank) { - _data->comm.recv( wrkRank, 1, & (_data->table()[ _data->assignedTasks[wrkRank].index ] ), _data->assignedTasks[wrkRank].size ); + _data->comm.recv( wrkRank, eo::mpi::Channel::Messages, & (_data->table()[ _data->assignedTasks[wrkRank].index ] ), _data->assignedTasks[wrkRank].size ); } }; @@ -230,16 +230,16 @@ namespace eo { int recvSize; - _data->comm.recv( _data->masterRank, 1, recvSize ); + _data->comm.recv( _data->masterRank, eo::mpi::Channel::Messages, recvSize ); _data->tempArray.resize( recvSize ); - _data->comm.recv( _data->masterRank, 1, & _data->tempArray[0] , recvSize ); + _data->comm.recv( _data->masterRank, eo::mpi::Channel::Messages, & _data->tempArray[0] , recvSize ); timerStat.start("worker_processes"); for( int i = 0; i < recvSize ; ++i ) { _data->func( _data->tempArray[ i ] ); } timerStat.stop("worker_processes"); - _data->comm.send( _data->masterRank, 1, & _data->tempArray[0], recvSize ); + _data->comm.send( _data->masterRank, eo::mpi::Channel::Messages, & _data->tempArray[0], recvSize ); } }; From 0dca473aac7581aaa36a4c458840541e25f92eee Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Fri, 3 Aug 2012 11:42:44 +0200 Subject: [PATCH 4/9] MPI Multistart: using pop_eval functions instead of eval functions for resetters. 
--- eo/src/mpi/eoMultiStart.h | 33 +++++++++++++++++++++++++++------ 1 file changed, 27 insertions(+), 6 deletions(-) diff --git a/eo/src/mpi/eoMultiStart.h b/eo/src/mpi/eoMultiStart.h index e4665c517..095bc4468 100644 --- a/eo/src/mpi/eoMultiStart.h +++ b/eo/src/mpi/eoMultiStart.h @@ -419,7 +419,19 @@ namespace eo eoEvalFunc& eval) : _continuator( continuator ), _originalPop( originalPop ), - _eval( eval ) + _pop_eval( eval ) + { + // empty + } + + ReuseOriginalPopEA( + eoCountContinue & continuator, + const eoPop& originalPop, + eoPopEvalFunc& pop_eval + ) : + _continuator( continuator ), + _originalPop( originalPop ), + _pop_eval( pop_eval ) { // empty } @@ -427,17 +439,14 @@ namespace eo void operator()( eoPop& pop ) { pop = _originalPop; // copies the original population - for(unsigned i = 0, size = pop.size(); i < size; ++i) - { - _eval( pop[i] ); - } + _pop_eval( pop, pop ); _continuator.reset(); } private: eoCountContinue & _continuator; const eoPop& _originalPop; - eoEvalFunc& _eval; + eoPopEvalFunc& _pop_eval; }; /** @@ -469,6 +478,18 @@ namespace eo } } + ReuseSamePopEA( + eoCountContinue& continuator, + const eoPop& originalPop, + eoPopEvalFunc& pop_eval + ) : + _continuator( continuator ), + _originalPop( originalPop ), + _firstTime( true ) + { + pop_eval( _originalPop, _originalPop ); + } + void operator()( eoPop& pop ) { if( _firstTime ) From d8edf161890904a4664ac3c8573d91c2c0975dfa Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Fri, 3 Aug 2012 16:40:55 +0200 Subject: [PATCH 5/9] MPI: Show debug messages into log even in release mode. 
--- eo/src/mpi/eoMpi.h | 55 +++++++++++++----------------------- eo/src/mpi/eoParallelApply.h | 2 +- 2 files changed, 21 insertions(+), 36 deletions(-) diff --git a/eo/src/mpi/eoMpi.h b/eo/src/mpi/eoMpi.h index 1fe158229..003373aee 100644 --- a/eo/src/mpi/eoMpi.h +++ b/eo/src/mpi/eoMpi.h @@ -648,10 +648,8 @@ namespace eo ~FinallyBlock() { -# ifndef NDEBUG - eo::log << eo::debug; - eo::log << "[M" << comm.rank() << "] Frees all the idle." << std::endl; -# endif + eo::log << eo::debug << "[M" << comm.rank() << "] Frees all the idle." << std::endl; + // frees all the idle workers timerStat.start("master_wait_for_idles"); std::vector idles = assignmentAlgo.idles(); @@ -661,9 +659,8 @@ namespace eo } timerStat.stop("master_wait_for_idles"); -# ifndef NDEBUG - eo::log << "[M" << comm.rank() << "] Waits for all responses." << std::endl; -# endif + eo::log << eo::debug << "[M" << comm.rank() << "] Waits for all responses." << std::endl; + // wait for all responses timerStat.start("master_wait_for_all_responses"); while( assignmentAlgo.availableWorkers() != totalWorkers ) @@ -675,9 +672,8 @@ namespace eo assignmentAlgo.confirm( wrkRank ); } timerStat.stop("master_wait_for_all_responses"); -# ifndef NDEBUG - eo::log << "[M" << comm.rank() << "] Leaving master task." << std::endl; -# endif + + eo::log << eo::debug << "[M" << comm.rank() << "] Leaving master task." << std::endl; } protected: @@ -701,10 +697,8 @@ namespace eo void master( ) { int totalWorkers = assignmentAlgo.availableWorkers(); -# ifndef NDEBUG - eo::log << eo::debug; - eo::log << "[M" << comm.rank() << "] Have " << totalWorkers << " workers." << std::endl; -# endif + eo::log << eo::debug << "[M" << comm.rank() << "] Have " << totalWorkers << " workers." << std::endl; + try { FinallyBlock finally( totalWorkers, assignmentAlgo, *this ); while( ! 
isFinished() ) @@ -713,22 +707,20 @@ namespace eo int assignee = assignmentAlgo.get( ); while( assignee <= 0 ) { -# ifndef NDEBUG - eo::log << "[M" << comm.rank() << "] Waitin' for node..." << std::endl; -# endif + eo::log << eo::debug << "[M" << comm.rank() << "] Waitin' for node..." << std::endl; + bmpi::status status = comm.probe( bmpi::any_source, eo::mpi::Channel::Messages ); int wrkRank = status.source(); -# ifndef NDEBUG - eo::log << "[M" << comm.rank() << "] Node " << wrkRank << " just terminated." << std::endl; -# endif + + eo::log << eo::debug << "[M" << comm.rank() << "] Node " << wrkRank << " just terminated." << std::endl; + handleResponse( wrkRank ); assignmentAlgo.confirm( wrkRank ); assignee = assignmentAlgo.get( ); } timerStat.stop("master_wait_for_assignee"); -# ifndef NDEBUG - eo::log << "[M" << comm.rank() << "] Assignee : " << assignee << std::endl; -# endif + + eo::log << eo::debug << "[M" << comm.rank() << "] Assignee : " << assignee << std::endl; timerStat.start("master_wait_for_send"); comm.send( assignee, Channel::Commands, Message::Continue ); @@ -752,29 +744,22 @@ namespace eo void worker( ) { int order; -# ifndef NDEBUG - eo::log << eo::debug; -# endif + timerStat.start("worker_wait_for_order"); comm.recv( masterRank, Channel::Commands, order ); timerStat.stop("worker_wait_for_order"); while( true ) { -# ifndef NDEBUG - eo::log << "[W" << comm.rank() << "] Waiting for an order..." << std::endl; -# endif + eo::log << eo::debug << "[W" << comm.rank() << "] Waiting for an order..." << std::endl; + if ( order == workerStopCondition ) { -# ifndef NDEBUG - eo::log << "[W" << comm.rank() << "] Leaving worker task." << std::endl; -# endif + eo::log << eo::debug << "[W" << comm.rank() << "] Leaving worker task." << std::endl; return; } else if( order == Message::Continue ) { -# ifndef NDEBUG - eo::log << "[W" << comm.rank() << "] Processing task..." << std::endl; -# endif + eo::log << eo::debug << "[W" << comm.rank() << "] Processing task..." 
<< std::endl; processTask( ); } diff --git a/eo/src/mpi/eoParallelApply.h b/eo/src/mpi/eoParallelApply.h index 8e379fce3..671c3843f 100644 --- a/eo/src/mpi/eoParallelApply.h +++ b/eo/src/mpi/eoParallelApply.h @@ -175,7 +175,7 @@ namespace eo _data->comm.send( wrkRank, eo::mpi::Channel::Messages, sentSize ); - eo::log << eo::progress << "Evaluating individual " << _data->index << std::endl; + eo::log << eo::debug << "Evaluating individual " << _data->index << std::endl; _data->assignedTasks[ wrkRank ].index = _data->index; _data->assignedTasks[ wrkRank ].size = sentSize; From 026764215ffed43ee72e65b9ab5545ed73a1a28c Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Fri, 3 Aug 2012 16:41:23 +0200 Subject: [PATCH 6/9] MPI: const correctness in Static Assignment constructor and reinit functions. --- eo/src/mpi/eoMpiAssignmentAlgorithm.cpp | 4 ++-- eo/src/mpi/eoMpiAssignmentAlgorithm.h | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/eo/src/mpi/eoMpiAssignmentAlgorithm.cpp b/eo/src/mpi/eoMpiAssignmentAlgorithm.cpp index 3e7ff0251..39ce02f18 100644 --- a/eo/src/mpi/eoMpiAssignmentAlgorithm.cpp +++ b/eo/src/mpi/eoMpiAssignmentAlgorithm.cpp @@ -99,7 +99,7 @@ namespace eo * STATIC ASSIGNMENT ALGORITHM ************************** *******************************************************/ - StaticAssignmentAlgorithm::StaticAssignmentAlgorithm( std::vector& workers, int runs ) + StaticAssignmentAlgorithm::StaticAssignmentAlgorithm( const std::vector& workers, int runs ) { init( workers, runs ); } @@ -138,7 +138,7 @@ namespace eo init( workers, runs ); } - void StaticAssignmentAlgorithm::init( std::vector & workers, int runs ) + void StaticAssignmentAlgorithm::init( const std::vector & workers, int runs ) { unsigned int nbWorkers = workers.size(); freeWorkers = nbWorkers; diff --git a/eo/src/mpi/eoMpiAssignmentAlgorithm.h b/eo/src/mpi/eoMpiAssignmentAlgorithm.h index 3389db684..abe465b8b 100644 --- a/eo/src/mpi/eoMpiAssignmentAlgorithm.h +++ 
b/eo/src/mpi/eoMpiAssignmentAlgorithm.h @@ -177,7 +177,7 @@ namespace eo * @param workers std::vector of MPI ranks of workers which will be used. * @param runs Fixed amount of runs, strictly positive. */ - StaticAssignmentAlgorithm( std::vector& workers, int runs ); + StaticAssignmentAlgorithm( const std::vector& workers, int runs ); /** * @brief Uses a range of workers. @@ -215,7 +215,7 @@ namespace eo * @param workers Vector of hosts' ranks * @param runs Fixed amount of runs, strictly positive. */ - void init( std::vector & workers, int runs ); + void init( const std::vector & workers, int runs ); public: int get( ); From 3e449c9df9df01fce131de984fc5a6d0e00c9f24 Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Fri, 3 Aug 2012 16:41:42 +0200 Subject: [PATCH 7/9] MPI: removed unused parameter seeds in MultiStart ctor. --- eo/src/mpi/eoMultiStart.h | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/eo/src/mpi/eoMultiStart.h b/eo/src/mpi/eoMultiStart.h index 095bc4468..23a8cc04a 100644 --- a/eo/src/mpi/eoMultiStart.h +++ b/eo/src/mpi/eoMultiStart.h @@ -302,8 +302,7 @@ namespace eo int masterRank, MultiStartStore< EOT > & store, // dynamic parameters - int runs, - const std::vector& seeds = std::vector() ) : + int runs ) : OneShotJob< MultiStartData< EOT > >( algo, masterRank, store ) { store.init( algo.idles(), runs ); From c50eadc891a7fa986752de6d9e9388cd120ec83b Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Mon, 6 Aug 2012 15:28:10 +0200 Subject: [PATCH 8/9] MPI: added comments for use of eo::mpi::Channel::Messages. --- eo/src/mpi/eoMpi.h | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/eo/src/mpi/eoMpi.h b/eo/src/mpi/eoMpi.h index 003373aee..a47ce8a33 100644 --- a/eo/src/mpi/eoMpi.h +++ b/eo/src/mpi/eoMpi.h @@ -314,6 +314,9 @@ namespace eo * This is a functor implementing void operator()(int), and also a shared data function, containing wrapper on * its own type. 
* + * The master has to receive worker's data on channel (= MPI tag) eo::mpi::Channel::Messages. No other tags are + * allowed. + * * @ingroup MPI */ template< typename JobData > @@ -335,7 +338,8 @@ namespace eo * This is where the real computation happen. * Whenever the master sends the command "Continue" to workers, which indicates the worker will receive a task, * the worker calls this functor. The user has to explicitly retrieve the data, handle it and transmit it, - * processed, back to the master. If the worker does not send any data back to the master, the latter will + * processed, back to the master. Data sent back needs to be transmitted via channel (= MPI tag) + * eo::mpi::Channel::Messages, and no one else. If the worker does not send any data back to the master, the latter will * consider the worker isn't done and a deadlock could occur. * * This is a functor implementing void operator()(), and also a shared data function, containing wrapper on its From 9ea12568b271c998ecda345d49043b542d0c9edb Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Fri, 10 Aug 2012 11:30:25 +0200 Subject: [PATCH 9/9] MPI: first version of distribution of time sleep experiment. 
--- eo/test/mpi/CMakeLists.txt | 1 + eo/test/mpi/t-mpi-distrib-exp.cpp | 135 ++++++++++++++++++++++++++++++ 2 files changed, 136 insertions(+) create mode 100644 eo/test/mpi/t-mpi-distrib-exp.cpp diff --git a/eo/test/mpi/CMakeLists.txt b/eo/test/mpi/CMakeLists.txt index d89f3ab3f..fe7ecb0fd 100644 --- a/eo/test/mpi/CMakeLists.txt +++ b/eo/test/mpi/CMakeLists.txt @@ -32,6 +32,7 @@ SET (TEST_LIST t-mpi-multipleRoles t-mpi-eval t-mpi-multistart + t-mpi-distrib-exp ) FOREACH (test ${TEST_LIST}) diff --git a/eo/test/mpi/t-mpi-distrib-exp.cpp b/eo/test/mpi/t-mpi-distrib-exp.cpp new file mode 100644 index 000000000..4fad9d44c --- /dev/null +++ b/eo/test/mpi/t-mpi-distrib-exp.cpp @@ -0,0 +1,135 @@ +# include // usleep + +# include +# include +# include + +# include + +# include +# include "../test/mpi/t-mpi-common.h" + +using namespace eo::mpi; + +typedef SerializableBase type; + +struct Wait : public eoUF< type &, void > +{ + void operator()( type & milliseconds ) + { + std::cout << "Sleeping for " << milliseconds << "ms..." << std::endl; + // usleep takes an input in microseconds + usleep( milliseconds * 1000 ); + } +} wait; + +class Distribution : public std::vector< type > +{ + public: + + /** + * @brief Really fills the vector with the distribution values. + */ + void fill( unsigned size ) + { + for( unsigned i = 0; i < size; ++i ) + { + push_back( next_element() ); + } + } + + /** + * @brief Returns the next element of the distribution to put in the + * vector. + * + * @returns Number of milliseconds to wait + */ + virtual int next_element() = 0; + + // Idea for function name: enlarge_your_parser + /** + * @brief Creates params and retrieves values from parser + */ + virtual void make_parser( eoParser & parser ) = 0; + + /** + * @brief Returns true if this distribution has been activated by the + * command line. + * + * Serves to main program to check if at least one distribution has been + * activated. 
+ */ + bool isActive() { return _active; } + + protected: + + bool _active; +}; + +class UniformDistribution : public Distribution +{ + public: + + UniformDistribution() : _rng(0) + { + // empty + } + + void make_parser( eoParser & parser ) + { + _active = parser.createParam( false, "uniform", "Uniform distribution", '\0', "Uniform").value(); + _min = parser.createParam( 0.0, "uniform-min", "Minimum for uniform distribution", '\0', "Uniform").value(); + _max = parser.createParam( 1.0, "uniform-max", "Maximum for uniform distribution", '\0', "Uniform").value(); + } + + int next_element() + { + return std::floor( 1000. * _rng.uniform( _min, _max ) ); + } + + protected: + + eoRng _rng; + + double _min; + double _max; + +} uniformDistribution; + +int main( int argc, char** argv ) +{ + Node::init( argc, argv ); + eoParser parser( argc, argv ); + + // TODO for each available distribution, check if activated. + // If no distribution is activated, show an error message + // If two distributions or more are activated, show an error message + // Otherwise, use the activated distribution as distrib + Distribution & distrib = uniformDistribution; + // Make parser of distribution here + distrib.make_parser( parser ); + + unsigned size = parser.createParam( 10U, "size", "Number of elements to distribute.", 's', "Distribution").value(); + unsigned packet_size = parser.createParam( 1U, "packet_size", "Number of elements to distribute at each time for a single worker.", 'p', "Parallelization").value(); + + make_parallel( parser ); + make_help( parser ); + + ParallelApplyStore< type> store( wait, DEFAULT_MASTER, packet_size ); + + // Fill distribution + distrib.fill( size ); + store.data( distrib ); + + DynamicAssignmentAlgorithm scheduling; + ParallelApply< type > job( scheduling, DEFAULT_MASTER, store ); + + job.run(); + + if( job.isMaster() ) + { + EmptyJob( scheduling, DEFAULT_MASTER ); // to terminate parallel apply + } + + return 0; +}