/* (c) Thales group, 2012 This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; version 2 of the License. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this library; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA Contact: http://eodev.sourceforge.net Authors: Benjamin Bouvier */ # ifndef __EO_PARALLEL_APPLY_H__ # define __EO_PARALLEL_APPLY_H__ # include "eoMpi.h" # include // eoUF # include // std::vector population /** * @file eoParallelApply.h * * @brief Applies a functor with single parameter to elements of a table, in a parallel fashion. * * This file contains all the required classes to do a parallel apply of a table, in a parallel fashion. This can be * very useful when applying the function can be made without any dependances within the data. In EO, it occurs in * particular during the evaluation: the number of individuals to evaluate can be really high, and the evaluation of one * individual is independant from the evaluation of other individuals. * * Elements in the table are directly replaced, as the table is given by reference. No new table is made during the * process. * * User can tune this job, indicating how many elements of the table should be sent and evaluated by a worker, at a * time: this is called the "packet size", as individuals are groupped into a packet of individuals which are sent to * the worker before evaluation. The problem of choosing the optimal packet size is beyond the purposes of this documentation * and deserves a theoritical study. * * This job is the parallel equivalent to the function apply, defined in apply.h. It just applies the function to * every element of a table. In Python or Javascript, it's the equivalent of the function Map. */ namespace eo { namespace mpi { /** * @brief Structure used to save assignment to a worker, i.e which slice of the table it has to process. * * This slice is defined by the index of the first evaluated argument and the number of processed elements. */ struct ParallelApplyAssignment { int index; int size; }; /** * @brief Data useful for a parallel apply (map). * * A parallel apply needs at least the functor to apply to every element of the table, and the table itself, * whereas it can be set later with the function init(). Master rank is also needed, to send it informations and * receive informations from it, inside the functors (the job knows these values, but the functors don't). The * size of a packet can be tuned here. * * Internal attributes contain: * - (useful for master) the index of the next element to be evaluated. * - (useful for master) a map containing links between MPI ranks and slices of the table which the worker with * this rank has evaluated. Without this map, when receiving results from a worker, the master couldn't be * able to replace the right elements in the table. * * @ingroup Parallel */ template struct ParallelApplyData { /** * @brief Ctor for Parallel apply (map) data. * * @param _proc The functor to apply on each element in the table * @param _masterRank The MPI rank of the master * @param _packetSize The number of elements on which the function will be applied by the worker, at a time. * @param _pop The table to apply. If this value is NULL, user will have to call init() before launching the * job. */ ParallelApplyData( eoUF & _proc, int _masterRank, int _packetSize, std::vector * _pop = 0 ) : _data( _pop ), func( _proc ), index( 0 ), packetSize( _packetSize ), masterRank( _masterRank ), comm( Node::comm() ) { if ( _packetSize <= 0 ) { throw std::runtime_error("Packet size should not be negative."); } if( _pop ) { size = _pop->size(); } } /** * @brief Reinitializes the data for a new table to evaluate. */ void init( std::vector& _pop ) { index = 0; size = _pop.size(); _data = &_pop; assignedTasks.clear(); } std::vector& data() { return *_data; } // All elements are public since functors will often use them. std::vector * _data; eoUF & func; int index; int size; std::map< int /* worker rank */, ParallelApplyAssignment /* last assignment */> assignedTasks; int packetSize; std::vector tempArray; int masterRank; bmpi::communicator& comm; }; /** * @brief Send task functor implementation for the parallel apply (map) job. * * Master side: Sends a slice of the table to evaluate to the worker. * * Implementation details: * Finds the next slice of data to send to the worker, sends first the size and then the data, and memorizes * that this slice has been distributed to the worker, then updates the next position of element to evaluate. */ template< class EOT > class SendTaskParallelApply : public SendTaskFunction< ParallelApplyData > { public: using SendTaskFunction< ParallelApplyData >::d; SendTaskParallelApply( SendTaskParallelApply * w = 0 ) : SendTaskFunction< ParallelApplyData >( w ) { // empty } void operator()(int wrkRank) { int futureIndex; if( d->index + d->packetSize < d->size ) { futureIndex = d->index + d->packetSize; } else { futureIndex = d->size; } int sentSize = futureIndex - d->index ; d->comm.send( wrkRank, 1, sentSize ); eo::log << eo::progress << "Evaluating individual " << d->index << std::endl; d->assignedTasks[ wrkRank ].index = d->index; d->assignedTasks[ wrkRank ].size = sentSize; d->comm.send( wrkRank, 1, & ( (d->data())[ d->index ] ) , sentSize ); d->index = futureIndex; } }; /** * @brief Handle response functor implementation for the parallel apply (map) job. * * Master side: Replaces the slice of data attributed to the worker in the table. */ template< class EOT > class HandleResponseParallelApply : public HandleResponseFunction< ParallelApplyData > { public: using HandleResponseFunction< ParallelApplyData >::d; HandleResponseParallelApply( HandleResponseParallelApply * w = 0 ) : HandleResponseFunction< ParallelApplyData >( w ) { // empty } void operator()(int wrkRank) { d->comm.recv( wrkRank, 1, & (d->data()[ d->assignedTasks[wrkRank].index ] ), d->assignedTasks[wrkRank].size ); } }; /** * @brief Process task functor implementation for the parallel apply (map) job. * * Worker side: apply the function to the given slice of data. * * Implementation details: retrieves the number of elements to evaluate, retrieves them, applies the function * and then returns the results. */ template< class EOT > class ProcessTaskParallelApply : public ProcessTaskFunction< ParallelApplyData > { public: using ProcessTaskFunction< ParallelApplyData >::d; ProcessTaskParallelApply( ProcessTaskParallelApply * w = 0 ) : ProcessTaskFunction< ParallelApplyData >( w ) { // empty } void operator()() { int recvSize; d->comm.recv( d->masterRank, 1, recvSize ); d->tempArray.resize( recvSize ); d->comm.recv( d->masterRank, 1, & d->tempArray[0] , recvSize ); timerStat.start("worker_processes"); for( int i = 0; i < recvSize ; ++i ) { d->func( d->tempArray[ i ] ); } timerStat.stop("worker_processes"); d->comm.send( d->masterRank, 1, & d->tempArray[0], recvSize ); } }; /** * @brief Is finished functor implementation for the parallel apply (map) job. * * Master side: returns true if and only if the whole table has been evaluated. The job is also terminated only * when the whole table has been evaluated. */ template< class EOT > class IsFinishedParallelApply : public IsFinishedFunction< ParallelApplyData > { public: using IsFinishedFunction< ParallelApplyData >::d; IsFinishedParallelApply( IsFinishedParallelApply * w = 0 ) : IsFinishedFunction< ParallelApplyData >( w ) { // empty } bool operator()() { return d->index == d->size; } }; /** * @brief Store containing all the datas and the functors for the parallel apply (map) job. * * User can tune functors when constructing the object. For each functor which is not given, a default one is * generated. * * @ingroup Parallel */ template< class EOT > struct ParallelApplyStore : public JobStore< ParallelApplyData > { using JobStore< ParallelApplyData >::_stf; using JobStore< ParallelApplyData >::_hrf; using JobStore< ParallelApplyData >::_ptf; using JobStore< ParallelApplyData >::_iff; /** * @brief Main constructor for the parallel apply (map) job. * * @param _proc The procedure to apply to each element of the table. * @param _masterRank The rank of the master process. * @param _packetSize The number of elements of the table to be evaluated at a time, by the worker. * @param stpa Pointer to Send Task parallel apply functor descendant. If null, a default one is used. * @param hrpa Pointer to Handle Response parallel apply functor descendant. If null, a default one is used. * @param ptpa Pointer to Process Task parallel apply functor descendant. If null, a default one is used. * @param ifpa Pointer to Is Finished parallel apply functor descendant. If null, a default one is used. */ ParallelApplyStore( eoUF & _proc, int _masterRank, int _packetSize = 1, // JobStore functors SendTaskParallelApply * stpa = 0, HandleResponseParallelApply* hrpa = 0, ProcessTaskParallelApply* ptpa = 0, IsFinishedParallelApply* ifpa = 0 ) : _data( _proc, _masterRank, _packetSize ) { if( stpa == 0 ) { stpa = new SendTaskParallelApply; stpa->needDelete( true ); } if( hrpa == 0 ) { hrpa = new HandleResponseParallelApply; hrpa->needDelete( true ); } if( ptpa == 0 ) { ptpa = new ProcessTaskParallelApply; ptpa->needDelete( true ); } if( ifpa == 0 ) { ifpa = new IsFinishedParallelApply; ifpa->needDelete( true ); } _stf = stpa; _hrf = hrpa; _ptf = ptpa; _iff = ifpa; } ParallelApplyData* data() { return &_data; } /** * @brief Reinits the store with a new table to evaluate. * * @param _pop The table of elements to be evaluated. */ void data( std::vector& _pop ) { _data.init( _pop ); } virtual ~ParallelApplyStore() // for inheritance purposes only { } protected: ParallelApplyData _data; }; /** * @brief Parallel apply job. Present for convenience only. * * A typedef wouldn't have been working, as typedef on templates don't work in C++. Traits would be a * disgraceful overload for the user. * * @ingroup Parallel * @see eoParallelApply.h */ template< typename EOT > class ParallelApply : public MultiJob< ParallelApplyData > { public: ParallelApply( AssignmentAlgorithm & algo, int _masterRank, ParallelApplyStore & store ) : MultiJob< ParallelApplyData >( algo, _masterRank, store ) { // empty } }; } } # endif // __EO_PARALLEL_APPLY_H__