Implementation test of functional configurable job

This commit is contained in:
Benjamin Bouvier 2012-06-29 18:19:55 +02:00
commit 4675abaa24
2 changed files with 239 additions and 78 deletions

View file

@ -8,6 +8,7 @@
# include <utils/eoLogger.h> # include <utils/eoLogger.h>
# include <utils/eoTimer.h> # include <utils/eoTimer.h>
# include <eoFunctor.h>
# include <eoExceptions.h> # include <eoExceptions.h>
# include "eoMpiNode.h" # include "eoMpiNode.h"
@ -32,28 +33,84 @@ namespace eo
const int Finish = 1; const int Finish = 1;
} }
template<class Data>
struct SharedDataFunction
{
void data( Data & _d ) { d = _d; }
protected:
Data d;
};
template<class Data>
struct SendTaskFunction : public eoUF<int, void>, public SharedDataFunction<Data>
{
virtual ~SendTaskFunction() {}
};
template<class Data>
struct HandleResponseFunction : public eoUF<int, void>, public SharedDataFunction<Data>
{
virtual ~HandleResponseFunction() {}
};
template<class Data>
struct ProcessTaskFunction : public eoF<void>, public SharedDataFunction<Data>
{
virtual ~ProcessTaskFunction() {}
};
template<class Data>
struct IsFinishedFunction : public eoF<bool>, public SharedDataFunction<Data>
{
virtual ~IsFinishedFunction() {}
};
template<class Data>
struct JobStore
{
virtual SendTaskFunction<Data> & sendTask() = 0;
virtual HandleResponseFunction<Data> & handleResponse() = 0;
virtual ProcessTaskFunction<Data> & processTask() = 0;
virtual IsFinishedFunction<Data> & isFinished() = 0;
};
template<class Data>
class Job class Job
{ {
public: public:
Job( AssignmentAlgorithm& _algo, int _masterRank, long maxTime = 0 ) : Job( AssignmentAlgorithm& _algo, int _masterRank, JobStore<Data> store ) :
// Job( AssignmentAlgorithm& _algo, int _masterRank, long maxTime = 0 ) :
assignmentAlgo( _algo ), assignmentAlgo( _algo ),
comm( Node::comm() ), comm( Node::comm() ),
// _maxTime( maxTime ),
masterRank( _masterRank ), masterRank( _masterRank ),
_maxTime( maxTime ) // Functors
sendTask( store.sendTask() ),
handleResponse( store.handleResponse() ),
processTask( store.processTask() ),
isFinished( store.isFinished() )
{ {
_isMaster = Node::comm().rank() == _masterRank; _isMaster = Node::comm().rank() == _masterRank;
} }
/*
// master // master
virtual bool isFinished() = 0; virtual bool isFinished() = 0;
virtual void sendTask( int wrkRank ) = 0; virtual void sendTask( int wrkRank ) = 0;
virtual void handleResponse( int wrkRank ) = 0; virtual void handleResponse( int wrkRank ) = 0;
// worker // worker
virtual void processTask( ) = 0; virtual void processTask( ) = 0;
*/
protected: protected:
SendTaskFunction<Data> & sendTask;
HandleResponseFunction<Data> & handleResponse;
ProcessTaskFunction<Data> & processTask;
IsFinishedFunction<Data> & isFinished;
void master( ) void master( )
{ {
int totalWorkers = assignmentAlgo.availableWorkers(); int totalWorkers = assignmentAlgo.availableWorkers();
@ -65,6 +122,7 @@ namespace eo
while( ! isFinished() ) while( ! isFinished() )
{ {
// Time restrictions // Time restrictions
/*
getrusage( RUSAGE_SELF , &_usage ); getrusage( RUSAGE_SELF , &_usage );
_current = _usage.ru_utime.tv_sec + _usage.ru_stime.tv_sec; _current = _usage.ru_utime.tv_sec + _usage.ru_stime.tv_sec;
if( _maxTime > 0 && _current > _maxTime ) if( _maxTime > 0 && _current > _maxTime )
@ -72,6 +130,7 @@ namespace eo
timeStopped = true; timeStopped = true;
break; break;
} }
*/
timerStat.start("master_wait_for_assignee"); timerStat.start("master_wait_for_assignee");
int assignee = assignmentAlgo.get( ); int assignee = assignmentAlgo.get( );
@ -130,10 +189,12 @@ namespace eo
# ifndef NDEBUG # ifndef NDEBUG
eo::log << "[M" << comm.rank() << "] Leaving master task." << std::endl; eo::log << "[M" << comm.rank() << "] Leaving master task." << std::endl;
# endif # endif
/*
if( timeStopped ) if( timeStopped )
{ {
throw eoMaxTimeException( _current ); throw eoMaxTimeException( _current );
} }
*/
} }
void worker( ) void worker( )
@ -186,7 +247,7 @@ namespace eo
struct rusage _usage; struct rusage _usage;
long _current; long _current;
const long _maxTime; // const long _maxTime;
}; };
} }
} }

