From 7b399aa1ddfe72566c89bb5276f29705c6c2b9b7 Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Tue, 26 Jun 2012 17:53:32 +0200 Subject: [PATCH] Putting time conditions into eo::mpi::Job and MultiParallelApply, which doesn't prefigure about number of workers evaluations. --- eo/src/apply.h | 7 ++--- eo/src/eoPopEvalFunc.h | 9 ++++--- eo/src/mpi/eoMpi.h | 34 ++++++++++++++++++++--- eo/src/mpi/eoMultiParallelApply.h | 45 +++++++++++++++++++++++++++++++ eo/src/mpi/eoParallelApply.h | 12 ++++++--- eo/src/mpi/eoTerminateJob.h | 42 +++++++++++++++++++++++++++++ 6 files changed, 136 insertions(+), 13 deletions(-) create mode 100644 eo/src/mpi/eoMultiParallelApply.h create mode 100644 eo/src/mpi/eoTerminateJob.h diff --git a/eo/src/apply.h b/eo/src/apply.h index c3c0365c..7e59c21b 100644 --- a/eo/src/apply.h +++ b/eo/src/apply.h @@ -35,7 +35,7 @@ # ifdef WITH_MPI # include -# include +# include # endif // WITH_MPI /** @@ -91,9 +91,10 @@ void parallelApply( std::vector& _pop, eo::mpi::AssignmentAlgorithm& _algo, int _masterRank, - int _packetSize) + int _packetSize, + int _maxTime) { - eo::mpi::ParallelApply job( _proc, _pop, _algo, _masterRank, _packetSize ); + eo::mpi::MultiParallelApply job( _proc, _pop, _algo, _masterRank, _packetSize, _maxTime ); job.run(); } #endif diff --git a/eo/src/eoPopEvalFunc.h b/eo/src/eoPopEvalFunc.h index 8885ef11..77473eb1 100644 --- a/eo/src/eoPopEvalFunc.h +++ b/eo/src/eoPopEvalFunc.h @@ -87,12 +87,14 @@ public: eoEvalFunc & _eval, eo::mpi::AssignmentAlgorithm& _assignAlgo, int _masterRank, - int _packetSize = 1 + int _packetSize = 1, + int _maxTime = 0 ) : eval(_eval), assignAlgo( _assignAlgo ), masterRank( _masterRank ), - packetSize( _packetSize ) + packetSize( _packetSize ), + maxTime( _maxTime ) { // empty } @@ -101,7 +103,7 @@ public: void operator()(eoPop & _parents, eoPop & _offspring) { (void)_parents; - parallelApply(eval, _offspring, assignAlgo, masterRank, packetSize); + parallelApply(eval, _offspring, assignAlgo, masterRank, packetSize, maxTime); } private: @@ -110,6 +112,7 @@ private: eo::mpi::AssignmentAlgorithm & assignAlgo; int masterRank; int packetSize; + int maxTime; }; #endif diff --git a/eo/src/mpi/eoMpi.h b/eo/src/mpi/eoMpi.h index b12233c5..480bfbcb 100644 --- a/eo/src/mpi/eoMpi.h +++ b/eo/src/mpi/eoMpi.h @@ -3,10 +3,15 @@ # include # include +# include +# include + # include +# include # include "eoMpiNode.h" # include "eoMpiAssignmentAlgorithm.h" + // TODO TODOB comment! namespace eo @@ -28,10 +33,11 @@ namespace eo { public: - Job( AssignmentAlgorithm& _algo, int _masterRank ) : + Job( AssignmentAlgorithm& _algo, int _masterRank, long maxTime = 0 ) : assignmentAlgo( _algo ), comm( Node::comm() ), - masterRank( _masterRank ) + masterRank( _masterRank ), + _maxTime( maxTime ) { _isMaster = Node::comm().rank() == _masterRank; } @@ -43,6 +49,8 @@ namespace eo // worker virtual void processTask( ) = 0; + protected: + void master( ) { int totalWorkers = assignmentAlgo.availableWorkers(); @@ -50,9 +58,18 @@ namespace eo eo::log << eo::debug; eo::log << "[M" << comm.rank() << "] Have " << totalWorkers << " workers." << std::endl; # endif - + bool timeStopped = false; while( ! isFinished() ) { + // Time restrictions + getrusage( RUSAGE_SELF , &_usage ); + _current = _usage.ru_utime.tv_sec + _usage.ru_stime.tv_sec; + if( _maxTime > 0 && _current > _maxTime ) + { + timeStopped = true; + break; + } + int assignee = assignmentAlgo.get( ); while( assignee <= 0 ) { @@ -71,6 +88,7 @@ namespace eo # ifndef NDEBUG eo::log << "[M" << comm.rank() << "] Assignee : " << assignee << std::endl; # endif + comm.send( assignee, Channel::Commands, Message::Continue ); sendTask( assignee ); } @@ -101,6 +119,10 @@ namespace eo # ifndef NDEBUG eo::log << "[M" << comm.rank() << "] Leaving master task." << std::endl; # endif + if( timeStopped ) + { + throw eoMaxTimeException( _current ); + } } void worker( ) @@ -131,6 +153,8 @@ namespace eo } } + public: + void run( ) { ( _isMaster ) ? master( ) : worker( ); @@ -146,6 +170,10 @@ namespace eo bmpi::communicator& comm; int masterRank; bool _isMaster; + + struct rusage _usage; + long _current; + const long _maxTime; }; } } diff --git a/eo/src/mpi/eoMultiParallelApply.h b/eo/src/mpi/eoMultiParallelApply.h new file mode 100644 index 00000000..f3df801d --- /dev/null +++ b/eo/src/mpi/eoMultiParallelApply.h @@ -0,0 +1,45 @@ + +# ifndef __EO_MULTI_PARALLEL_APPLY_H__ +# define __EO_MULTI_PARALLEL_APPLY_H__ + +# include "eoParallelApply.h" + +namespace eo +{ + namespace mpi + { + template< typename EOT > + class MultiParallelApply : public ParallelApply + { + public: + + // using ParallelApply::comm; + using ParallelApply::masterRank; + + MultiParallelApply( + eoUF & _proc, + std::vector& _pop, + AssignmentAlgorithm & algo, + int _masterRank, + int _packetSize = 1, + long _maxTime = 0 + ) : + ParallelApply( _proc, _pop, algo, _masterRank, _packetSize, _maxTime ) + { + // empty + } + + virtual void processTask( ) + { + int order = Message::Continue; + while( order != Message::Finish ) + { + ParallelApply::processTask( ); + ParallelApply::comm.recv( masterRank, Channel::Commands, order ); + } + } + }; + } +} +# endif // __EO_PARALLEL_APPLY_H__ + diff --git a/eo/src/mpi/eoParallelApply.h b/eo/src/mpi/eoParallelApply.h index 1e9d4c1d..af0666cc 100644 --- a/eo/src/mpi/eoParallelApply.h +++ b/eo/src/mpi/eoParallelApply.h @@ -26,9 +26,10 @@ namespace eo std::vector& _pop, AssignmentAlgorithm & algo, int _masterRank, - int _packetSize = 1 + int _packetSize = 1, + long _maxTime = 0 ) : - Job( algo, _masterRank ), + Job( algo, _masterRank, _maxTime ), func( _proc ), index( 0 ), size( _pop.size() ), @@ -42,7 +43,7 @@ namespace eo tempArray = new EOT[ packetSize ]; } - ~ParallelApply() + virtual ~ParallelApply() { delete [] tempArray; } @@ -59,8 +60,11 @@ namespace eo } int sentSize = futureIndex - index ; + comm.send( wrkRank, 1, sentSize ); + eo::log << eo::progress << "Evaluating individual " << index << std::endl; + assignedTasks[ wrkRank ].index = index; assignedTasks[ wrkRank ].size = sentSize; @@ -85,7 +89,7 @@ namespace eo comm.send( masterRank, 1, tempArray, recvSize ); } - bool isFinished() + virtual bool isFinished() { return index == size; } diff --git a/eo/src/mpi/eoTerminateJob.h b/eo/src/mpi/eoTerminateJob.h new file mode 100644 index 00000000..4cb18225 --- /dev/null +++ b/eo/src/mpi/eoTerminateJob.h @@ -0,0 +1,42 @@ +# ifndef __EO_TERMINATE_H__ +# define __EO_TERMINATE_H__ + +# include "eoMpi.h" + +namespace eo +{ + namespace mpi + { + class TerminateJob : public Job + { + public: + TerminateJob( AssignmentAlgorithm& algo, int _ ) + : Job( algo, _ ) + { + // empty + } + + void sendTask( int wrkRank ) + { + // empty + } + + void handleResponse( int wrkRank ) + { + // empty + } + + void processTask( ) + { + // empty + } + + bool isFinished() + { + return true; + } + }; + } +} + +# endif // __EO_TERMINATE_H__