Use parallel apply again in eoPopEvalFunc::eoParallelPopEvalFunc.

This commit is contained in:
Benjamin Bouvier 2012-07-03 15:53:19 +02:00
commit 24c29db6f3
7 changed files with 125 additions and 35 deletions

View file

@ -36,6 +36,7 @@
# ifdef WITH_MPI # ifdef WITH_MPI
# include <mpi/eoMpi.h> # include <mpi/eoMpi.h>
# include <mpi/eoMultiParallelApply.h> # include <mpi/eoMultiParallelApply.h>
# include <mpi/eoTerminateJob.h>
# endif // WITH_MPI # endif // WITH_MPI
/** /**
@ -94,7 +95,8 @@ void parallelApply(
int _packetSize, int _packetSize,
int _maxTime) int _maxTime)
{ {
eo::mpi::MultiParallelApply<EOT> job( _proc, _pop, _algo, _masterRank, _packetSize, _maxTime ); eo::mpi::ParallelEvalStore<EOT> store( _proc, _pop, _masterRank, _packetSize );
eo::mpi::ParallelApply<EOT> job( _algo, _masterRank, store );
job.run(); job.run();
} }
#endif #endif

View file

@ -99,6 +99,15 @@ public:
// empty // empty
} }
~eoParallelPopLoopEval()
{
if( eo::mpi::Node::comm().rank() == masterRank )
{
eo::mpi::EmptyJob job( assignAlgo, masterRank );
job.run();
}
}
/** Do the job: simple loop over the offspring */ /** Do the job: simple loop over the offspring */
void operator()(eoPop<EOT> & _parents, eoPop<EOT> & _offspring) void operator()(eoPop<EOT> & _parents, eoPop<EOT> & _offspring)
{ {

View file

@ -1,4 +1,3 @@
# ifndef __EO_MULTI_PARALLEL_APPLY_H__ # ifndef __EO_MULTI_PARALLEL_APPLY_H__
# define __EO_MULTI_PARALLEL_APPLY_H__ # define __EO_MULTI_PARALLEL_APPLY_H__
@ -8,36 +7,41 @@ namespace eo
{ {
namespace mpi namespace mpi
{ {
template< typename EOT > template< class EOT >
class MultiParallelApply : public ParallelApply<EOT> class ProcessTaskParallelEval : public ProcessTaskParallelApply<EOT>
{ {
public: public:
// using ParallelApply<EOT>::comm; using ProcessTaskParallelApply<EOT>::_wrapped;
using ParallelApply<EOT>::masterRank; using ProcessTaskParallelApply<EOT>::d;
MultiParallelApply( void operator()()
eoUF<EOT&, void> & _proc, {
std::vector<EOT>& _pop, int order = Message::Continue;
AssignmentAlgorithm & algo, while( order != Message::Finish )
int _masterRank,
int _packetSize = 1,
long _maxTime = 0
) :
ParallelApply<EOT>( _proc, _pop, algo, _masterRank, _packetSize, _maxTime )
{ {
// empty _wrapped->operator()();
d->comm.recv( d->masterRank, Channel::Commands, order );
} }
}
};
virtual void processTask( ) template< class EOT >
{ struct ParallelEvalStore : public ParallelApplyStore< EOT >
int order = Message::Continue; {
while( order != Message::Finish ) using ParallelApplyStore<EOT>::wrapProcessTask;
{
ParallelApply<EOT>::processTask( ); ParallelEvalStore(
ParallelApply<EOT>::comm.recv( masterRank, Channel::Commands, order ); eoUF<EOT&, void> & _proc,
} std::vector<EOT>& _pop,
} int _masterRank,
// long _maxTime = 0,
int _packetSize = 1
) :
ParallelApplyStore< EOT >( _proc, _pop, _masterRank, _packetSize )
{
wrapProcessTask( new ProcessTaskParallelEval<EOT> );
}
}; };
} }
} }

View file

@ -24,7 +24,7 @@ namespace eo
std::vector<EOT>& _pop, std::vector<EOT>& _pop,
int _masterRank, int _masterRank,
// long _maxTime = 0, // long _maxTime = 0,
int _packetSize // FIXME = 1 ? int _packetSize
) : ) :
data( _pop ), func( _proc ), index( 0 ), size( _pop.size() ), packetSize( _packetSize ), masterRank( _masterRank ), comm( Node::comm() ) data( _pop ), func( _proc ), index( 0 ), size( _pop.size() ), packetSize( _packetSize ), masterRank( _masterRank ), comm( Node::comm() )
{ {
@ -163,14 +163,19 @@ namespace eo
std::vector<EOT>& _pop, std::vector<EOT>& _pop,
int _masterRank, int _masterRank,
// long _maxTime = 0, // long _maxTime = 0,
int _packetSize = 1 int _packetSize = 1,
) // JobStore functors
: _data( _proc, _pop, _masterRank, _packetSize ) SendTaskParallelApply<EOT> * stpa = new SendTaskParallelApply<EOT>,
HandleResponseParallelApply<EOT>* hrpa = new HandleResponseParallelApply<EOT>,
ProcessTaskParallelApply<EOT>* ptpa = new ProcessTaskParallelApply<EOT>,
IsFinishedParallelApply<EOT>* ifpa = new IsFinishedParallelApply<EOT>
) :
_data( _proc, _pop, _masterRank, _packetSize )
{ {
_stf = new SendTaskParallelApply<EOT>; _stf = stpa;
_hrf = new HandleResponseParallelApply<EOT>; _hrf = hrpa;
_ptf = new ProcessTaskParallelApply<EOT>; _ptf = ptpa;
_iff = new IsFinishedParallelApply<EOT>; _iff = ifpa;
} }
ParallelApplyData<EOT>* data() { return &_data; } ParallelApplyData<EOT>* data() { return &_data; }

