Merge /home/nojhan/mnt/alevol/eo

This commit is contained in:
Johann Dreo 2012-08-10 11:32:16 +02:00
commit a308303d12
11 changed files with 269 additions and 77 deletions

View file

@ -314,6 +314,9 @@ namespace eo
* This is a functor implementing void operator()(int), and also a shared data function, containing wrapper on
* its own type.
*
* The master has to receive worker's data on channel (= MPI tag) eo::mpi::Channel::Messages. No other tags are
* allowed.
*
* @ingroup MPI
*/
template< typename JobData >
@ -335,7 +338,8 @@ namespace eo
* This is where the real computation happen.
* Whenever the master sends the command "Continue" to workers, which indicates the worker will receive a task,
* the worker calls this functor. The user has to explicitly retrieve the data, handle it and transmit it,
* processed, back to the master. If the worker does not send any data back to the master, the latter will
* processed, back to the master. Data sent back needs to be transmitted via channel (= MPI tag)
* eo::mpi::Channel::Messages, and no one else. If the worker does not send any data back to the master, the latter will
* consider the worker isn't done and a deadlock could occur.
*
* This is a functor implementing void operator()(), and also a shared data function, containing wrapper on its
@ -447,10 +451,53 @@ namespace eo
IsFinishedFunction<JobData> & isFinished() { return *_iff; }
// Setters
void sendTask( SendTaskFunction<JobData>* stf ) { _stf = stf; }
void handleResponse( HandleResponseFunction<JobData>* hrf ) { _hrf = hrf; }
void processTask( ProcessTaskFunction<JobData>* ptf ) { _ptf = ptf; }
void isFinished( IsFinishedFunction<JobData>* iff ) { _iff = iff; }
void sendTask( SendTaskFunction<JobData>* stf )
{
if( !stf )
return;
if( _stf && _stf->needDelete() )
{
delete _stf;
}
_stf = stf;
}
void handleResponse( HandleResponseFunction<JobData>* hrf )
{
if( !hrf )
return;
if( _hrf && _hrf->needDelete() )
{
delete _hrf;
}
_hrf = hrf;
}
void processTask( ProcessTaskFunction<JobData>* ptf )
{
if( !ptf )
return;
if( _ptf && _ptf->needDelete() )
{
delete _ptf;
}
_ptf = ptf;
}
void isFinished( IsFinishedFunction<JobData>* iff )
{
if( !iff )
return;
if( _iff && _iff->needDelete() )
{
delete _iff;
}
_iff = iff;
}
/**
* @brief Helpers for wrapping send task functor.
@ -605,10 +652,8 @@ namespace eo
~FinallyBlock()
{
# ifndef NDEBUG
eo::log << eo::debug;
eo::log << "[M" << comm.rank() << "] Frees all the idle." << std::endl;
# endif
eo::log << eo::debug << "[M" << comm.rank() << "] Frees all the idle." << std::endl;
// frees all the idle workers
timerStat.start("master_wait_for_idles");
std::vector<int> idles = assignmentAlgo.idles();
@ -618,23 +663,21 @@ namespace eo
}
timerStat.stop("master_wait_for_idles");
# ifndef NDEBUG
eo::log << "[M" << comm.rank() << "] Waits for all responses." << std::endl;
# endif
eo::log << eo::debug << "[M" << comm.rank() << "] Waits for all responses." << std::endl;
// wait for all responses
timerStat.start("master_wait_for_all_responses");
while( assignmentAlgo.availableWorkers() != totalWorkers )
{
bmpi::status status = comm.probe( bmpi::any_source, bmpi::any_tag );
bmpi::status status = comm.probe( bmpi::any_source, eo::mpi::Channel::Messages );
int wrkRank = status.source();
that.handleResponse( wrkRank );
comm.send( wrkRank, Channel::Commands, Message::Finish );
assignmentAlgo.confirm( wrkRank );
}
timerStat.stop("master_wait_for_all_responses");
# ifndef NDEBUG
eo::log << "[M" << comm.rank() << "] Leaving master task." << std::endl;
# endif
eo::log << eo::debug << "[M" << comm.rank() << "] Leaving master task." << std::endl;
}
protected:
@ -658,10 +701,8 @@ namespace eo
void master( )
{
int totalWorkers = assignmentAlgo.availableWorkers();
# ifndef NDEBUG
eo::log << eo::debug;
eo::log << "[M" << comm.rank() << "] Have " << totalWorkers << " workers." << std::endl;
# endif
eo::log << eo::debug << "[M" << comm.rank() << "] Have " << totalWorkers << " workers." << std::endl;
try {
FinallyBlock finally( totalWorkers, assignmentAlgo, *this );
while( ! isFinished() )
@ -670,22 +711,20 @@ namespace eo
int assignee = assignmentAlgo.get( );
while( assignee <= 0 )
{
# ifndef NDEBUG
eo::log << "[M" << comm.rank() << "] Waitin' for node..." << std::endl;
# endif
bmpi::status status = comm.probe( bmpi::any_source, bmpi::any_tag );
eo::log << eo::debug << "[M" << comm.rank() << "] Waitin' for node..." << std::endl;
bmpi::status status = comm.probe( bmpi::any_source, eo::mpi::Channel::Messages );
int wrkRank = status.source();
# ifndef NDEBUG
eo::log << "[M" << comm.rank() << "] Node " << wrkRank << " just terminated." << std::endl;
# endif
eo::log << eo::debug << "[M" << comm.rank() << "] Node " << wrkRank << " just terminated." << std::endl;
handleResponse( wrkRank );
assignmentAlgo.confirm( wrkRank );
assignee = assignmentAlgo.get( );
}
timerStat.stop("master_wait_for_assignee");
# ifndef NDEBUG
eo::log << "[M" << comm.rank() << "] Assignee : " << assignee << std::endl;
# endif
eo::log << eo::debug << "[M" << comm.rank() << "] Assignee : " << assignee << std::endl;
timerStat.start("master_wait_for_send");
comm.send( assignee, Channel::Commands, Message::Continue );
@ -709,29 +748,22 @@ namespace eo
void worker( )
{
int order;
# ifndef NDEBUG
eo::log << eo::debug;
# endif
timerStat.start("worker_wait_for_order");
comm.recv( masterRank, Channel::Commands, order );
timerStat.stop("worker_wait_for_order");
while( true )
{
# ifndef NDEBUG
eo::log << "[W" << comm.rank() << "] Waiting for an order..." << std::endl;
# endif
eo::log << eo::debug << "[W" << comm.rank() << "] Waiting for an order..." << std::endl;
if ( order == workerStopCondition )
{
# ifndef NDEBUG
eo::log << "[W" << comm.rank() << "] Leaving worker task." << std::endl;
# endif
eo::log << eo::debug << "[W" << comm.rank() << "] Leaving worker task." << std::endl;
return;
} else if( order == Message::Continue )
{
# ifndef NDEBUG
eo::log << "[W" << comm.rank() << "] Processing task..." << std::endl;
# endif
eo::log << eo::debug << "[W" << comm.rank() << "] Processing task..." << std::endl;
processTask( );
}

View file

@ -99,7 +99,7 @@ namespace eo
* STATIC ASSIGNMENT ALGORITHM **************************
*******************************************************/
StaticAssignmentAlgorithm::StaticAssignmentAlgorithm( std::vector<int>& workers, int runs )
StaticAssignmentAlgorithm::StaticAssignmentAlgorithm( const std::vector<int>& workers, int runs )
{
init( workers, runs );
}
@ -138,7 +138,7 @@ namespace eo
init( workers, runs );
}
void StaticAssignmentAlgorithm::init( std::vector<int> & workers, int runs )
void StaticAssignmentAlgorithm::init( const std::vector<int> & workers, int runs )
{
unsigned int nbWorkers = workers.size();
freeWorkers = nbWorkers;

View file

@ -177,7 +177,7 @@ namespace eo
* @param workers std::vector of MPI ranks of workers which will be used.
* @param runs Fixed amount of runs, strictly positive.
*/
StaticAssignmentAlgorithm( std::vector<int>& workers, int runs );
StaticAssignmentAlgorithm( const std::vector<int>& workers, int runs );
/**
* @brief Uses a range of workers.
@ -215,7 +215,7 @@ namespace eo
* @param workers Vector of hosts' ranks
* @param runs Fixed amount of runs, strictly positive.
*/
void init( std::vector<int> & workers, int runs );
void init( const std::vector<int> & workers, int runs );
public:
int get( );

View file

@ -53,10 +53,14 @@ namespace eo
{
typedef eoUF< eoPop<EOT>&, void> ResetAlgo;
MultiStartData( bmpi::communicator& _comm, eoAlgo<EOT>& _algo, int _masterRank, ResetAlgo & _resetAlgo )
MultiStartData(
bmpi::communicator& _comm,
eoAlgo<EOT>& _algo,
int _masterRank,
ResetAlgo & _resetAlgo )
:
runs( 0 ), pop(), bests(),
comm( _comm ), algo( _algo ), masterRank( _masterRank ), resetAlgo( _resetAlgo )
runs( 0 ), bests(), pop(),
comm( _comm ), algo( _algo ), resetAlgo( _resetAlgo ), masterRank( _masterRank )
{
// empty
}
@ -134,7 +138,7 @@ namespace eo
{
EOT individual;
MultiStartData< EOT >& d = *_data;
d.comm.recv( wrkRank, 1, individual );
d.comm.recv( wrkRank, eo::mpi::Channel::Messages, individual );
d.bests.push_back( individual );
}
};
@ -155,7 +159,7 @@ namespace eo
{
_data->resetAlgo( _data->pop );
_data->algo( _data->pop );
_data->comm.send( _data->masterRank, 1, _data->pop.best_element() );
_data->comm.send( _data->masterRank, eo::mpi::Channel::Messages, _data->pop.best_element() );
}
};
@ -245,28 +249,28 @@ namespace eo
{
_data.runs = runs;
int nbWorkers = workers.size();
unsigned nbWorkers = workers.size();
std::vector< int > seeds = _getSeeds( nbWorkers );
if( eo::mpi::Node::comm().rank() == _masterRank )
{
if( seeds.size() < nbWorkers )
{
// Random seeds
for( int i = seeds.size(); i < nbWorkers; ++i )
for( unsigned i = seeds.size(); i < nbWorkers; ++i )
{
seeds.push_back( eo::rng.rand() );
}
}
for( int i = 0 ; i < nbWorkers ; ++i )
for( unsigned i = 0 ; i < nbWorkers ; ++i )
{
int wrkRank = workers[i];
eo::mpi::Node::comm().send( wrkRank, 1, seeds[ i ] );
eo::mpi::Node::comm().send( wrkRank, eo::mpi::Channel::Commands, seeds[ i ] );
}
} else
{
int seed;
eo::mpi::Node::comm().recv( _masterRank, 1, seed );
eo::mpi::Node::comm().recv( _masterRank, eo::mpi::Channel::Commands, seed );
eo::log << eo::debug << eo::mpi::Node::comm().rank() << "- Seed: " << seed << std::endl;
eo::rng.reseed( seed );
}
@ -298,8 +302,7 @@ namespace eo
int masterRank,
MultiStartStore< EOT > & store,
// dynamic parameters
int runs,
const std::vector<int>& seeds = std::vector<int>() ) :
int runs ) :
OneShotJob< MultiStartData< EOT > >( algo, masterRank, store )
{
store.init( algo.idles(), runs );
@ -397,7 +400,7 @@ namespace eo
/**************************************
* DEFAULT RESET ALGO IMPLEMENTATIONS *
**************************************
**************************************/
/**
* @brief For a Genetic Algorithm, reinits the population by copying the original one
@ -415,7 +418,19 @@ namespace eo
eoEvalFunc<EOT>& eval) :
_continuator( continuator ),
_originalPop( originalPop ),
_eval( eval )
_pop_eval( eval )
{
// empty
}
ReuseOriginalPopEA(
eoCountContinue<EOT> & continuator,
const eoPop<EOT>& originalPop,
eoPopEvalFunc<EOT>& pop_eval
) :
_continuator( continuator ),
_originalPop( originalPop ),
_pop_eval( pop_eval )
{
// empty
}
@ -423,17 +438,14 @@ namespace eo
void operator()( eoPop<EOT>& pop )
{
pop = _originalPop; // copies the original population
for(unsigned i = 0, size = pop.size(); i < size; ++i)
{
_eval( pop[i] );
}
_pop_eval( pop, pop );
_continuator.reset();
}
private:
eoCountContinue<EOT> & _continuator;
const eoPop<EOT>& _originalPop;
eoEvalFunc<EOT>& _eval;
eoPopEvalFunc<EOT>& _pop_eval;
};
/**
@ -465,6 +477,18 @@ namespace eo
}
}
ReuseSamePopEA(
eoCountContinue<EOT>& continuator,
const eoPop<EOT>& originalPop,
eoPopEvalFunc<EOT>& pop_eval
) :
_continuator( continuator ),
_originalPop( originalPop ),
_firstTime( true )
{
pop_eval( _originalPop, _originalPop );
}
void operator()( eoPop<EOT>& pop )
{
if( _firstTime )

View file

@ -173,14 +173,14 @@ namespace eo
int sentSize = futureIndex - _data->index ;
_data->comm.send( wrkRank, 1, sentSize );
_data->comm.send( wrkRank, eo::mpi::Channel::Messages, sentSize );
eo::log << eo::progress << "Evaluating individual " << _data->index << std::endl;
eo::log << eo::debug << "Evaluating individual " << _data->index << std::endl;
_data->assignedTasks[ wrkRank ].index = _data->index;
_data->assignedTasks[ wrkRank ].size = sentSize;
_data->comm.send( wrkRank, 1, & ( (_data->table())[ _data->index ] ) , sentSize );
_data->comm.send( wrkRank, eo::mpi::Channel::Messages, & ( (_data->table())[ _data->index ] ) , sentSize );
_data->index = futureIndex;
}
};
@ -203,7 +203,7 @@ namespace eo
void operator()(int wrkRank)
{
_data->comm.recv( wrkRank, 1, & (_data->table()[ _data->assignedTasks[wrkRank].index ] ), _data->assignedTasks[wrkRank].size );
_data->comm.recv( wrkRank, eo::mpi::Channel::Messages, & (_data->table()[ _data->assignedTasks[wrkRank].index ] ), _data->assignedTasks[wrkRank].size );
}
};
@ -230,16 +230,16 @@ namespace eo
{
int recvSize;
_data->comm.recv( _data->masterRank, 1, recvSize );
_data->comm.recv( _data->masterRank, eo::mpi::Channel::Messages, recvSize );
_data->tempArray.resize( recvSize );
_data->comm.recv( _data->masterRank, 1, & _data->tempArray[0] , recvSize );
_data->comm.recv( _data->masterRank, eo::mpi::Channel::Messages, & _data->tempArray[0] , recvSize );
timerStat.start("worker_processes");
for( int i = 0; i < recvSize ; ++i )
{
_data->func( _data->tempArray[ i ] );
}
timerStat.stop("worker_processes");
_data->comm.send( _data->masterRank, 1, & _data->tempArray[0], recvSize );
_data->comm.send( _data->masterRank, eo::mpi::Channel::Messages, & _data->tempArray[0], recvSize );
}
};

View file

@ -68,9 +68,9 @@ private:
eoValueParam<bool> _isDynamic;
eoValueParam<std::string> _prefix;
eoValueParam<unsigned int> _nthreads;
eoValueParam<unsigned int> _packetSize;
eoValueParam<bool> _enableResults;
eoValueParam<bool> _doMeasure;
eoValueParam<unsigned int> _packetSize;
double _t_start;
};

View file

@ -32,6 +32,7 @@ SET (TEST_LIST
t-mpi-multipleRoles
t-mpi-eval
t-mpi-multistart
t-mpi-distrib-exp
)
FOREACH (test ${TEST_LIST})

View file

@ -0,0 +1,135 @@
# include <unistd.h> // usleep
# include <iostream>
# include <string>
# include <vector>
# include <eo>
# include <mpi/eoParallelApply.h>
# include "../test/mpi/t-mpi-common.h"
using namespace eo::mpi;
typedef SerializableBase<int> type;
struct Wait : public eoUF< type &, void >
{
void operator()( type & milliseconds )
{
std::cout << "Sleeping for " << milliseconds << "ms..." << std::endl;
// usleep takes an input in microseconds
usleep( milliseconds * 1000 );
}
} wait;
class Distribution : public std::vector< type >
{
public:
/**
* @brief Really fills the vector with the distribution values.
*/
void fill( unsigned size )
{
for( unsigned i = 0; i < size; ++i )
{
push_back( next_element() );
}
}
/**
* @brief Returns the next element of the distribution to put in the
* vector.
*
* @returns Number of milliseconds to wait
*/
virtual int next_element() = 0;
// Idea for function name: enlarge_your_parser
/**
* @brief Creates params and retrieves values from parser
*/
virtual void make_parser( eoParser & parser ) = 0;
/**
* @brief Returns true if this distribution has been activated by the
* command line.
*
* Serves to main program to check if at least one distribution has been
* activated.
*/
bool isActive() { return _active; }
protected:
bool _active;
};
class UniformDistribution : public Distribution
{
public:
UniformDistribution() : _rng(0)
{
// empty
}
void make_parser( eoParser & parser )
{
_active = parser.createParam( false, "uniform", "Uniform distribution", '\0', "Uniform").value();
_min = parser.createParam( 0.0, "uniform-min", "Minimum for uniform distribution", '\0', "Uniform").value();
_max = parser.createParam( 1.0, "uniform-max", "Maximum for uniform distribution", '\0', "Uniform").value();
}
int next_element()
{
return std::floor( 1000. * _rng.uniform( _min, _max ) );
}
protected:
eoRng _rng;
double _min;
double _max;
} uniformDistribution;
int main( int argc, char** argv )
{
Node::init( argc, argv );
eoParser parser( argc, argv );
// TODO for each available distribution, check if activated.
// If no distribution is activated, show an error message
// If two distributions or more are activated, show an error message
// Otherwise, use the activated distribution as distrib
Distribution & distrib = uniformDistribution;
// Make parser of distribution here
distrib.make_parser( parser );
unsigned size = parser.createParam( 10U, "size", "Number of elements to distribute.", 's', "Distribution").value();
unsigned packet_size = parser.createParam( 1U, "packet_size", "Number of elements to distribute at each time for a single worker.", 'p', "Parallelization").value();
make_parallel( parser );
make_help( parser );
ParallelApplyStore< type> store( wait, DEFAULT_MASTER, packet_size );
// Fill distribution
distrib.fill( size );
store.data( distrib );
DynamicAssignmentAlgorithm scheduling;
ParallelApply< type > job( scheduling, DEFAULT_MASTER, store );
job.run();
if( job.isMaster() )
{
EmptyJob( scheduling, DEFAULT_MASTER ); // to terminate parallel apply
}
return 0;
}

