Static assignement algorithm

This commit is contained in:
Benjamin Bouvier 2012-06-22 16:13:08 +02:00
commit cf5317f614
2 changed files with 106 additions and 19 deletions

View file

@ -1,21 +1,24 @@
# ifndef __ASSIGNMENT_ALGORITHM_H__
# define __ASSIGNMENT_ALGORITHM_H__
# include <vector>
struct AssignmentAlgorithm
{
virtual int get( ) = 0;
virtual int size( ) = 0;
virtual int availableWorkers( ) = 0;
virtual void confirm( int wrkRank ) = 0;
virtual std::vector<int> idles( ) = 0;
};
struct DynamicAssignmentAlgorithm : public AssignmentAlgorithm
{
public:
DynamicAssignmentAlgorithm( int offset, int size )
DynamicAssignmentAlgorithm( int first, int last )
{
for( int i = 0; offset + i <= size; ++i)
for( int i = first; i <= last; ++i)
{
availableWrk.push_back( offset + i );
availableWrk.push_back( i );
}
}
@ -30,7 +33,7 @@ struct DynamicAssignmentAlgorithm : public AssignmentAlgorithm
return assignee;
}
int size()
int availableWorkers()
{
return availableWrk.size();
}
@ -40,10 +43,99 @@ struct DynamicAssignmentAlgorithm : public AssignmentAlgorithm
availableWrk.push_back( rank );
}
std::vector<int> idles( )
{
return availableWrk;
}
protected:
std::vector< int > availableWrk;
};
struct StaticAssignmentAlgorithm : public AssignmentAlgorithm
{
public:
StaticAssignmentAlgorithm( int first, int last, int runs )
{
unsigned int nbWorkers = last - first + 1;
freeWorkers = nbWorkers;
offset = first;
attributions.reserve( nbWorkers );
busy.resize( nbWorkers, false );
// Let be the euclidean division of runs by nbWorkers :
// runs == q * nbWorkers + r, 0 <= r < nbWorkers
// This one liner affects q requests to each worker
for (unsigned int i = 0; i < nbWorkers; attributions[i++] = runs / nbWorkers) ;
// The first line computes r and the one liner affects the remaining
// r requests to workers, in ascending order
unsigned int diff = runs - (runs / nbWorkers) * nbWorkers;
for (unsigned int i = 0; i < diff; ++attributions[i++]);
}
int get( )
{
int assignee = -1;
for( unsigned i = 0; i < busy.size(); ++i )
{
if( !busy[i] && attributions[i] > 0 )
{
busy[i] = true;
--freeWorkers;
assignee = realRank( i );
break;
}
}
return assignee;
}
int availableWorkers( )
{
return freeWorkers;
}
std::vector<int> idles()
{
std::vector<int> ret;
for(unsigned int i = 0; i < busy.size(); ++i)
{
if( !busy[i] )
{
eo::log << "Idle : " << realRank(i) <<
" / attributions : " << attributions[i] << std::endl;
ret.push_back( realRank(i) );
}
}
afterIdle = true;
return ret;
}
void confirm( int rank )
{
int i = attributionsIndex( rank );
--attributions[ i ];
busy[ i ] = false;
++freeWorkers;
}
private:
int attributionsIndex( int rank )
{
return rank - offset;
}
int realRank( int index )
{
return index + offset;
}
std::vector<int> attributions;
std::vector<bool> busy;
bool afterIdle;
int runs;
int offset;
unsigned int freeWorkers;
};
# endif // __ASSIGNMENT_ALGORITHM_H__

View file

@ -65,14 +65,13 @@ class MpiJob
void master( )
{
int totalWorkers = assignmentAlgo.size();
int totalWorkers = assignmentAlgo.availableWorkers();
eo::log << eo::debug;
eo::log << "[M] Have " << totalWorkers << " workers." << std::endl;
while( ! isFinished() )
{
int assignee = assignmentAlgo.get( );
eo::log << "[M] Assignee : " << assignee << std::endl;
while( assignee <= 0 )
{
eo::log << "[M] Waitin' for node..." << std::endl;
@ -83,27 +82,22 @@ class MpiJob
assignmentAlgo.confirm( wrkRank );
assignee = assignmentAlgo.get( );
}
eo::log << "[M] Assignee : " << assignee << std::endl;
comm.send( assignee, EoMpi::Channel::Commands, EoMpi::Message::Continue );
sendTask( assignee );
}
eo::log << "[M] Frees all the idle." << std::endl;
// frees all the idle workers
int idle;
std::vector<int> idles;
while ( ( idle = assignmentAlgo.get( ) ) > 0 )
std::vector<int> idles = assignmentAlgo.idles();
for(unsigned int i = 0; i < idles.size(); ++i)
{
comm.send( idle, EoMpi::Channel::Commands, EoMpi::Message::Finish );
idles.push_back( idle );
}
for (unsigned int i = 0; i < idles.size(); ++i)
{
assignmentAlgo.confirm( idles[i] );
comm.send( idles[i], EoMpi::Channel::Commands, EoMpi::Message::Finish );
}
eo::log << "[M] Waits for all responses." << std::endl;
// wait for all responses
while( assignmentAlgo.size() != totalWorkers )
while( assignmentAlgo.availableWorkers() != totalWorkers )
{
mpi::status status = comm.probe( mpi::any_source, mpi::any_tag );
int wrkRank = status.source();
@ -121,14 +115,15 @@ class MpiJob
eo::log << eo::debug;
while( true )
{
eo::log << "[W] Waiting for an order..." << std::endl;
eo::log << "[W" << comm.rank() << "] Waiting for an order..." << std::endl;
comm.recv( masterRank, EoMpi::Channel::Commands, order );
if ( order == EoMpi::Message::Finish )
{
eo::log << "[W" << comm.rank() << "] Leaving worker task." << std::endl;
return;
} else
{
eo::log << "[W] Processing task..." << std::endl;
eo::log << "[W" << comm.rank() << "] Processing task..." << std::endl;
processTask( );
}
}