/*
(c) Thales group, 2012

    This library is free software; you can redistribute it and/or
    modify it under the terms of the GNU Lesser General Public
    License as published by the Free Software Foundation;
    version 2 of the License.

    This library is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
    Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public
    License along with this library; if not, write to the Free Software
    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

    Contact: http://eodev.sourceforge.net

    Authors:
        Benjamin Bouvier
*/
# ifndef __EO_MPI_H__
# define __EO_MPI_H__

# include <vector>               // std::vector
# include <map>                  // std::map
# include <utils/eoLogger.h>     // eo::log
# include <utils/eoTimerStat.h>  // eoTimerStat
# include <eoFunctor.h>          // eoUF, eoF

# include "eoMpiNode.h"
# include "eoMpiAssignmentAlgorithm.h"

namespace eo
{
    /**
     * @ingroup Parallel
     * @defgroup MPI Message Passing Interface parallelization
     * @{
     */

    /**
     * @brief MPI parallelization helpers for EO.
     *
     * This namespace contains parallelization functions which help to parallelize computations in EO. It is based
     * on a generic algorithm, which is then customized with functors corresponding to the algorithm's main steps.
     * These computations are centralized, i.e. there is one central host whose role is to handle the steps of the
     * algorithm; we call it the "master". The other hosts just have to perform a "dummy" computation, which may be
     * any kind of processing; we call them the "slaves" or, less pejoratively, the "workers". Workers can
     * communicate with each other, but they receive their orders from the master and send results back to it. A
     * worker can also be the master of a different parallelization process, if that is part of its work. Machines
     * of the network, also called hosts, are identified by a unique number: their rank.
     * At any time during the execution of the program, all the hosts know the total number of hosts.
     *
     * A parallelized Job is a set of tasks which are independent (i.e. can be executed in any order without
     * modifying the result) and which take a data input and compute a data output to be sent to the master. The
     * data can be of any type, but it has to be serializable so it can be sent over the network. It is sufficient
     * that it can be serialized through boost.
     *
     * @todo For serialization purposes, don't depend upon boost. It would be easy to use only eoserial and send
     * strings via MPI.
     *
     * The main steps of the algorithm are the following:
     * - For the master:
     *   - Are we done with the current treatment?
     *     - If this is the case, we can quit.
     *     - Otherwise, send an input data to some available worker.
     *       - If there is no available worker, wait for a worker to be free.
     *   - When receiving a response, handle it (possibly compute something on the output data, store it...).
     *   - Go back to the first step.
     * - For the worker, it is even easier:
     *   - Wait for an order.
     *   - If there is nothing to do, just quit.
     *   - Otherwise, possibly retrieve data, and do the work.
     *   - Go back to the first step.
     *
     * There are of course some network adjustments to make and details to add, but the main ideas are present. As
     * the job is fully centralized, it is the master who tells the workers when to quit and when to work.
     *
     * The idea behind these MPI helpers is to be as generic as possible. If we look back at the steps of the
     * algorithm, we find that they can be split into 2 parts: the first consists of the steps common to any
     * parallelization algorithm, the other consists of the parts specific to a given job. Ideally, the user should
     * just have to implement the specific parts. We identified these parts to be:
     * - For the master:
     *   - What does it mean to be finished? There are only two alternatives, in our binary world: either it is
     *     finished, or it is not. Hence we only need a function returning a boolean to know if we're done with the
     *     computation: we'll call it IsFinished.
     *   - What do we have to do when we send a task? We don't have any a priori knowledge of the form of the sent
     *     data, nor of the number of sent data. Moreover, as the tasks are all independent, we don't care who does
     *     the computation, as long as it gets done. Knowing the rank of the worker is sufficient to send it data.
     *     We have identified another function, taking a single argument which is the rank of the worker: we'll
     *     call it SendTask.
     *   - What do we have to do when we receive a response from a worker? One more time, we don't know what form
     *     or structure the received data can have; only the user can know. We also leave it to the user to
     *     retrieve the data; he just has to know from whom the master will retrieve it. Here is another function,
     *     taking a rank (the sender's) as argument: this will be HandleResponse.
     * - For the worker:
     *   - What is the processing? It can be of any nature. We just need to be sure that data is sent back to the
     *     master, but it seems difficult to check that: it is the user's role to assert that data is sent by the
     *     worker at the end of an execution. We have identified our last function: ProcessTask.
     *
     * In terms of implementation, it would be annoying to have only abstract classes with these 4 methods to
     * implement. It would mean that if you want to alter just one of these 4 functions, you have to implement a
     * new subclass, with a new constructor which could have the same signature. Besides, this fashion doesn't
     * allow you to add dynamic functionality, using the Decorator design pattern for instance, without
     * implementing a class for each type of decoration you want to add.
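To make these four roles concrete, here is a minimal, MPI-free sketch of the master loop they drive. Every name in it (MasterLoop, simulate, the lambda bodies) is illustrative only and not part of the framework; the "network" is a plain variable and HandleResponse is called synchronously right after SendTask, where a real master would call it upon receiving a message.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <queue>
#include <vector>

// Illustrative, MPI-free reduction of the master loop: the four roles are plain
// callables and "workers" are simulated by a FIFO of ranks.
struct MasterLoop
{
    std::function<bool()>    isFinished;     // are we done sending tasks?
    std::function<void(int)> sendTask;       // send an input to the worker of given rank
    std::function<void(int)> handleResponse; // consume the output of the worker of given rank

    void run( std::queue<int>& readyWorkers )
    {
        while( !isFinished() )
        {
            int rank = readyWorkers.front(); // "wait" for a free worker
            readyWorkers.pop();
            sendTask( rank );
            handleResponse( rank );          // in real MPI, this happens on receive
            readyWorkers.push( rank );       // the worker becomes available again
        }
    }
};

int simulate()
{
    // Job: square the numbers 1..4 and sum the results.
    std::vector<int> inputs = { 1, 2, 3, 4 };
    std::size_t next = 0;
    int wire = 0, sum = 0;

    MasterLoop master;
    master.isFinished     = [&]() { return next == inputs.size(); };
    master.sendTask       = [&]( int ) { wire = inputs[next++]; };
    // ProcessTask (worker side) is folded into handleResponse here:
    master.handleResponse = [&]( int ) { sum += wire * wire; };

    std::queue<int> workers;
    workers.push( 1 );
    workers.push( 2 );
    master.run( workers );
    return sum; // 1 + 4 + 9 + 16 = 30
}
```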
     * For these reasons, we decided to turn these functions into functors; the user can then wrap the existing,
     * basic behaviours in more sophisticated computations, whenever he wants and in any order. We retrieve here
     * the extensibility given by the Decorator design pattern.
     *
     * Our 4 functors could have a large amount of data in common (see eoParallelApply to get an idea). So as to
     * make it easy for the user to implement these 4 functors, we consider that they have to share a common data
     * structure. This data structure is referenced (as a pointer) in the 4 functors, so the user doesn't need to
     * pass a lot of parameters to each functor constructor.
     *
     * There are two kinds of jobs:
     * - Jobs which are launched a fixed and well known number of times, i.e. both master and workers know how
     *   many times they will be launched. They are "one shot jobs".
     * - Jobs which are launched an unknown number of times, for instance embedded in a while loop for which we
     *   don't know the number of repetitions (typically, the eoEasyEA loop is a good example, as we don't know
     *   the continuator condition). They are called "multi jobs".
     *
     * As it is the master who tells the workers to quit, we have to differentiate these two kinds of jobs. When
     * the job is of the "multi job" kind, the workers have to perform a while(true) loop so as to receive their
     * orders; even when the master tells them one job is done, they begin another job and wait for another order.
     * If the master has already quit at that point, this causes a deadlock, with worker processes blocked,
     * waiting for an order.
     */
    namespace mpi
    {
        /**
         * @brief A timer which allows the user to generate statistics about computation times.
         */
        extern eoTimerStat timerStat;

        /**
         * @brief Tags used in MPI messages for framework communication.
         *
         * These tags are used for framework communication and act as "channels", so as to differentiate between
         * sending an order to a worker (Commands) and sending data (Messages). They are not reserved by the
         * framework: the user can use them too, but is not bound to.
         *
         * @ingroup MPI
         */
        namespace Channel
        {
            const int Commands = 0;
            const int Messages = 1;
        }

        /**
         * @brief Simple orders used by the framework.
         *
         * These orders are sent by the master to the workers, to indicate whether they should receive another
         * task (Continue), whether a one shot job is done (Finish), or whether a multi job is done (Kill).
         *
         * @ingroup MPI
         */
        namespace Message
        {
            const int Continue = 0;
            const int Finish = 1;
            const int Kill = 2;
        }

        /**
         * @brief If the job only has one master, the user can use this constant, so as not to worry about
         * integer ids.
         *
         * @ingroup MPI
         */
        const int DEFAULT_MASTER = 0;

        /**
         * @brief Base class for the 4 algorithm functors.
         *
         * This class can embed data (JobData) and a wrapper, so as to make all 4 functors wrappable. We can add
         * a wrapper at initialization or at any later time during the execution of the program.
         *
         * Following RAII, the boolean needDelete indicates whether operator delete has to be called on the
         * wrapped functor or not. Hence, if any functor is wrapped, the user just has to set this boolean to
         * true to indicate to the wrapper that it should call delete. This allows mixing wrappers allocated on
         * the heap (with new) and on the stack.
         *
         * @param JobData a data type, which can have any form. It can be a struct, a single int, anything.
         *
         * @param Wrapped the type of the functor, which will be stored as a pointer under the name _wrapped.
         * This allows the functor to be wrapped directly in functors of the same type, instead of dealing with a
         * SharedDataFunction* that we would have to cast all the time. It also allows the wrapped functor to be
         * handled as the functor we're writing when coding the wrappers, without static_cast.
         * For instance, if there are 2 functor subclasses, fA and fB, fA implementing doFa() and fB implementing
         * doFb(), we could have the following code:
         * @code
         * struct fA_wrapper
         * {
         *     // some code
         *     void doFa()
         *     {
         *         _wrapped->doFa();
         *         std::cout << "I'm a fA wrapper!" << std::endl;
         *         // if we didn't have the second template parameter, but a SharedDataFunction, we would have to do this:
         *         static_cast< fA* >( _wrapped )->doFa();
         *         // do other things (it's a wrapper)
         *     }
         * };
         *
         * struct fB_wrapper
         * {
         *     // some code
         *     void doFb()
         *     {
         *         _wrapped->doFb(); // and not: static_cast< fB* >( _wrapped )->doFb();
         *     }
         * };
         * @endcode
         * This makes the code easier to write for the user.
         *
         * @ingroup MPI
         */
        template< typename JobData, typename Wrapped >
        struct SharedDataFunction
        {
            /**
             * @brief Default constructor.
             *
             * The user is not bound to give a wrapped functor.
             */
            SharedDataFunction( Wrapped * w = 0 ) : _wrapped( w ), _needDelete( false )
            {
                // empty
            }

            /**
             * @brief Destructor.
             *
             * Calls delete on the wrapped functor, only if necessary.
             */
            virtual ~SharedDataFunction()
            {
                if( _wrapped && _wrapped->needDelete() )
                {
                    delete _wrapped;
                }
            }

            /**
             * @brief Setter for the wrapped functor.
             *
             * It doesn't do anything to the current wrapped functor, such as deleting it.
             */
            void wrapped( Wrapped * w )
            {
                _wrapped = w;
            }

            /**
             * @brief Setter for the data present in the functor.
             *
             * Calls the setter on this functor and on the wrapped functors, in a Composite pattern fashion.
             */
            void data( JobData* _d )
            {
                d = _d;
                if( _wrapped )
                {
                    _wrapped->data( _d );
                }
            }

            /**
             * @brief Returns true if we need to use operator delete on this wrapper, false otherwise.
             */
            bool needDelete()
            {
                return _needDelete;
            }

            void needDelete( bool b )
            {
                _needDelete = b;
            }

            protected:
                JobData* d;
                Wrapped* _wrapped; // pointer, not a reference, so it can be set at any time
                bool _needDelete;
        };

        /**
         * @brief Functor (master side) used to send a task to a worker.
         *
         * The user doesn't have to know which worker will receive a task: the framework indicates to this functor
         * the rank of the worker. The data used for the computation has to be explicitly sent by the master to
         * the worker of the indicated rank. Once this functor has been called, the worker is considered busy
         * until it sends a return message to the master.
         *
         * This is a functor implementing void operator()(int), and also a shared data function, containing a
         * wrapper of its own type.
         *
         * @ingroup MPI
         */
        template< typename JobData >
        struct SendTaskFunction : public eoUF<int, void>, public SharedDataFunction< JobData, SendTaskFunction<JobData> >
        {
            public:
                SendTaskFunction( SendTaskFunction<JobData>* w = 0 ) : SharedDataFunction< JobData, SendTaskFunction<JobData> >( w )
                {
                    // empty
                }

                virtual ~SendTaskFunction() {} // for inherited classes
        };

        /**
         * @brief Functor (master side) used to indicate what to do when receiving a response.
         *
         * The master calls this function as soon as it receives some data, on some channel. Thanks to MPI, we
         * retrieve the rank of the data's sender; this functor is then called with that rank. There is no
         * memoization of a link between sent data and rank, so the user has to implement it if he needs it.
         *
         * This is a functor implementing void operator()(int), and also a shared data function, containing a
         * wrapper of its own type.
         *
         * @ingroup MPI
         */
        template< typename JobData >
        struct HandleResponseFunction : public eoUF<int, void>, public SharedDataFunction< JobData, HandleResponseFunction<JobData> >
        {
            public:
                HandleResponseFunction( HandleResponseFunction<JobData>* w = 0 ) : SharedDataFunction< JobData, HandleResponseFunction<JobData> >( w )
                {
                    // empty
                }

                virtual ~HandleResponseFunction() {} // for inherited classes
        };

        /**
         * @brief Functor (worker side) implementing the processing to do.
         *
         * This is where the real computation happens. Whenever the master sends the command "Continue" to a
         * worker, which indicates that the worker will receive a task, the worker calls this functor. The user
         * has to explicitly retrieve the data, process it, and transmit the result back to the master.
         * If the worker does not send any data back to the master, the latter will consider the worker isn't
         * done, and a deadlock could occur.
         *
         * This is a functor implementing void operator()(), and also a shared data function, containing a
         * wrapper of its own type.
         *
         * @ingroup MPI
         */
        template< typename JobData >
        struct ProcessTaskFunction : public eoF<void>, public SharedDataFunction< JobData, ProcessTaskFunction<JobData> >
        {
            public:
                ProcessTaskFunction( ProcessTaskFunction<JobData>* w = 0 ) : SharedDataFunction< JobData, ProcessTaskFunction<JobData> >( w )
                {
                    // empty
                }

                virtual ~ProcessTaskFunction() {} // for inherited classes
        };

        /**
         * @brief Functor (master side) indicating whether the job is done or not.
         *
         * The master loops on this functor to know when to stop. When this functor returns true, the master
         * waits for the last responses and properly stops the job. As long as this functor returns false, the
         * master keeps sending tasks.
         *
         * This is a functor implementing bool operator()(), and also a shared data function, containing a
         * wrapper of its own type.
         *
         * @ingroup MPI
         */
        template< typename JobData >
        struct IsFinishedFunction : public eoF<bool>, public SharedDataFunction< JobData, IsFinishedFunction<JobData> >
        {
            public:
                IsFinishedFunction( IsFinishedFunction<JobData>* w = 0 ) : SharedDataFunction< JobData, IsFinishedFunction<JobData> >( w )
                {
                    // empty
                }

                virtual ~IsFinishedFunction() {} // for inherited classes
        };

        /**
         * @brief Contains all the required data and functors to launch a job.
         *
         * Splitting the functors and data from the job itself allows the same functors and data to be used by
         * multiple instances of the same job. You define your store once and can use it many times during your
         * program. If the store were included in the job, you would have to give all the functors and all the
         * data to each invocation of the job again.
         *
         * A job store contains the 4 functors described above (as pointers, so they can be wrapped; references
         * wouldn't have permitted that) and the JobData used by all of them. It also contains helpers to easily
         * wrap the functors, and getters and setters for all of them.
         *
         * The user has to implement data(), which is the getter retrieving the JobData. We have no idea who owns
         * the data; moreover it is impossible to initialize it in this generic JobStore, as we don't know its
         * form. As a matter of fact, the user has to define this in JobStore subclasses.
         *
         * @ingroup MPI
         */
        template< typename JobData >
        struct JobStore
        {
            /**
             * @brief Default ctor with the 4 functors.
             */
            JobStore(
                SendTaskFunction<JobData>* stf,
                HandleResponseFunction<JobData>* hrf,
                ProcessTaskFunction<JobData>* ptf,
                IsFinishedFunction<JobData>* iff )
                : _stf( stf ), _hrf( hrf ), _ptf( ptf ), _iff( iff )
            {
                // empty
            }

            /**
             * @brief Empty ctor, useful so users are not forced to call the other constructor.
             *
             * When using this constructor, the user has to take care of the 4 functor pointers himself,
             * otherwise null pointer segfaults are to be expected.
             */
            JobStore()
            {
                // empty
            }

            /**
             * @brief Default destructor.
             *
             * JobStore is the highest layer which calls needDelete on its functors.
             */
            ~JobStore()
            {
                if( _stf->needDelete() ) delete _stf;
                if( _hrf->needDelete() ) delete _hrf;
                if( _ptf->needDelete() ) delete _ptf;
                if( _iff->needDelete() ) delete _iff;
            }

            // Getters
            SendTaskFunction<JobData> & sendTask() { return *_stf; }
            HandleResponseFunction<JobData> & handleResponse() { return *_hrf; }
            ProcessTaskFunction<JobData> & processTask() { return *_ptf; }
            IsFinishedFunction<JobData> & isFinished() { return *_iff; }

            // Setters
            void sendTask( SendTaskFunction<JobData>* stf ) { _stf = stf; }
            void handleResponse( HandleResponseFunction<JobData>* hrf ) { _hrf = hrf; }
            void processTask( ProcessTaskFunction<JobData>* ptf ) { _ptf = ptf; }
            void isFinished( IsFinishedFunction<JobData>* iff ) { _iff = iff; }

            /**
             * @brief Helper for wrapping the send task functor.
             */
            void wrapSendTask( SendTaskFunction<JobData>* stf )
            {
                if( stf )
                {
                    stf->wrapped( _stf );
                    _stf = stf;
                }
            }

            /**
             * @brief Helper for wrapping the handle response functor.
             */
            void wrapHandleResponse( HandleResponseFunction<JobData>* hrf )
            {
                if( hrf )
                {
                    hrf->wrapped( _hrf );
                    _hrf = hrf;
                }
            }

            /**
             * @brief Helper for wrapping the process task functor.
             */
            void wrapProcessTask( ProcessTaskFunction<JobData>* ptf )
            {
                if( ptf )
                {
                    ptf->wrapped( _ptf );
                    _ptf = ptf;
                }
            }

            /**
             * @brief Helper for wrapping the is finished functor.
             */
            void wrapIsFinished( IsFinishedFunction<JobData>* iff )
            {
                if( iff )
                {
                    iff->wrapped( _iff );
                    _iff = iff;
                }
            }

            virtual JobData* data() = 0;

            protected:
                SendTaskFunction< JobData >* _stf;
                HandleResponseFunction< JobData >* _hrf;
                ProcessTaskFunction< JobData >* _ptf;
                IsFinishedFunction< JobData >* _iff;
        };

        /**
         * @brief Class implementing the centralized job algorithm.
         *
         * This class handles the whole job algorithm. With its store and its assignment (scheduling) algorithm,
         * it executes the general algorithm described above, adding the networking needed to make the global
         * process work. It initializes all the functors with the data, then launches the main loop, indicating
         * to workers when they have to work and when they are finished, by sending them a termination message
         * (an integer that can be customized). As the algorithm is centralized, it is also mandatory to indicate
         * the MPI rank of the master process, so that the workers know from whom they should receive their
         * commands.
         *
         * Any of the 3 master functors can throw an exception; it will be caught and rethrown as a
         * std::runtime_error to the higher layers.
         *
         * @ingroup MPI
         */
        template< class JobData >
        class Job
        {
            public:
                /**
                 * @brief Main constructor for Job.
                 *
                 * @param _algo The assignment (scheduling) algorithm used. It has to be initialized with its
                 * maximum possible number of workers (no worker referenced in this algorithm should be busy).
                 * See AssignmentAlgorithm for more details.
                 *
                 * @param _masterRank The MPI rank of the master.
                 *
                 * @param _workerStopCondition Number of the message which will cause the workers to terminate.
                 * It could be one of the constants defined in eo::mpi::Message, or any other integer. The user
                 * has to be sure that a message containing this integer will be sent to each worker on the
                 * Commands channel, otherwise a deadlock will happen. The master sends Finish messages at the
                 * end of a simple job, but as a job can happen multiple times (multi job), workers don't
                 * necessarily finish on these messages but on another one. This is where you can configure it.
                 * See also OneShotJob and MultiJob.
                 *
                 * @param store The JobStore containing functors and data for this job.
                 */
                Job( AssignmentAlgorithm& _algo,
                     int _masterRank,
                     int _workerStopCondition,
                     JobStore<JobData> & store )
                    : assignmentAlgo( _algo ),
                      masterRank( _masterRank ),
                      workerStopCondition( _workerStopCondition ),
                      comm( Node::comm() ),
                      // Functors
                      sendTask( store.sendTask() ),
                      handleResponse( store.handleResponse() ),
                      processTask( store.processTask() ),
                      isFinished( store.isFinished() )
                {
                    _isMaster = Node::comm().rank() == _masterRank;

                    sendTask.data( store.data() );
                    handleResponse.data( store.data() );
                    processTask.data( store.data() );
                    isFinished.data( store.data() );
                }

            protected:

                /**
                 * @brief Finally block of the main algorithm.
                 *
                 * Herb Sutter's trick for having a finally block in a try/catch section: instantiate a class at
                 * the beginning of the try; its destructor will be called in every case.
                 *
                 * This implements the end of the master algorithm:
                 * - sends to all available workers that they are free,
                 * - waits for the last responses, handles them and sends termination messages to the last
                 *   workers.
                 */
                struct FinallyBlock
                {
                    FinallyBlock(
                            int _totalWorkers,
                            AssignmentAlgorithm& _algo,
                            Job< JobData > & _that )
                        : totalWorkers( _totalWorkers ),
                          assignmentAlgo( _algo ),
                          that( _that ),
                          // global field
                          comm( Node::comm() )
                    {
                        // empty
                    }

                    ~FinallyBlock()
                    {
# ifndef NDEBUG
                        eo::log << eo::debug;
                        eo::log << "[M" << comm.rank() << "] Frees all the idle." << std::endl;
# endif
                        // frees all the idle workers
                        timerStat.start("master_wait_for_idles");
                        std::vector<int> idles = assignmentAlgo.idles();
                        for( unsigned int i = 0; i < idles.size(); ++i )
                        {
                            comm.send( idles[i], Channel::Commands, Message::Finish );
                        }
                        timerStat.stop("master_wait_for_idles");

# ifndef NDEBUG
                        eo::log << "[M" << comm.rank() << "] Waits for all responses." << std::endl;
# endif
                        // wait for all responses
                        timerStat.start("master_wait_for_all_responses");
                        while( assignmentAlgo.availableWorkers() != totalWorkers )
                        {
                            bmpi::status status = comm.probe( bmpi::any_source, bmpi::any_tag );
                            int wrkRank = status.source();
                            that.handleResponse( wrkRank );
                            comm.send( wrkRank, Channel::Commands, Message::Finish );
                            assignmentAlgo.confirm( wrkRank );
                        }
                        timerStat.stop("master_wait_for_all_responses");

# ifndef NDEBUG
                        eo::log << "[M" << comm.rank() << "] Leaving master task." << std::endl;
# endif
                    }

                    protected:
                        int totalWorkers;
                        AssignmentAlgorithm& assignmentAlgo;
                        Job< JobData > & that;
                        bmpi::communicator & comm;
                };

                /**
                 * @brief Master part of the job.
                 *
                 * Launches the parallelized job algorithm: while there is something to do (!IsFinished), get a
                 * worker who will be the assignee; if no worker is available, wait for a response, handle it and
                 * ask for an assignee again. Then send the command and the task. Once there is nothing left to
                 * do (IsFinished), indicate to all available workers that they're free, wait for all the
                 * responses and send termination messages (see also FinallyBlock).
                 */
                void master( )
                {
                    int totalWorkers = assignmentAlgo.availableWorkers();
# ifndef NDEBUG
                    eo::log << eo::debug;
                    eo::log << "[M" << comm.rank() << "] Have " << totalWorkers << " workers." << std::endl;
# endif
                    try
                    {
                        FinallyBlock finally( totalWorkers, assignmentAlgo, *this );
                        while( ! isFinished() )
                        {
                            timerStat.start("master_wait_for_assignee");
                            int assignee = assignmentAlgo.get( );
                            while( assignee <= 0 )
                            {
# ifndef NDEBUG
                                eo::log << "[M" << comm.rank() << "] Waitin' for node..." << std::endl;
# endif
                                bmpi::status status = comm.probe( bmpi::any_source, bmpi::any_tag );
                                int wrkRank = status.source();
# ifndef NDEBUG
                                eo::log << "[M" << comm.rank() << "] Node " << wrkRank << " just terminated." << std::endl;
# endif
                                handleResponse( wrkRank );
                                assignmentAlgo.confirm( wrkRank );
                                assignee = assignmentAlgo.get( );
                            }
                            timerStat.stop("master_wait_for_assignee");

# ifndef NDEBUG
                            eo::log << "[M" << comm.rank() << "] Assignee : " << assignee << std::endl;
# endif
                            timerStat.start("master_wait_for_send");
                            comm.send( assignee, Channel::Commands, Message::Continue );
                            sendTask( assignee );
                            timerStat.stop("master_wait_for_send");
                        }
                    } catch( const std::exception & e )
                    {
                        std::string s = e.what();
                        s.append( " in eoMpi loop" );
                        throw std::runtime_error( s );
                    }
                }

                /**
                 * @brief Worker part of the algorithm.
                 *
                 * The worker algorithm is much simpler: wait for an order; if it's the termination message,
                 * leave; otherwise, process a task and wait for the next order.
                 */
                void worker( )
                {
                    int order;
# ifndef NDEBUG
                    eo::log << eo::debug;
# endif
                    timerStat.start("worker_wait_for_order");
                    comm.recv( masterRank, Channel::Commands, order );
                    timerStat.stop("worker_wait_for_order");

                    while( true )
                    {
# ifndef NDEBUG
                        eo::log << "[W" << comm.rank() << "] Waiting for an order..." << std::endl;
# endif
                        if ( order == workerStopCondition )
                        {
# ifndef NDEBUG
                            eo::log << "[W" << comm.rank() << "] Leaving worker task." << std::endl;
# endif
                            return;
                        }
                        else if( order == Message::Continue )
                        {
# ifndef NDEBUG
                            eo::log << "[W" << comm.rank() << "] Processing task..." << std::endl;
# endif
                            processTask( );
                        }
                        timerStat.start("worker_wait_for_order");
                        comm.recv( masterRank, Channel::Commands, order );
                        timerStat.stop("worker_wait_for_order");
                    }
                }

            public:

                /**
                 * @brief Launches the job algorithm, according to the role of the host (roles are deduced from
                 * the master rank indicated in the constructor).
                 */
                void run( )
                {
                    ( _isMaster ) ? master( ) : worker( );
                }

                /**
                 * @brief Returns true if the current host is the master, false otherwise.
                 */
                bool isMaster( )
                {
                    return _isMaster;
                }

            protected:

                AssignmentAlgorithm& assignmentAlgo;
                int masterRank;
                const int workerStopCondition;
                bmpi::communicator& comm;

                SendTaskFunction<JobData> & sendTask;
                HandleResponseFunction<JobData> & handleResponse;
                ProcessTaskFunction<JobData> & processTask;
                IsFinishedFunction<JobData> & isFinished;

                bool _isMaster;
        };

        /**
         * @brief Job that will be launched only once.
         *
         * As explained in the eo::mpi documentation, jobs can happen either a well known number of times or an
         * unknown number of times. This class implements the general case where the job is launched a well
         * known number of times. The job will be terminated on both sides (master and worker) once the master
         * says so.
         *
         * It uses the message Message::Finish as the termination message.
         *
         * @ingroup MPI
         */
        template< class JobData >
        class OneShotJob : public Job< JobData >
        {
            public:
                OneShotJob( AssignmentAlgorithm& algo,
                            int masterRank,
                            JobStore<JobData> & store )
                    : Job<JobData>( algo, masterRank, Message::Finish, store )
                {
                    // empty
                }
        };

        /**
         * @brief Job that will be launched an unknown number of times, on the worker side.
         *
         * As explained in the eo::mpi documentation, jobs can happen either a well known number of times or an
         * unknown number of times. This class implements the general case where the job is launched an unknown
         * number of times, for instance in a while loop. The master will run many jobs (or the same job many
         * times), but the workers will launch it only once.
         *
         * It uses the message Message::Kill as the termination message. This message can be sent with an
         * EmptyJob, launched only by the master. If no Message::Kill is sent on Channel::Commands, the worker
         * will wait forever, which will cause a deadlock.
         *
         * @ingroup MPI
         */
        template< class JobData >
        class MultiJob : public Job< JobData >
        {
            public:
                MultiJob( AssignmentAlgorithm& algo,
                          int masterRank,
                          JobStore<JobData> & store )
                    : Job<JobData>( algo, masterRank, Message::Kill, store )
                {
                    // empty
                }
        };

    } // namespace mpi

    /**
     * @}
     */
} // namespace eo

# endif // __EO_MPI_H__
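As a standalone illustration of the wrapping scheme that SharedDataFunction documents above (a functor holding a pointer to a wrapped functor of the same type, with needDelete telling the outer layer whether to delete the inner one), here is a framework-free reduction. Logger, Base and Timestamp are illustrative names only, not part of EO; the sketch shows a heap-allocated inner functor flagged with needDelete(true) being owned and freed by a stack-allocated wrapper.

```cpp
#include <string>

// Minimal stand-in for the SharedDataFunction wrapping scheme: same-type
// wrapped pointer plus a needDelete ownership flag.
struct Logger
{
    explicit Logger( Logger* wrapped = 0 ) : _wrapped( wrapped ), _needDelete( false ) {}
    virtual ~Logger() { if( _wrapped && _wrapped->needDelete() ) delete _wrapped; }

    virtual void operator()( std::string& trace ) { if( _wrapped ) (*_wrapped)( trace ); }

    bool needDelete() const { return _needDelete; }
    void needDelete( bool b ) { _needDelete = b; }

    Logger* _wrapped;
    bool    _needDelete;
};

// A basic behaviour...
struct Base : Logger
{
    void operator()( std::string& trace ) { trace += "base;"; }
};

// ...and a Decorator-style wrapper adding work around the wrapped call.
struct Timestamp : Logger
{
    explicit Timestamp( Logger* wrapped ) : Logger( wrapped ) {}
    void operator()( std::string& trace )
    {
        trace += "before;";
        (*_wrapped)( trace ); // delegate to the wrapped functor
        trace += "after;";
    }
};

std::string runWrapped()
{
    Base* base = new Base;     // heap-allocated: flag it for deletion
    base->needDelete( true );
    Timestamp wrapper( base ); // stack-allocated outer layer
    std::string trace;
    wrapper( trace );
    return trace;              // wrapper's destructor deletes `base`
}
```

The same ownership rule is what lets JobStore mix heap- and stack-allocated functors: only layers flagged with needDelete(true) are deleted by whoever holds them.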