View file

@ -118,7 +118,7 @@ struct Work: public eoUF< SerializableVector< SerializableBase<int> >&, void >
{
cout << "Work phase..." << endl;
subtask( v, Node::comm().rank() );
for( int i = 0; i < v.size(); ++i )
for( unsigned i = 0; i < v.size(); ++i )
{
v[i] *= 2;
}
@ -168,7 +168,7 @@ int main(int argc, char** argv)
EmptyJob stop( algo, 0 );
v = metaV[0];
cout << "Results : " << endl;
for(int i = 0; i < v.size(); ++i)
for(unsigned i = 0; i < v.size(); ++i)
{
cout << v[i] << ' ';
}

View file

@ -198,7 +198,7 @@ int main(int argc, char** argv)
// job.
EmptyJob stop( *(tests[i].assign), eo::mpi::DEFAULT_MASTER );
++offset;
for(int i = 0; i < v.size(); ++i)
for(unsigned i = 0; i < v.size(); ++i)
{
cout << v[i] << ' ';
if( originalV[i] + offset != v[i] )

View file

@ -116,7 +116,7 @@ int main(int argc, char** argv)
if( job.isMaster() )
{
++offset;
for(int i = 0; i < v.size(); ++i)
for(unsigned i = 0; i < v.size(); ++i)
{
cout << v[i] << ' ';
if( originalV[i] + offset != v[i] )