# ifndef __EO_PARALLEL_APPLY_H__ # define __EO_PARALLEL_APPLY_H__ # include "eoMpi.h" # include # include namespace eo { namespace mpi { struct ParallelApplyAssignment { int index; int size; }; template struct ParallelApplyData { ParallelApplyData( eoUF & _proc, std::vector & _pop, int _masterRank, // long _maxTime = 0, int _packetSize ) : _data( &_pop ), func( _proc ), index( 0 ), size( _pop.size() ), packetSize( _packetSize ), masterRank( _masterRank ), comm( Node::comm() ) { if ( _packetSize <= 0 ) { throw std::runtime_error("Packet size should not be negative."); } tempArray = new EOT[ _packetSize ]; } void init( std::vector& _pop ) { index = 0; size = _pop.size(); _data = &_pop; assignedTasks.clear(); } ~ParallelApplyData() { delete [] tempArray; } std::vector& data() { return *_data; } std::vector * _data; eoUF & func; int index; int size; std::map< int /* worker rank */, ParallelApplyAssignment /* min indexes in vector */> assignedTasks; int packetSize; EOT* tempArray; int masterRank; bmpi::communicator& comm; }; template< class EOT > class SendTaskParallelApply : public SendTaskFunction< ParallelApplyData > { public: using SendTaskFunction< ParallelApplyData >::d; SendTaskParallelApply( SendTaskParallelApply * w = 0 ) : SendTaskFunction< ParallelApplyData >( w ) { // empty } void operator()(int wrkRank) { int futureIndex; if( d->index + d->packetSize < d->size ) { futureIndex = d->index + d->packetSize; } else { futureIndex = d->size; } int sentSize = futureIndex - d->index ; d->comm.send( wrkRank, 1, sentSize ); eo::log << eo::progress << "Evaluating individual " << d->index << std::endl; d->assignedTasks[ wrkRank ].index = d->index; d->assignedTasks[ wrkRank ].size = sentSize; d->comm.send( wrkRank, 1, & ( (d->data())[ d->index ] ) , sentSize ); d->index = futureIndex; } }; template< class EOT > class HandleResponseParallelApply : public HandleResponseFunction< ParallelApplyData > { public: using HandleResponseFunction< ParallelApplyData >::d; HandleResponseParallelApply( HandleResponseParallelApply * w = 0 ) : HandleResponseFunction< ParallelApplyData >( w ) { // empty } void operator()(int wrkRank) { d->comm.recv( wrkRank, 1, & (d->data()[ d->assignedTasks[wrkRank].index ] ), d->assignedTasks[wrkRank].size ); } }; template< class EOT > class ProcessTaskParallelApply : public ProcessTaskFunction< ParallelApplyData > { public: using ProcessTaskFunction< ParallelApplyData >::d; ProcessTaskParallelApply( ProcessTaskParallelApply * w = 0 ) : ProcessTaskFunction< ParallelApplyData >( w ) { // empty } void operator()() { int recvSize; d->comm.recv( d->masterRank, 1, recvSize ); d->comm.recv( d->masterRank, 1, d->tempArray, recvSize ); timerStat.start("worker_processes"); for( int i = 0; i < recvSize ; ++i ) { d->func( d->tempArray[ i ] ); } timerStat.stop("worker_processes"); d->comm.send( d->masterRank, 1, d->tempArray, recvSize ); } }; template< class EOT > class IsFinishedParallelApply : public IsFinishedFunction< ParallelApplyData > { public: using IsFinishedFunction< ParallelApplyData >::d; IsFinishedParallelApply( IsFinishedParallelApply * w = 0 ) : IsFinishedFunction< ParallelApplyData >( w ) { // empty } bool operator()() { return d->index == d->size; } }; template< class EOT > struct ParallelApplyStore : public JobStore< ParallelApplyData > { using JobStore< ParallelApplyData >::_stf; using JobStore< ParallelApplyData >::_hrf; using JobStore< ParallelApplyData >::_ptf; using JobStore< ParallelApplyData >::_iff; ParallelApplyStore( eoUF & _proc, std::vector& _pop, int _masterRank, // long _maxTime = 0, int _packetSize = 1, // JobStore functors SendTaskParallelApply * stpa = new SendTaskParallelApply, HandleResponseParallelApply* hrpa = new HandleResponseParallelApply, ProcessTaskParallelApply* ptpa = new ProcessTaskParallelApply, IsFinishedParallelApply* ifpa = new IsFinishedParallelApply ) : _data( _proc, _pop, _masterRank, _packetSize ) { _stf = stpa; _hrf = hrpa; _ptf = ptpa; _iff = ifpa; } ParallelApplyData* data() { return &_data; } virtual ~ParallelApplyStore() { } protected: ParallelApplyData _data; }; // TODO commentaire : impossible de faire un typedef sur un template sans passer // par un traits => complique la tâche de l'utilisateur pour rien. template< typename EOT > class ParallelApply : public MultiJob< ParallelApplyData > { public: ParallelApply( AssignmentAlgorithm & algo, int _masterRank, ParallelApplyStore & store ) : MultiJob< ParallelApplyData >( algo, _masterRank, store ) { // empty } }; } } # endif // __EO_PARALLEL_APPLY_H__