Putting time conditions into eo::mpi::Job and MultiParallelApply, which doesn't prefigure about number of workers evaluations.

This commit is contained in:
Benjamin Bouvier 2012-06-26 17:53:32 +02:00
commit 7b399aa1dd
6 changed files with 136 additions and 13 deletions

View file

@ -35,7 +35,7 @@
# ifdef WITH_MPI # ifdef WITH_MPI
# include <mpi/eoMpi.h> # include <mpi/eoMpi.h>
# include <mpi/eoParallelApply.h> # include <mpi/eoMultiParallelApply.h>
# endif // WITH_MPI # endif // WITH_MPI
/** /**
@ -91,9 +91,10 @@ void parallelApply(
std::vector<EOT>& _pop, std::vector<EOT>& _pop,
eo::mpi::AssignmentAlgorithm& _algo, eo::mpi::AssignmentAlgorithm& _algo,
int _masterRank, int _masterRank,
int _packetSize) int _packetSize,
int _maxTime)
{ {
eo::mpi::ParallelApply<EOT> job( _proc, _pop, _algo, _masterRank, _packetSize ); eo::mpi::MultiParallelApply<EOT> job( _proc, _pop, _algo, _masterRank, _packetSize, _maxTime );
job.run(); job.run();
} }
#endif #endif

View file

@ -87,12 +87,14 @@ public:
eoEvalFunc<EOT> & _eval, eoEvalFunc<EOT> & _eval,
eo::mpi::AssignmentAlgorithm& _assignAlgo, eo::mpi::AssignmentAlgorithm& _assignAlgo,
int _masterRank, int _masterRank,
int _packetSize = 1 int _packetSize = 1,
int _maxTime = 0
) : ) :
eval(_eval), eval(_eval),
assignAlgo( _assignAlgo ), assignAlgo( _assignAlgo ),
masterRank( _masterRank ), masterRank( _masterRank ),
packetSize( _packetSize ) packetSize( _packetSize ),
maxTime( _maxTime )
{ {
// empty // empty
} }
@ -101,7 +103,7 @@ public:
void operator()(eoPop<EOT> & _parents, eoPop<EOT> & _offspring) void operator()(eoPop<EOT> & _parents, eoPop<EOT> & _offspring)
{ {
(void)_parents; (void)_parents;
parallelApply<EOT>(eval, _offspring, assignAlgo, masterRank, packetSize); parallelApply<EOT>(eval, _offspring, assignAlgo, masterRank, packetSize, maxTime);
} }
private: private:
@ -110,6 +112,7 @@ private:
eo::mpi::AssignmentAlgorithm & assignAlgo; eo::mpi::AssignmentAlgorithm & assignAlgo;
int masterRank; int masterRank;
int packetSize; int packetSize;
int maxTime;
}; };
#endif #endif

View file

@ -3,10 +3,15 @@
# include <vector> # include <vector>
# include <map> # include <map>
# include <sys/time.h>
# include <sys/resource.h>
# include <utils/eoLogger.h> # include <utils/eoLogger.h>
# include <eoExceptions.h>
# include "eoMpiNode.h" # include "eoMpiNode.h"
# include "eoMpiAssignmentAlgorithm.h" # include "eoMpiAssignmentAlgorithm.h"
// TODO TODOB comment! // TODO TODOB comment!
namespace eo namespace eo
@ -28,10 +33,11 @@ namespace eo
{ {
public: public:
Job( AssignmentAlgorithm& _algo, int _masterRank ) : Job( AssignmentAlgorithm& _algo, int _masterRank, long maxTime = 0 ) :
assignmentAlgo( _algo ), assignmentAlgo( _algo ),
comm( Node::comm() ), comm( Node::comm() ),
masterRank( _masterRank ) masterRank( _masterRank ),
_maxTime( maxTime )
{ {
_isMaster = Node::comm().rank() == _masterRank; _isMaster = Node::comm().rank() == _masterRank;
} }
@ -43,6 +49,8 @@ namespace eo
// worker // worker
virtual void processTask( ) = 0; virtual void processTask( ) = 0;
protected:
void master( ) void master( )
{ {
int totalWorkers = assignmentAlgo.availableWorkers(); int totalWorkers = assignmentAlgo.availableWorkers();
@ -50,9 +58,18 @@ namespace eo
eo::log << eo::debug; eo::log << eo::debug;
eo::log << "[M" << comm.rank() << "] Have " << totalWorkers << " workers." << std::endl; eo::log << "[M" << comm.rank() << "] Have " << totalWorkers << " workers." << std::endl;
# endif # endif
bool timeStopped = false;
while( ! isFinished() ) while( ! isFinished() )
{ {
// Time restrictions
getrusage( RUSAGE_SELF , &_usage );
_current = _usage.ru_utime.tv_sec + _usage.ru_stime.tv_sec;
if( _maxTime > 0 && _current > _maxTime )
{
timeStopped = true;
break;
}
int assignee = assignmentAlgo.get( ); int assignee = assignmentAlgo.get( );
while( assignee <= 0 ) while( assignee <= 0 )
{ {
@ -71,6 +88,7 @@ namespace eo
# ifndef NDEBUG # ifndef NDEBUG
eo::log << "[M" << comm.rank() << "] Assignee : " << assignee << std::endl; eo::log << "[M" << comm.rank() << "] Assignee : " << assignee << std::endl;
# endif # endif
comm.send( assignee, Channel::Commands, Message::Continue ); comm.send( assignee, Channel::Commands, Message::Continue );
sendTask( assignee ); sendTask( assignee );
} }
@ -101,6 +119,10 @@ namespace eo
# ifndef NDEBUG # ifndef NDEBUG
eo::log << "[M" << comm.rank() << "] Leaving master task." << std::endl; eo::log << "[M" << comm.rank() << "] Leaving master task." << std::endl;
# endif # endif
if( timeStopped )
{
throw eoMaxTimeException( _current );
}
} }
void worker( ) void worker( )
@ -131,6 +153,8 @@ namespace eo
} }
} }
public:
void run( ) void run( )
{ {
( _isMaster ) ? master( ) : worker( ); ( _isMaster ) ? master( ) : worker( );
@ -146,6 +170,10 @@ namespace eo
bmpi::communicator& comm; bmpi::communicator& comm;
int masterRank; int masterRank;
bool _isMaster; bool _isMaster;
struct rusage _usage;
long _current;
const long _maxTime;
}; };
} }
} }

View file

@ -0,0 +1,45 @@
# ifndef __EO_MULTI_PARALLEL_APPLY_H__
# define __EO_MULTI_PARALLEL_APPLY_H__
# include "eoParallelApply.h"
namespace eo
{
namespace mpi
{
template< typename EOT >
class MultiParallelApply : public ParallelApply<EOT>
{
public:
// using ParallelApply<EOT>::comm;
using ParallelApply<EOT>::masterRank;
MultiParallelApply(
eoUF<EOT&, void> & _proc,
std::vector<EOT>& _pop,
AssignmentAlgorithm & algo,
int _masterRank,
int _packetSize = 1,
long _maxTime = 0
) :
ParallelApply<EOT>( _proc, _pop, algo, _masterRank, _packetSize, _maxTime )
{
// empty
}
virtual void processTask( )
{
int order = Message::Continue;
while( order != Message::Finish )
{
ParallelApply<EOT>::processTask( );
ParallelApply<EOT>::comm.recv( masterRank, Channel::Commands, order );
}
}
};
}
}
# endif // __EO_PARALLEL_APPLY_H__

View file

@ -26,9 +26,10 @@ namespace eo
std::vector<EOT>& _pop, std::vector<EOT>& _pop,
AssignmentAlgorithm & algo, AssignmentAlgorithm & algo,
int _masterRank, int _masterRank,
int _packetSize = 1 int _packetSize = 1,
long _maxTime = 0
) : ) :
Job( algo, _masterRank ), Job( algo, _masterRank, _maxTime ),
func( _proc ), func( _proc ),
index( 0 ), index( 0 ),
size( _pop.size() ), size( _pop.size() ),
@ -42,7 +43,7 @@ namespace eo
tempArray = new EOT[ packetSize ]; tempArray = new EOT[ packetSize ];
} }
~ParallelApply() virtual ~ParallelApply()
{ {
delete [] tempArray; delete [] tempArray;
} }
@ -59,8 +60,11 @@ namespace eo
} }
int sentSize = futureIndex - index ; int sentSize = futureIndex - index ;
comm.send( wrkRank, 1, sentSize ); comm.send( wrkRank, 1, sentSize );
eo::log << eo::progress << "Evaluating individual " << index << std::endl;
assignedTasks[ wrkRank ].index = index; assignedTasks[ wrkRank ].index = index;
assignedTasks[ wrkRank ].size = sentSize; assignedTasks[ wrkRank ].size = sentSize;
@ -85,7 +89,7 @@ namespace eo
comm.send( masterRank, 1, tempArray, recvSize ); comm.send( masterRank, 1, tempArray, recvSize );
} }
bool isFinished() virtual bool isFinished()
{ {
return index == size; return index == size;
} }

View file

@ -0,0 +1,42 @@
# ifndef __EO_TERMINATE_H__
# define __EO_TERMINATE_H__
# include "eoMpi.h"
namespace eo
{
namespace mpi
{
class TerminateJob : public Job
{
public:
TerminateJob( AssignmentAlgorithm& algo, int _ )
: Job( algo, _ )
{
// empty
}
void sendTask( int wrkRank )
{
// empty
}
void handleResponse( int wrkRank )
{
// empty
}
void processTask( )
{
// empty
}
bool isFinished()
{
return true;
}
};
}
}
# endif // __EO_TERMINATE_H__