Cleaner version of functional parallel job.

This commit is contained in:
Benjamin Bouvier 2012-07-02 17:53:02 +02:00
commit ff61676fb7
3 changed files with 151 additions and 104 deletions

View file

@ -33,42 +33,91 @@ namespace eo
const int Finish = 1; const int Finish = 1;
} }
class SendTaskFunction : public eoUF<int, void> template< typename JobData, typename Wrapped >
struct SharedDataFunction
{
SharedDataFunction( Wrapped * w )
{
wrapped = w;
}
void data( JobData* _d )
{
d = _d;
}
protected:
JobData* d;
Wrapped* wrapped;
};
template< typename JobData >
struct SendTaskFunction : public eoUF<int, void>, public SharedDataFunction< JobData, SendTaskFunction<JobData> >
{ {
public: public:
SendTaskFunction( SendTaskFunction<JobData>* w = 0 ) : SharedDataFunction<JobData, SendTaskFunction<JobData> >( w )
{
// empty
}
virtual ~SendTaskFunction() {} virtual ~SendTaskFunction() {}
}; };
class HandleResponseFunction : public eoUF<int, void> template< typename JobData >
struct HandleResponseFunction : public eoUF<int, void>, public SharedDataFunction< JobData, HandleResponseFunction<JobData> >
{ {
public: public:
HandleResponseFunction( HandleResponseFunction<JobData>* w = 0 ) : SharedDataFunction<JobData, HandleResponseFunction<JobData> >( w )
{
// empty
}
virtual ~HandleResponseFunction() {} virtual ~HandleResponseFunction() {}
}; };
class ProcessTaskFunction : public eoF<void> template< typename JobData >
struct ProcessTaskFunction : public eoF<void>, public SharedDataFunction< JobData, ProcessTaskFunction<JobData> >
{ {
public: public:
ProcessTaskFunction( ProcessTaskFunction<JobData>* w = 0 ) : SharedDataFunction<JobData, ProcessTaskFunction<JobData> >( w )
{
// empty
}
virtual ~ProcessTaskFunction() {} virtual ~ProcessTaskFunction() {}
}; };
class IsFinishedFunction : public eoF<bool> template< typename JobData >
struct IsFinishedFunction : public eoF<bool>, public SharedDataFunction< JobData, IsFinishedFunction<JobData> >
{ {
public: public:
IsFinishedFunction( IsFinishedFunction<JobData>* w = 0 ) : SharedDataFunction<JobData, IsFinishedFunction<JobData> >( w )
{
// empty
}
virtual ~IsFinishedFunction() {} virtual ~IsFinishedFunction() {}
}; };
template< typename JobData >
struct JobStore struct JobStore
{ {
virtual SendTaskFunction & sendTask() const = 0; virtual SendTaskFunction<JobData> & sendTask() const = 0;
virtual HandleResponseFunction & handleResponse() const = 0; virtual HandleResponseFunction<JobData> & handleResponse() const = 0;
virtual ProcessTaskFunction & processTask() const = 0; virtual ProcessTaskFunction<JobData> & processTask() const = 0;
virtual IsFinishedFunction & isFinished() const = 0; virtual IsFinishedFunction<JobData> & isFinished() const = 0;
virtual JobData* data() = 0;
}; };
template< class JobData >
class Job class Job
{ {
public: public:
Job( AssignmentAlgorithm& _algo, int _masterRank, const JobStore & store ) : Job( AssignmentAlgorithm& _algo, int _masterRank, JobStore<JobData> & store ) :
// Job( AssignmentAlgorithm& _algo, int _masterRank, long maxTime = 0 ) : // Job( AssignmentAlgorithm& _algo, int _masterRank, long maxTime = 0 ) :
assignmentAlgo( _algo ), assignmentAlgo( _algo ),
comm( Node::comm() ), comm( Node::comm() ),
@ -81,6 +130,11 @@ namespace eo
isFinished( store.isFinished() ) isFinished( store.isFinished() )
{ {
_isMaster = Node::comm().rank() == _masterRank; _isMaster = Node::comm().rank() == _masterRank;
sendTask.data( store.data() );
handleResponse.data( store.data() );
processTask.data( store.data() );
isFinished.data( store.data() );
} }
/* /*
@ -94,10 +148,10 @@ namespace eo
protected: protected:
SendTaskFunction & sendTask; SendTaskFunction<JobData> & sendTask;
HandleResponseFunction & handleResponse; HandleResponseFunction<JobData> & handleResponse;
ProcessTaskFunction & processTask; ProcessTaskFunction<JobData> & processTask;
IsFinishedFunction & isFinished; IsFinishedFunction<JobData> & isFinished;
void master( ) void master( )
{ {

View file

@ -17,79 +17,29 @@ namespace eo
}; };
template<class EOT> template<class EOT>
class SendTaskParallelApply; struct JobData
template<class EOT>
class HandleResponseParallelApply;
template<class EOT>
class ProcessTaskParallelApply;
template<class EOT>
class IsFinishedParallelApply;
template<class EOT>
class ParallelApply;
template< class EOT >
class BaseParallelApply
{ {
public: JobData(
void owner(ParallelApply<EOT> * job)
{
j = job;
}
protected:
ParallelApply<EOT> * j;
};
template< typename EOT >
class ParallelApply : public Job
{
friend class SendTaskParallelApply<EOT>;
friend class HandleResponseParallelApply<EOT>;
friend class ProcessTaskParallelApply<EOT>;
friend class IsFinishedParallelApply<EOT>;
public:
ParallelApply(
eoUF<EOT&, void> & _proc, eoUF<EOT&, void> & _proc,
std::vector<EOT>& _pop, std::vector<EOT>& _pop,
AssignmentAlgorithm & algo,
int _masterRank, int _masterRank,
const JobStore& store,
// long _maxTime = 0, // long _maxTime = 0,
int _packetSize = 1 int _packetSize
) : ) :
Job( algo, _masterRank, store ), data( _pop ), func( _proc ), index( 0 ), size( _pop.size() ), packetSize( _packetSize ), masterRank( _masterRank ), comm( Node::comm() )
// Job( algo, _masterRank, _maxTime ),
func( _proc ),
data( _pop ),
packetSize( _packetSize ),
index( 0 ),
size( _pop.size() )
{ {
if ( _packetSize <= 0 ) if ( _packetSize <= 0 )
{ {
throw std::runtime_error("Packet size should not be negative."); throw std::runtime_error("Packet size should not be negative.");
} }
tempArray = new EOT [ _packetSize ]; tempArray = new EOT[ _packetSize ];
dynamic_cast< BaseParallelApply<EOT>& >( sendTask ).owner( this );
dynamic_cast< BaseParallelApply<EOT>& >( handleResponse ).owner( this );
dynamic_cast< BaseParallelApply<EOT>& >( processTask ).owner( this );
dynamic_cast< BaseParallelApply<EOT>& >( isFinished ).owner( this );
} }
~ParallelApply() ~JobData()
{ {
delete [] tempArray; delete [] tempArray;
} }
protected:
std::vector<EOT> & data; std::vector<EOT> & data;
eoUF<EOT&, void> & func; eoUF<EOT&, void> & func;
int index; int index;
@ -98,92 +48,131 @@ namespace eo
int packetSize; int packetSize;
EOT* tempArray; EOT* tempArray;
// bmpi::communicator& comm; int masterRank;
bmpi::communicator& comm;
}; };
template< class EOT > /*
class SendTaskParallelApply : public SendTaskFunction, public BaseParallelApply<EOT> template< typename EOT >
class ParallelApply : public Job< JobData<EOT> >
{ {
public: public:
using BaseParallelApply<EOT>::j;
ParallelApply(
// eoUF<EOT&, void> & _proc,
// std::vector<EOT>& _pop,
AssignmentAlgorithm & algo,
int _masterRank,
const JobStore< JobData<EOT> >& store
// long _maxTime = 0,
// int _packetSize = 1
) :
Job( algo, _masterRank, store )
// Job( algo, _masterRank, _maxTime ),
func( _proc ),
data( _pop ),
packetSize( _packetSize ),
index( 0 ),
size( _pop.size() )
{
// empty
}
protected:
// bmpi::communicator& comm;
};
*/
template< class EOT >
class SendTaskParallelApply : public SendTaskFunction< JobData<EOT> >
{
public:
using SendTaskFunction< JobData<EOT> >::d;
// futureIndex, index, packetSize, size, comm, assignedTasks, data // futureIndex, index, packetSize, size, comm, assignedTasks, data
void operator()(int wrkRank) void operator()(int wrkRank)
{ {
int futureIndex; int futureIndex;
if( j->index + j->packetSize < j->size ) if( d->index + d->packetSize < d->size )
{ {
futureIndex = j->index + j->packetSize; futureIndex = d->index + d->packetSize;
} else { } else {
futureIndex = j->size; futureIndex = d->size;
} }
int sentSize = futureIndex - j->index ; int sentSize = futureIndex - d->index ;
j->comm.send( wrkRank, 1, sentSize ); d->comm.send( wrkRank, 1, sentSize );
eo::log << eo::progress << "Evaluating individual " << j->index << std::endl; eo::log << eo::progress << "Evaluating individual " << d->index << std::endl;
j->assignedTasks[ wrkRank ].index = j->index; d->assignedTasks[ wrkRank ].index = d->index;
j->assignedTasks[ wrkRank ].size = sentSize; d->assignedTasks[ wrkRank ].size = sentSize;
j->comm.send( wrkRank, 1, & ( (j->data)[ j->index ] ) , sentSize ); d->comm.send( wrkRank, 1, & ( (d->data)[ d->index ] ) , sentSize );
j->index = futureIndex; d->index = futureIndex;
} }
}; };
template< class EOT > template< class EOT >
class HandleResponseParallelApply : public HandleResponseFunction, public BaseParallelApply<EOT> class HandleResponseParallelApply : public HandleResponseFunction< JobData<EOT> >
{ {
public: public:
using BaseParallelApply<EOT>::j; using HandleResponseFunction< JobData<EOT> >::d;
void operator()(int wrkRank) void operator()(int wrkRank)
{ {
j->comm.recv( wrkRank, 1, & (j->data[ j->assignedTasks[wrkRank].index ] ), j->assignedTasks[wrkRank].size ); d->comm.recv( wrkRank, 1, & (d->data[ d->assignedTasks[wrkRank].index ] ), d->assignedTasks[wrkRank].size );
} }
}; };
template< class EOT > template< class EOT >
class ProcessTaskParallelApply : public ProcessTaskFunction, public BaseParallelApply<EOT> class ProcessTaskParallelApply : public ProcessTaskFunction< JobData<EOT> >
{ {
public: public:
using BaseParallelApply<EOT>::j; using ProcessTaskFunction< JobData<EOT> >::d;
void operator()() void operator()()
{ {
int recvSize; int recvSize;
j->comm.recv( j->masterRank, 1, recvSize ); d->comm.recv( d->masterRank, 1, recvSize );
j->comm.recv( j->masterRank, 1, j->tempArray, recvSize ); d->comm.recv( d->masterRank, 1, d->tempArray, recvSize );
timerStat.start("worker_processes"); timerStat.start("worker_processes");
for( int i = 0; i < recvSize ; ++i ) for( int i = 0; i < recvSize ; ++i )
{ {
j->func( j->tempArray[ i ] ); d->func( d->tempArray[ i ] );
} }
timerStat.stop("worker_processes"); timerStat.stop("worker_processes");
j->comm.send( j->masterRank, 1, j->tempArray, recvSize ); d->comm.send( d->masterRank, 1, d->tempArray, recvSize );
} }
}; };
template< class EOT > template< class EOT >
class IsFinishedParallelApply : public IsFinishedFunction, public BaseParallelApply<EOT> class IsFinishedParallelApply : public IsFinishedFunction< JobData<EOT> >
{ {
public: public:
using IsFinishedFunction< JobData<EOT> >::d;
using BaseParallelApply<EOT>::j;
bool operator()() bool operator()()
{ {
return j->index == j->size; return d->index == d->size;
} }
}; };
template< class EOT > template< class EOT >
struct ParallelApplyStore : public JobStore struct ParallelApplyStore : public JobStore< JobData<EOT> >
{ {
ParallelApplyStore() ParallelApplyStore(
eoUF<EOT&, void> & _proc,
std::vector<EOT>& _pop,
int _masterRank,
// long _maxTime = 0,
int _packetSize = 1
)
: j( _proc, _pop, _masterRank, _packetSize )
{ {
stpa = new SendTaskParallelApply<EOT>; stpa = new SendTaskParallelApply<EOT>;
hrpa = new HandleResponseParallelApply<EOT>; hrpa = new HandleResponseParallelApply<EOT>;
@ -199,16 +188,20 @@ namespace eo
delete ispa; delete ispa;
} }
SendTaskFunction& sendTask() const { return *stpa; } SendTaskFunction< JobData<EOT> >& sendTask() const { return *stpa; }
HandleResponseFunction& handleResponse() const { return *hrpa; } HandleResponseFunction< JobData<EOT> >& handleResponse() const { return *hrpa; }
ProcessTaskFunction& processTask() const { return *ptpa; } ProcessTaskFunction< JobData<EOT> >& processTask() const { return *ptpa; }
IsFinishedFunction& isFinished() const { return *ispa; } IsFinishedFunction< JobData<EOT> >& isFinished() const { return *ispa; }
JobData<EOT>* data() { return &j; }
protected: protected:
SendTaskParallelApply<EOT>* stpa; SendTaskParallelApply<EOT>* stpa;
HandleResponseParallelApply<EOT>* hrpa; HandleResponseParallelApply<EOT>* hrpa;
ProcessTaskParallelApply<EOT>* ptpa; ProcessTaskParallelApply<EOT>* ptpa;
IsFinishedParallelApply<EOT>* ispa; IsFinishedParallelApply<EOT>* ispa;
JobData<EOT> j;
}; };
} }
} }

View file

@ -38,7 +38,7 @@ int main(int argc, char** argv)
{ {
v.push_back( rand() ); v.push_back( rand() );
} }
int offset = 0; int offset = 0;
vector<int> originalV = v; vector<int> originalV = v;
@ -46,8 +46,6 @@ int main(int argc, char** argv)
vector< Test > tests; vector< Test > tests;
ParallelApplyStore<int> store;
const int ALL = Node::comm().size(); const int ALL = Node::comm().size();
Test tIntervalStatic; Test tIntervalStatic;
@ -112,7 +110,9 @@ int main(int argc, char** argv)
for( unsigned int i = 0; i < tests.size(); ++i ) for( unsigned int i = 0; i < tests.size(); ++i )
{ {
ParallelApply<int> job( plusOneInstance, v, *(tests[i].assign), 0, store, 3 ); // ParallelApply<int> job( plusOneInstance, v, *(tests[i].assign), 0, store, 3 );
ParallelApplyStore< int > store( plusOneInstance, v, 0, 3 );
Job< JobData<int> > job( *(tests[i].assign), 0, store );
if( job.isMaster() ) if( job.isMaster() )
{ {