Steps of MpiJob are now functors which can be wrapped (using decorator pattern).

This commit is contained in:
Benjamin Bouvier 2012-07-03 13:57:20 +02:00
commit 6bb2ccfbd6
4 changed files with 158 additions and 50 deletions

View file

@ -44,6 +44,10 @@ namespace eo
void data( JobData* _d )
{
d = _d;
if( wrapped )
{
wrapped->data( _d );
}
}
protected:

View file

@ -17,14 +17,14 @@ namespace eo
};
template<class EOT>
struct JobData
struct ParallelApplyData
{
JobData(
ParallelApplyData(
eoUF<EOT&, void> & _proc,
std::vector<EOT>& _pop,
int _masterRank,
// long _maxTime = 0,
int _packetSize
int _packetSize // FIXME = 1 ?
) :
data( _pop ), func( _proc ), index( 0 ), size( _pop.size() ), packetSize( _packetSize ), masterRank( _masterRank ), comm( Node::comm() )
{
@ -35,7 +35,7 @@ namespace eo
tempArray = new EOT[ _packetSize ];
}
~JobData()
~ParallelApplyData()
{
delete [] tempArray;
}
@ -52,43 +52,16 @@ namespace eo
bmpi::communicator& comm;
};
/*
template< typename EOT >
class ParallelApply : public Job< JobData<EOT> >
template< class EOT >
class SendTaskParallelApply : public SendTaskFunction< ParallelApplyData<EOT> >
{
public:
using SendTaskFunction< ParallelApplyData<EOT> >::d;
ParallelApply(
// eoUF<EOT&, void> & _proc,
// std::vector<EOT>& _pop,
AssignmentAlgorithm & algo,
int _masterRank,
const JobStore< JobData<EOT> >& store
// long _maxTime = 0,
// int _packetSize = 1
) :
Job( algo, _masterRank, store )
// Job( algo, _masterRank, _maxTime ),
func( _proc ),
data( _pop ),
packetSize( _packetSize ),
index( 0 ),
size( _pop.size() )
SendTaskParallelApply( SendTaskParallelApply<EOT> * w = 0 ) : SendTaskFunction< ParallelApplyData<EOT> >( w )
{
// empty
}
protected:
// bmpi::communicator& comm;
};
*/
template< class EOT >
class SendTaskParallelApply : public SendTaskFunction< JobData<EOT> >
{
public:
using SendTaskFunction< JobData<EOT> >::d;
// futureIndex, index, packetSize, size, comm, assignedTasks, data
void operator()(int wrkRank)
@ -117,10 +90,15 @@ namespace eo
};
template< class EOT >
class HandleResponseParallelApply : public HandleResponseFunction< JobData<EOT> >
class HandleResponseParallelApply : public HandleResponseFunction< ParallelApplyData<EOT> >
{
public:
using HandleResponseFunction< JobData<EOT> >::d;
using HandleResponseFunction< ParallelApplyData<EOT> >::d;
HandleResponseParallelApply( HandleResponseParallelApply<EOT> * w = 0 ) : HandleResponseFunction< ParallelApplyData<EOT> >( w )
{
// empty
}
void operator()(int wrkRank)
{
@ -129,10 +107,15 @@ namespace eo
};
template< class EOT >
class ProcessTaskParallelApply : public ProcessTaskFunction< JobData<EOT> >
class ProcessTaskParallelApply : public ProcessTaskFunction< ParallelApplyData<EOT> >
{
public:
using ProcessTaskFunction< JobData<EOT> >::d;
using ProcessTaskFunction< ParallelApplyData<EOT> >::d;
ProcessTaskParallelApply( ProcessTaskParallelApply<EOT> * w = 0 ) : ProcessTaskFunction< ParallelApplyData<EOT> >( w )
{
// empty
}
void operator()()
{
@ -151,10 +134,15 @@ namespace eo
};
template< class EOT >
class IsFinishedParallelApply : public IsFinishedFunction< JobData<EOT> >
class IsFinishedParallelApply : public IsFinishedFunction< ParallelApplyData<EOT> >
{
public:
using IsFinishedFunction< JobData<EOT> >::d;
using IsFinishedFunction< ParallelApplyData<EOT> >::d;
IsFinishedParallelApply( IsFinishedParallelApply<EOT> * w = 0 ) : IsFinishedFunction< ParallelApplyData<EOT> >( w )
{
// empty
}
bool operator()()
{
@ -163,7 +151,7 @@ namespace eo
};
template< class EOT >
struct ParallelApplyStore : public JobStore< JobData<EOT> >
struct ParallelApplyStore : public JobStore< ParallelApplyData<EOT> >
{
ParallelApplyStore(
eoUF<EOT&, void> & _proc,
@ -172,7 +160,7 @@ namespace eo
// long _maxTime = 0,
int _packetSize = 1
)
: j( _proc, _pop, _masterRank, _packetSize )
: _data( _proc, _pop, _masterRank, _packetSize )
{
stpa = new SendTaskParallelApply<EOT>;
hrpa = new HandleResponseParallelApply<EOT>;
@ -188,20 +176,46 @@ namespace eo
delete ispa;
}
SendTaskFunction< JobData<EOT> >& sendTask() const { return *stpa; }
HandleResponseFunction< JobData<EOT> >& handleResponse() const { return *hrpa; }
ProcessTaskFunction< JobData<EOT> >& processTask() const { return *ptpa; }
IsFinishedFunction< JobData<EOT> >& isFinished() const { return *ispa; }
SendTaskParallelApply< EOT >& sendTask() const { return *stpa; }
HandleResponseParallelApply< EOT >& handleResponse() const { return *hrpa; }
ProcessTaskParallelApply< EOT > & processTask() const { return *ptpa; }
IsFinishedParallelApply< EOT >& isFinished() const { return *ispa; }
JobData<EOT>* data() { return &j; }
void sendTask( SendTaskParallelApply< EOT >* _stpa ) { stpa = _stpa; }
void handleResponse( HandleResponseParallelApply< EOT >* _hrpa ) { hrpa = _hrpa; }
void processTask( ProcessTaskParallelApply< EOT >* _ptpa ) { ptpa = _ptpa; }
void isFinished( IsFinishedParallelApply< EOT >* _ispa ) { ispa = _ispa; }
ParallelApplyData<EOT>* data() { return &_data; }
protected:
// TODO commenter : Utiliser des pointeurs pour éviter d'écraser les fonctions wrappées
SendTaskParallelApply<EOT>* stpa;
HandleResponseParallelApply<EOT>* hrpa;
ProcessTaskParallelApply<EOT>* ptpa;
IsFinishedParallelApply<EOT>* ispa;
JobData<EOT> j;
ParallelApplyData<EOT> _data;
};
template< typename EOT >
class ParallelApply : public Job< ParallelApplyData<EOT> >
{
public:
ParallelApply(
AssignmentAlgorithm & algo,
int _masterRank,
ParallelApplyStore<EOT> & store
) :
Job< ParallelApplyData<EOT> >( algo, _masterRank, store )
{
// empty
}
protected:
// bmpi::communicator& comm;
};
}
}

View file

@ -112,7 +112,8 @@ int main(int argc, char** argv)
{
// ParallelApply<int> job( plusOneInstance, v, *(tests[i].assign), 0, store, 3 );
ParallelApplyStore< int > store( plusOneInstance, v, 0, 3 );
Job< JobData<int> > job( *(tests[i].assign), 0, store );
// Job< JobData<int> > job( *(tests[i].assign), 0, store );
ParallelApply< int > job( *(tests[i].assign), 0, store );
if( job.isMaster() )
{

89
eo/test/mpi/wrapper.cpp Normal file
View file

@ -0,0 +1,89 @@
# include <mpi/eoMpi.h>
# include <mpi/eoParallelApply.h>
# include <iostream>
# include <vector>
using namespace std;
using namespace eo::mpi;
struct plusOne : public eoUF< int&, void >
{
void operator() ( int & x )
{
++x;
}
};
template< class EOT >
struct ShowWrappedResult : public IsFinishedParallelApply<EOT>
{
using IsFinishedParallelApply<EOT>::wrapped;
ShowWrappedResult ( IsFinishedParallelApply<EOT> * w ) : IsFinishedParallelApply<EOT>( w ), times( 0 )
{
// empty
}
bool operator()()
{
bool wrappedValue = wrapped->operator()(); // (*wrapped)();
cout << times << ") Wrapped function would say that it is " << ( wrappedValue ? "":"not ") << "finished" << std::endl;
++times;
return wrappedValue;
}
private:
int times;
};
// These tests require at least 3 processes to be launched.
int main(int argc, char** argv)
{
// eo::log << eo::setlevel( eo::debug );
eo::log << eo::setlevel( eo::quiet );
Node::init( argc, argv );
srand( time(0) );
vector<int> v;
for( int i = 0; i < 1000; ++i )
{
v.push_back( rand() );
}
int offset = 0;
vector<int> originalV = v;
plusOne plusOneInstance;
StaticAssignmentAlgorithm assign( v.size() );
ParallelApplyStore< int > store( plusOneInstance, v, 0, 1 );
IsFinishedParallelApply< int >& wrapped = store.isFinished();
ShowWrappedResult< int >* wrapper = new ShowWrappedResult<int>( &wrapped );
store.isFinished( wrapper );
// Job< ParallelApplyData<int> > job( assign, 0, store );
ParallelApply<int> job( assign, 0, store );
job.run();
if( job.isMaster() )
{
++offset;
for(int i = 0; i < v.size(); ++i)
{
cout << v[i] << ' ';
if( originalV[i] + offset != v[i] )
{
cout << " <-- ERROR at this point." << endl;
exit( EXIT_FAILURE );
}
}
cout << endl;
}
return 0;
}