View file

@ -7,6 +7,73 @@ namespace eo
{ {
namespace mpi namespace mpi
{ {
struct DummySendTaskFunction : public SendTaskFunction<void>
{
void operator()( int _ )
{
}
};
struct DummyHandleResponseFunction : public HandleResponseFunction<void>
{
void operator()( int _ )
{
}
};
struct DummyProcessTaskFunction : public ProcessTaskFunction<void>
{
void operator()()
{
// nothing!
}
};
struct DummyIsFinishedFunction : public IsFinishedFunction<void>
{
bool operator()()
{
return true;
}
};
struct DummyJobStore : public JobStore<void>
{
using JobStore<void>::_stf;
using JobStore<void>::_hrf;
using JobStore<void>::_ptf;
using JobStore<void>::_iff;
DummyJobStore()
{
_stf = new DummySendTaskFunction;
_hrf = new DummyHandleResponseFunction;
_ptf = new DummyProcessTaskFunction;
_iff = new DummyIsFinishedFunction;
}
~DummyJobStore()
{
delete _stf;
delete _hrf;
delete _ptf;
delete _iff;
}
void* data() { return 0; }
};
struct EmptyJob : public Job<void>
{
EmptyJob( AssignmentAlgorithm& algo, int masterRank ) :
Job<void>( algo, masterRank, *(new DummyJobStore) )
// FIXME memory leak => will be corrected by using const correctness
{
// empty
}
};
/*
class TerminateJob : public Job class TerminateJob : public Job
{ {
public: public:
@ -36,6 +103,7 @@ namespace eo
return true; return true;
} }
}; };
*/
} }
} }

View file

@ -11,6 +11,8 @@
#include <mpi/eoMpi.h> #include <mpi/eoMpi.h>
#include <mpi/eoTerminateJob.h>
#include <boost/mpi.hpp> #include <boost/mpi.hpp>
#include <vector> #include <vector>
@ -83,6 +85,7 @@ typedef eoRealSerializable EOT;
int main(int ac, char** av) int main(int ac, char** av)
{ {
eo::mpi::Node::init( ac, av ); eo::mpi::Node::init( ac, av );
eo::log << eo::setlevel( eo::debug );
eoParser parser(ac, av); eoParser parser(ac, av);
@ -110,7 +113,7 @@ int main(int ac, char** av)
eo::log << eo::setlevel( eo::debug ); eo::log << eo::setlevel( eo::debug );
eo::mpi::DynamicAssignmentAlgorithm assign; eo::mpi::DynamicAssignmentAlgorithm assign;
eoParallelPopLoopEval< EOT > popEval( eval, assign, 0, 3 ); eoParallelPopLoopEval< EOT > popEval( eval, assign, eo::mpi::DEFAULT_MASTER, 3 );
popEval( pop, pop ); popEval( pop, pop );
eo::log << eo::quiet << "DONE!" << std::endl; eo::log << eo::quiet << "DONE!" << std::endl;

View file

@ -110,7 +110,6 @@ int main(int argc, char** argv)
for( unsigned int i = 0; i < tests.size(); ++i ) for( unsigned int i = 0; i < tests.size(); ++i )
{ {
// ParallelApply<int> job( plusOneInstance, v, *(tests[i].assign), 0, store, 3 );
ParallelApplyStore< int > store( plusOneInstance, v, eo::mpi::DEFAULT_MASTER, 3 ); ParallelApplyStore< int > store( plusOneInstance, v, eo::mpi::DEFAULT_MASTER, 3 );
// Job< JobData<int> > job( *(tests[i].assign), eo::mpi::DEFAULT_MASTER, store ); // Job< JobData<int> > job( *(tests[i].assign), eo::mpi::DEFAULT_MASTER, store );
ParallelApply< int > job( *(tests[i].assign), eo::mpi::DEFAULT_MASTER, store ); ParallelApply< int > job( *(tests[i].assign), eo::mpi::DEFAULT_MASTER, store );