Removed useless continuator functor, replaced with a virtual method into MpiJob.
This commit is contained in:
parent
28ab2004ea
commit
fb871382e0
2 changed files with 13 additions and 68 deletions
|
|
@ -6,41 +6,19 @@
|
|||
# include <eoFunctor.h>
|
||||
# include <vector>
|
||||
|
||||
template< typename EOT >
|
||||
struct ParallelApplyContinuator : public BaseContinuator
|
||||
{
|
||||
ParallelApplyContinuator( int index, int size )
|
||||
{
|
||||
_index = index;
|
||||
_size = size;
|
||||
}
|
||||
|
||||
void index( int i ) { _index = i; }
|
||||
|
||||
bool operator()()
|
||||
{
|
||||
return _index < _size;
|
||||
}
|
||||
|
||||
private:
|
||||
int _index;
|
||||
int _size;
|
||||
};
|
||||
|
||||
template< typename EOT >
|
||||
class ParallelApply : public MpiJob
|
||||
{
|
||||
public:
|
||||
|
||||
ParallelApply( eoUF<EOT&, void> & _proc, std::vector<EOT>& _pop, AssignmentAlgorithm & algo, int _masterRank ) :
|
||||
MpiJob( algo,
|
||||
new ParallelApplyContinuator<EOT>( 0, _pop.size() ),
|
||||
_masterRank ),
|
||||
MpiJob( algo, _masterRank ),
|
||||
func( _proc ),
|
||||
index( 0 ),
|
||||
size( _pop.size() ),
|
||||
data( _pop )
|
||||
{
|
||||
pa_continuator = static_cast<ParallelApplyContinuator<EOT>*>( _continuator );
|
||||
// empty
|
||||
}
|
||||
|
||||
virtual void sendTask( int wrkRank )
|
||||
|
|
@ -48,7 +26,6 @@ class ParallelApply : public MpiJob
|
|||
assignedTasks[ wrkRank ] = index;
|
||||
comm.send( wrkRank, 1, data[ index ] );
|
||||
++index;
|
||||
pa_continuator->index( index );
|
||||
}
|
||||
|
||||
virtual void handleResponse( int wrkRank )
|
||||
|
|
@ -64,11 +41,16 @@ class ParallelApply : public MpiJob
|
|||
comm.send( _masterRank, 1, ind );
|
||||
}
|
||||
|
||||
bool isFinished()
|
||||
{
|
||||
return index = size;
|
||||
}
|
||||
|
||||
protected:
|
||||
vector<EOT> & data;
|
||||
eoUF<EOT&, void>& func;
|
||||
int index;
|
||||
ParallelApplyContinuator<EOT> * pa_continuator;
|
||||
int size;
|
||||
std::map< int /* worker rank */, int /* index in vector */> assignedTasks;
|
||||
};
|
||||
|
||||
|
|
|
|||
|
|
@ -45,31 +45,20 @@ class MpiNode
|
|||
static mpi::communicator _comm;
|
||||
};
|
||||
|
||||
struct BaseContinuator
|
||||
{
|
||||
virtual bool operator()() = 0;
|
||||
};
|
||||
|
||||
// template< typename EOT >
|
||||
class MpiJob
|
||||
{
|
||||
public:
|
||||
|
||||
MpiJob( AssignmentAlgorithm& algo, BaseContinuator* c, int masterRank ) :
|
||||
MpiJob( AssignmentAlgorithm& algo, int masterRank ) :
|
||||
assignmentAlgo( algo ),
|
||||
comm( MpiNode::comm() ),
|
||||
_masterRank( masterRank ),
|
||||
_continuator( c )
|
||||
_masterRank( masterRank )
|
||||
{
|
||||
// empty
|
||||
}
|
||||
|
||||
~MpiJob()
|
||||
{
|
||||
delete _continuator;
|
||||
}
|
||||
|
||||
// master
|
||||
virtual bool isFinished() = 0;
|
||||
virtual void sendTask( int wrkRank ) = 0;
|
||||
virtual void handleResponse( int wrkRank ) = 0;
|
||||
// worker
|
||||
|
|
@ -80,7 +69,7 @@ class MpiJob
|
|||
int totalWorkers = assignmentAlgo.size();
|
||||
cout << "[M] Have " << totalWorkers << " workers." << endl;
|
||||
|
||||
while( (*_continuator)() )
|
||||
while( ! isFinished() )
|
||||
{
|
||||
int assignee = assignmentAlgo.get( );
|
||||
cout << "[M] Assignee : " << assignee << endl;
|
||||
|
|
@ -98,31 +87,6 @@ class MpiJob
|
|||
sendTask( assignee );
|
||||
}
|
||||
|
||||
/*
|
||||
for( int i = 0, size = data.size();
|
||||
i < size;
|
||||
++i)
|
||||
{
|
||||
cout << "[M] Beginning loop for i = " << i << endl;
|
||||
int assignee = assignmentAlgo.get( );
|
||||
cout << "[M] Assignee : " << assignee << endl;
|
||||
while( assignee <= 0 )
|
||||
{
|
||||
cout << "[M] Waitin' for node..." << endl;
|
||||
mpi::status status = comm.probe( mpi::any_source, mpi::any_tag );
|
||||
int wrkRank = status.source();
|
||||
cout << "[M] Node " << wrkRank << " just terminated." << endl;
|
||||
handleResponse( wrkRank, assignedTasks[ wrkRank ] );
|
||||
assignmentAlgo.confirm( wrkRank );
|
||||
assignee = assignmentAlgo.get( );
|
||||
}
|
||||
cout << "[M] Assignee found : " << assignee << endl;
|
||||
assignedTasks[ assignee ] = i;
|
||||
comm.send( assignee, EoMpi::Channel::Commands, EoMpi::Message::Continue );
|
||||
sendTask( assignee, i );
|
||||
}
|
||||
*/
|
||||
|
||||
cout << "[M] Frees all the idle." << endl;
|
||||
// frees all the idle workers
|
||||
int idle;
|
||||
|
|
@ -176,7 +140,6 @@ class MpiJob
|
|||
|
||||
protected:
|
||||
AssignmentAlgorithm& assignmentAlgo;
|
||||
BaseContinuator* _continuator;
|
||||
mpi::communicator& comm;
|
||||
int _masterRank;
|
||||
};
|
||||
|
|
|
|||
Reference in a new issue