View file

@ -10,15 +10,177 @@ namespace eo
{ {
namespace mpi namespace mpi
{ {
template< typename EOT >
class ParallelApply : public Job
{
private:
struct ParallelApplyAssignment struct ParallelApplyAssignment
{ {
int index; int index;
int size; int size;
}; };
template<class EOT>
struct ParallelApplyData
{
ParallelApplyData() {}
ParallelApplyData(
eoUF<EOT&, void> & _proc,
std::vector<EOT>& _pop,
int _masterRank,
int _packetSize
) :
func( _proc ),
data( _pop ),
index( 0 ),
size( _pop.size() ),
packetSize( _packetSize ),
// job
masterRank( _masterRank ),
comm( Node::comm() )
{
tempArray = new EOT[ _packetSize ];
}
~ParallelApplyData()
{
delete [] tempArray;
}
std::vector<EOT> & data;
eoUF<EOT&, void> & func;
int index;
int size;
std::map< int /* worker rank */, ParallelApplyAssignment /* min indexes in vector */> assignedTasks;
int packetSize;
EOT* tempArray;
int masterRank;
bmpi::communicator& comm;
};
template<class Data>
struct SendTaskParallelApply : public SendTaskFunction< Data >
{
SendTaskParallelApply( Data & _d )
{
data( _d );
}
using SharedDataFunction< Data >::d;
// futureIndex, index, packetSize, size, comm, assignedTasks, data
void operator()(int wrkRank)
{
int futureIndex;
if( d.index + d.packetSize < d.size )
{
futureIndex = d.index + d.packetSize;
} else {
futureIndex = d.size;
}
int sentSize = futureIndex - d.index ;
d.comm.send( wrkRank, 1, sentSize );
eo::log << eo::progress << "Evaluating individual " << d.index << std::endl;
d.assignedTasks[ wrkRank ].index = d.index;
d.assignedTasks[ wrkRank ].size = sentSize;
d.comm.send( wrkRank, 1, & (d.data[ index ]) , sentSize );
d.index = futureIndex;
}
};
template<class Data>
struct HandleResponseParallelApply : public HandleResponseFunction< Data >
{
HandleResponseParallelApply( Data & _d )
{
data( _d );
}
using SharedDataFunction< Data >::d;
void operator()(int wrkRank)
{
d.comm.recv( wrkRank, 1, & (d.data[ d.assignedTasks[wrkRank].index ] ), d.assignedTasks[wrkRank].size );
}
};
template<class Data>
struct ProcessTaskParallelApply : public ProcessTaskFunction< Data >
{
ProcessTaskParallelApply( Data & _d )
{
data( _d );
}
using SharedDataFunction< Data >::d;
void operator()()
{
int recvSize;
d.comm.recv( d.masterRank, 1, recvSize );
d.comm.recv( d.masterRank, 1, d.tempArray, recvSize );
timerStat.start("worker_processes");
for( int i = 0; i < recvSize ; ++i )
{
d.func( d.tempArray[ i ] );
}
timerStat.stop("worker_processes");
d.comm.send( d.masterRank, 1, d.tempArray, recvSize );
}
};
template<class Data>
struct IsFinishedParallelApply : public IsFinishedFunction< Data >
{
IsFinishedParallelApply( Data & _d )
{
data( _d );
}
using SharedDataFunction< Data >::d;
bool operator()()
{
return d.index == d.size;
}
};
template< typename Data >
struct ParallelApplyStore : public JobStore< Data >
{
ParallelApplyStore( Data & data )
{
stpa = new SendTaskParallelApply< Data >( data );
hrpa = new HandleResponseParallelApply< Data >( data );
ptpa = new ProcessTaskParallelApply< Data >( data );
ispa = new IsFinishedParallelApply< Data >( data );
}
~ParallelApplyStore()
{
delete stpa;
delete hrpa;
delete ptpa;
delete ispa;
}
SendTaskFunction< Data > & sendTask() { return *stpa; }
HandleResponseFunction< Data > & handleResponse() { return *hrpa; }
ProcessTaskFunction< Data > & processTask() { return *ptpa; }
IsFinishedFunction< Data > & isFinished() { return *ispa; }
protected:
SendTaskParallelApply< Data >* stpa;
HandleResponseParallelApply< Data >* hrpa;
ProcessTaskParallelApply< Data >* ptpa;
IsFinishedParallelApply< Data >* ispa;
};
template< typename EOT >
class ParallelApply : public Job< ParallelApplyData<EOT> >
{
public: public:
ParallelApply( ParallelApply(
@ -26,85 +188,23 @@ namespace eo
std::vector<EOT>& _pop, std::vector<EOT>& _pop,
AssignmentAlgorithm & algo, AssignmentAlgorithm & algo,
int _masterRank, int _masterRank,
int _packetSize = 1, // long _maxTime = 0,
long _maxTime = 0 int _packetSize = 1
) : ) :
Job( algo, _masterRank, _maxTime ),
func( _proc ), Job< ParallelApplyData<EOT> >( algo, _masterRank, ParallelApplyStore< ParallelApplyData<EOT> >( sharedData ) ),
index( 0 ), // Job( algo, _masterRank, _maxTime ),
size( _pop.size() ), sharedData( _proc, _pop, _masterRank, _packetSize )
data( _pop ),
packetSize( _packetSize )
{ {
if ( _packetSize <= 0 ) if ( _packetSize <= 0 )
{ {
throw std::runtime_error("Packet size should not be negative."); throw std::runtime_error("Packet size should not be negative.");
} }
tempArray = new EOT[ packetSize ];
}
virtual ~ParallelApply()
{
delete [] tempArray;
}
virtual void sendTask( int wrkRank )
{
int futureIndex;
if( index + packetSize < size )
{
futureIndex = index + packetSize;
} else {
futureIndex = size;
}
int sentSize = futureIndex - index ;
comm.send( wrkRank, 1, sentSize );
eo::log << eo::progress << "Evaluating individual " << index << std::endl;
assignedTasks[ wrkRank ].index = index;
assignedTasks[ wrkRank ].size = sentSize;
comm.send( wrkRank, 1, &data[ index ] , sentSize );
index = futureIndex;
}
virtual void handleResponse( int wrkRank )
{
comm.recv( wrkRank, 1, &data[ assignedTasks[wrkRank].index ], assignedTasks[wrkRank].size );
}
virtual void processTask( )
{
int recvSize;
comm.recv( masterRank, 1, recvSize );
comm.recv( masterRank, 1, tempArray, recvSize );
timerStat.start("worker_processes");
for( int i = 0; i < recvSize ; ++i )
{
func( tempArray[ i ] );
}
timerStat.stop("worker_processes");
comm.send( masterRank, 1, tempArray, recvSize );
}
virtual bool isFinished()
{
return index == size;
} }
protected: protected:
std::vector<EOT> & data; ParallelApplyData<EOT> sharedData;
eoUF<EOT&, void>& func;
int index;
int size;
std::map< int /* worker rank */, ParallelApplyAssignment /* min indexes in vector */> assignedTasks;
int packetSize;
EOT* tempArray;
}; };
} }
} }