Moving out ./eo/src/mpi/ content -> ./eompi/src/
This commit is contained in:
parent
51a1af0924
commit
a4f5fd8012
12 changed files with 0 additions and 0 deletions
35
eompi/src/CMakeLists.txt
Normal file
35
eompi/src/CMakeLists.txt
Normal file
|
|
@ -0,0 +1,35 @@
|
|||
######################################################################################
|
||||
### 1) Include the sources
|
||||
######################################################################################
|
||||
|
||||
include_directories(${EO_SRC_DIR}/src)
|
||||
include_directories(${CMAKE_CURRENT_SOURCE_DIR})
|
||||
|
||||
######################################################################################
|
||||
### 2) Define the eompi target
|
||||
######################################################################################
|
||||
|
||||
set(EOMPI_LIB_OUTPUT_PATH ${EO_BIN_DIR}/lib)
|
||||
set(LIBRARY_OUTPUT_PATH ${EOMPI_LIB_OUTPUT_PATH})
|
||||
|
||||
set(EOMPI_SOURCES
|
||||
eoMpi.cpp
|
||||
eoMpiAssignmentAlgorithm.cpp
|
||||
eoMpiNode.cpp
|
||||
implMpi.cpp
|
||||
)
|
||||
|
||||
add_library(eompi STATIC ${EOMPI_SOURCES})
|
||||
install(TARGETS eompi ARCHIVE DESTINATION ${LIB} COMPONENT libraries)
|
||||
|
||||
file(GLOB HDRS *.h)
|
||||
install(FILES ${HDRS} DESTINATION include${INSTALL_SUB_DIR}/eo/mpi COMPONENT headers)
|
||||
|
||||
######################################################################################
|
||||
### 3) Optionnal
|
||||
######################################################################################
|
||||
|
||||
set(EOMPI_VERSION ${GLOBAL_VERSION})
|
||||
set_target_properties(eompi PROPERTIES VERSION "${EOMPI_VERSION}")
|
||||
|
||||
######################################################################################
|
||||
48
eompi/src/eoMpi.cpp
Normal file
48
eompi/src/eoMpi.cpp
Normal file
|
|
@ -0,0 +1,48 @@
|
|||
/*
|
||||
(c) Thales group, 2012
|
||||
|
||||
This library is free software; you can redistribute it and/or
|
||||
modify it under the terms of the GNU Lesser General Public
|
||||
License as published by the Free Software Foundation;
|
||||
version 2 of the License.
|
||||
|
||||
This library is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
Lesser General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU Lesser General Public
|
||||
License along with this library; if not, write to the Free Software
|
||||
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
||||
Contact: http://eodev.sourceforge.net
|
||||
|
||||
Authors:
|
||||
Benjamin Bouvier <benjamin.bouvier@gmail.com>
|
||||
*/
|
||||
# include "eoMpi.h"
|
||||
|
||||
namespace eo
|
||||
{
|
||||
namespace mpi
|
||||
{
|
||||
/**********************************************
|
||||
* *********** GLOBALS ************************
|
||||
* *******************************************/
|
||||
eoTimerStat timerStat;
|
||||
|
||||
namespace Channel
|
||||
{
|
||||
const int Commands = 0;
|
||||
const int Messages = 1;
|
||||
}
|
||||
|
||||
namespace Message
|
||||
{
|
||||
const int Continue = 0;
|
||||
const int Finish = 1;
|
||||
const int Kill = 2;
|
||||
}
|
||||
|
||||
const int DEFAULT_MASTER = 0;
|
||||
}
|
||||
}
|
||||
868
eompi/src/eoMpi.h
Normal file
868
eompi/src/eoMpi.h
Normal file
|
|
@ -0,0 +1,868 @@
|
|||
/*
|
||||
(c) Thales group, 2012
|
||||
|
||||
This library is free software; you can redistribute it and/or
|
||||
modify it under the terms of the GNU Lesser General Public
|
||||
License as published by the Free Software Foundation;
|
||||
version 2 of the License.
|
||||
|
||||
This library is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
Lesser General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU Lesser General Public
|
||||
License along with this library; if not, write to the Free Software
|
||||
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
||||
Contact: http://eodev.sourceforge.net
|
||||
|
||||
Authors:
|
||||
Benjamin Bouvier <benjamin.bouvier@gmail.com>
|
||||
*/
|
||||
# ifndef __EO_MPI_H__
|
||||
# define __EO_MPI_H__
|
||||
|
||||
# include <vector> // std::vector
|
||||
|
||||
# include <utils/eoLogger.h>
|
||||
# include <utils/eoTimer.h>
|
||||
# include <eoFunctor.h>
|
||||
# include <eoExceptions.h>
|
||||
|
||||
# include "eoMpiNode.h"
|
||||
# include "eoMpiAssignmentAlgorithm.h"
|
||||
|
||||
namespace eo
|
||||
{
|
||||
/**
|
||||
* @ingroup Parallel
|
||||
* @defgroup MPI Message Passing Interface
|
||||
* @brief See namespace eo::mpi to have all explanations about this module.
|
||||
* @{
|
||||
*/
|
||||
|
||||
/**
|
||||
* @brief MPI parallelization helpers for EO.
|
||||
*
|
||||
* This namespace contains parallelization functions which help to parallelize computations in EO. It is based on a
|
||||
* generic algorithm, which is then customized with functors, corresponding to the algorithm main steps. These
|
||||
* computations are centralized, i.e there is one central host whose role is to handle the steps of the algorithm ;
|
||||
* we call it the "master". The other hosts just have to perform a "dummy" computation, which may be any kind of
|
||||
* processing ; we call them, the "slaves", or less pejoratively, the "workers". Workers can communicate to each
|
||||
* other, but they receive their orders from the Master and send him back some results. A worker can also be the
|
||||
* master of a different parallelization process, as soon as it is a part of its work. Machines of the network, also
|
||||
* called hosts, are identified by an unique number: their rank. At any time during the execution of the program,
|
||||
* all the hosts know the total number of hosts.
|
||||
*
|
||||
* A parallelized Job is a set of tasks which are independant (i.e can be executed in random order without
|
||||
* modifiying the result) and take a data input and compute a data output to be sent to the Master. The data can be
|
||||
* of any type, however they have to be serialized to be sent over a network. It is sufficient that they can be
|
||||
* serialized through boost.
|
||||
*
|
||||
* @todo For serialization purposes, don't depend upon boost. It would be easy to use only eoserial and send strings
|
||||
* via mpi.
|
||||
*
|
||||
* The main steps of the algorithm are the following:
|
||||
* - For the master:
|
||||
* - Have we done with the treatment we are doing ?
|
||||
* - If this is the case, we can quit.
|
||||
* - Otherwise, send an input data to some available worker.
|
||||
* - If there's no available worker, wait for a worker to be free.
|
||||
* - When receiving the response, handle it (eventually compute something on the output data, store it...).
|
||||
* - Go back to the first step.
|
||||
* - For the worker, it is even easier:
|
||||
* - Wait for an order.
|
||||
* - If there's nothing to do, just quit.
|
||||
* - Otherwise, eventually retrieve data and do the work.
|
||||
* - Go back to the first step.
|
||||
*
|
||||
* There is of course some network adjustements to do and precisions to give there, but the main ideas are present. As the
|
||||
* job is fully centralized, this is the master who tells the workers when to quit and when to work.
|
||||
*
|
||||
* The idea behind these MPI helpers is to be the most generic possible. If we look back at the steps of the
|
||||
* algorithm, we found that the steps can be splitted into 2 parts: the first consists in the steps of any
|
||||
* parallelization algorithm and the other consists in the specific parts of the algorithm. Ideally, the user should
|
||||
* just have to implement the specific parts of the algorithm. We identified these parts to be:
|
||||
* - For the master:
|
||||
* - What does mean to have terminated ? There are only two alternatives, in our binary world: either it is
|
||||
* terminated, or it is not. Hence we only need a function returning a boolean to know if we're done with the
|
||||
* computation : we'll call it IsFinished.
|
||||
* - What do we have to do when we send a task ? We don't have any a priori on the form of the sent data, or
|
||||
* the number of sent data. Moreover, as the tasks are all independant, we don't care of who will do the
|
||||
* computation, as soon as it's done. Knowing the rank of the worker will be sufficient to send him data. We
|
||||
* have identified another function, taking a single argument which is the rank of the worker: we'll call it
|
||||
* SendTask.
|
||||
* - What do we have to do when we receive a response from a worker ? One more time, we don't know which form
|
||||
* or structure can have the receive data, only the user can know. Also we let the user the charge to retrieve
|
||||
* the data ; he just has to know from who the master will retrieve the data. Here is another function, taking
|
||||
* a rank (the sender's one) as a function argument : this will be HandleResponse.
|
||||
* - For the worker:
|
||||
* - What is the processing ? It can have any nature. We just need to be sure that a data is sent back to the
|
||||
* master, but it seems difficult to check that: it will be the role of the user to assert that data is sent by
|
||||
* the worker at the end of an execution. We've got identified our last function: ProcessTask.
|
||||
*
|
||||
* In term of implementation, it would be annoying to have only abstract classes with these 4 methods to implement. It
|
||||
* would mean that if you want to alter just one of these 4 functions, you have to implement a new sub class, with a
|
||||
* new constructor which could have the same signature. Besides, this fashion doesn't allow you to add dynamic
|
||||
* functionalities, using the design pattern Decorator for instance, without implement a class for each type of
|
||||
* decoration you want to add. For these reasons, we decided to transform function into functors ; the user can then
|
||||
* wrap the existing, basic comportments into more sophisticated computations, whenever he wants, and without the
|
||||
* notion of order. We retrieve here the power of extension given by the design pattern Decorator.
|
||||
*
|
||||
* Our 4 functors could have a big amount of data in common (see eoParallelApply to have an idea).
|
||||
* So as to make it easy for the user to implement these 4 functors, we consider that these functors
|
||||
* have to share a common data structure. This data structure is referenced (as a pointer) in the 4 functors, so the
|
||||
* user doesn't need to pass a lot of parameters to each functor constructor.
|
||||
*
|
||||
* There are two kinds of jobs:
|
||||
* - The job which are launched a fixed and well known amount of times, i.e both master and workers know how many
|
||||
* times they will be launched. They are "one shot jobs".
|
||||
* - The job which are launched an unknown amount of times, for instance embedded in a while loop for which we don't
|
||||
* know the amount of repetitions (typically, eoEasyEA loop is a good example, as we don't know the continuator
|
||||
* condition). They are called "multi job".
|
||||
* As the master tells the workers to quit, we have to differentiate these two kinds of jobs. When the job is of the
|
||||
* kind "multi job", the workers would have to perform a while(true) loop so as to receive the orders ; but even if
|
||||
* the master tells them to quit, they would begin another job and wait for another order, while the master would
|
||||
* have quit: this would cause a deadlock and workers processes would be blocked, waiting for an order.
|
||||
*/
|
||||
namespace mpi
|
||||
{
|
||||
/**
|
||||
* @brief A timer which allows user to generate statistics about computation times.
|
||||
*/
|
||||
extern eoTimerStat timerStat;
|
||||
|
||||
/**
|
||||
* @brief Tags used in MPI messages for framework communication
|
||||
*
|
||||
* These tags are used for framework communication and fits "channels", so as to differentiate when we're
|
||||
* sending an order to a worker (Commands) or data (Messages). They are not reserved by the framework and can be
|
||||
* used by the user, but he is not bound to.
|
||||
*
|
||||
* @ingroup MPI
|
||||
*/
|
||||
namespace Channel
|
||||
{
|
||||
extern const int Commands;
|
||||
extern const int Messages;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Simple orders used by the framework.
|
||||
*
|
||||
* These orders are sent by the master to the workers, to indicate to them if they should receive another task
|
||||
* to do (Continue), if an one shot job is done (Finish) or if a multi job is done (Kill).
|
||||
*
|
||||
* @ingroup MPI
|
||||
*/
|
||||
namespace Message
|
||||
{
|
||||
extern const int Continue;
|
||||
extern const int Finish;
|
||||
extern const int Kill;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief If the job only has one master, the user can use this constant, so as not to worry with integer ids.
|
||||
*
|
||||
* @ingroup MPI
|
||||
*/
|
||||
extern const int DEFAULT_MASTER;
|
||||
|
||||
/**
|
||||
* @brief Base class for the 4 algorithm functors.
|
||||
*
|
||||
* This class can embed a data (JobData) and a wrapper, so as to make all the 4 functors wrappable.
|
||||
* We can add a wrapper at initialization or at any time when executing the program.
|
||||
*
|
||||
* According to RAII, the boolean needDelete helps to know if we have to use the operator delete on the wrapper
|
||||
* or not. Hence, if any functor is wrapped, user has just to put this boolean to true, to indicate to wrapper
|
||||
* that it should call delete. This allows to mix wrapper initialized in the heap (with new) or in the stack.
|
||||
*
|
||||
* @param JobData a Data type, which can have any form. It can a struct, a single int, anything.
|
||||
*
|
||||
* @param Wrapped the type of the functor, which will be stored as a pointer under the name _wrapped.
|
||||
* This allows to wrap directly the functor in functors of the same type
|
||||
* here, instead of dealing with SharedDataFunction* that we would have to cast all the time.
|
||||
* Doing also allows to handle the wrapped functor as the functor we're writing, when coding the wrappers,
|
||||
* instead of doing some static_cast. For instance, if there are 2 functors subclasses, fA and fB, fA
|
||||
* implementing doFa() and fB implementing doFb(), we could have the following code:
|
||||
* @code
|
||||
* struct fA_wrapper
|
||||
* {
|
||||
* // some code
|
||||
* void doFa()
|
||||
* {
|
||||
* _wrapped->doFa();
|
||||
* std::cout << "I'm a fA wrapper!" << std::endl;
|
||||
* // if we didn't have the second template parameter, but a SharedDataFunction, we would have to do this:
|
||||
* static_cast<fA*>(_wrapped)->doFa();
|
||||
* // do other things (it's a wrapper)
|
||||
* }
|
||||
* };
|
||||
*
|
||||
* struct fB_wrapper
|
||||
* {
|
||||
* // some code
|
||||
* void doFb()
|
||||
* {
|
||||
* _wrapped->doFb(); // and not: static_cast<fB*>(_wrapped)->doFb();
|
||||
* }
|
||||
* };
|
||||
* @endcode
|
||||
* This makes the code easier to write for the user.
|
||||
*
|
||||
* @ingroup MPI
|
||||
*/
|
||||
template< typename JobData, typename Wrapped >
|
||||
struct SharedDataFunction
|
||||
{
|
||||
/**
|
||||
* @brief Default constructor.
|
||||
*
|
||||
* The user is not bound to give a wrapped functor.
|
||||
*/
|
||||
SharedDataFunction( Wrapped * w = 0 ) : _data( 0 ), _wrapped( w ), _needDelete( false )
|
||||
{
|
||||
// empty
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Destructor.
|
||||
*
|
||||
* Calls delete on the wrapped function, only if necessary.
|
||||
*/
|
||||
virtual ~SharedDataFunction()
|
||||
{
|
||||
if( _wrapped && _wrapped->needDelete() )
|
||||
{
|
||||
delete _wrapped;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Setter for the wrapped function.
|
||||
*
|
||||
* It doesn't do anything on the current wrapped function, like deleting it.
|
||||
*/
|
||||
void wrapped( Wrapped * w )
|
||||
{
|
||||
_wrapped = w;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Setter for the data present in the functor.
|
||||
*
|
||||
* Calls the setter on the functor and on the wrapped functors, in a Composite pattern fashion.
|
||||
*/
|
||||
void data( JobData* d )
|
||||
{
|
||||
_data = d;
|
||||
if( _wrapped )
|
||||
{
|
||||
_wrapped->data( d );
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Returns true if we need to use operator delete on this wrapper, false otherwise.
|
||||
*
|
||||
* Allows the user to reject delete responsability to the framework, by setting this value to true.
|
||||
**/
|
||||
bool needDelete() { return _needDelete; }
|
||||
void needDelete( bool b ) { _needDelete = b; }
|
||||
|
||||
protected:
|
||||
JobData* _data;
|
||||
Wrapped* _wrapped; // Pointer and not a reference so as to be set at any time and to avoid affectation
|
||||
bool _needDelete;
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Functor (master side) used to send a task to the worker.
|
||||
*
|
||||
* The user doesn't have to know which worker will receive a task, so we just indicate to master the rank of the
|
||||
* worker. The data used for computation have to be explicitly sent by the master to the worker, with indicated
|
||||
* rank. Once this functor has been called, the worker is considered busy until it sends a return message to the
|
||||
* master.
|
||||
*
|
||||
* This is a functor implementing void operator()(int), and also a shared data function, containing wrapper on its
|
||||
* own type.
|
||||
*
|
||||
* @ingroup MPI
|
||||
*/
|
||||
template< typename JobData >
|
||||
struct SendTaskFunction : public eoUF<int, void>, public SharedDataFunction< JobData, SendTaskFunction<JobData> >
|
||||
{
|
||||
public:
|
||||
|
||||
SendTaskFunction( SendTaskFunction<JobData>* w = 0 ) : SharedDataFunction<JobData, SendTaskFunction<JobData> >( w )
|
||||
{
|
||||
// empty
|
||||
}
|
||||
|
||||
virtual ~SendTaskFunction() {} // for inherited classes
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Functor (master side) used to indicate what to do when receiving a response.
|
||||
*
|
||||
* The master calls this function as soon as it receives some data, in some channel. Thanks to MPI, we retrieve
|
||||
* the rank of the data's sender. This functor is then called with this rank. There is no memoization of a link
|
||||
* between sent data and rank, so the user has to implement it, if he needs it.
|
||||
*
|
||||
* This is a functor implementing void operator()(int), and also a shared data function, containing wrapper on
|
||||
* its own type.
|
||||
*
|
||||
* The master has to receive worker's data on channel (= MPI tag) eo::mpi::Channel::Messages. No other tags are
|
||||
* allowed.
|
||||
*
|
||||
* @ingroup MPI
|
||||
*/
|
||||
template< typename JobData >
|
||||
struct HandleResponseFunction : public eoUF<int, void>, public SharedDataFunction< JobData, HandleResponseFunction<JobData> >
|
||||
{
|
||||
public:
|
||||
|
||||
HandleResponseFunction( HandleResponseFunction<JobData>* w = 0 ) : SharedDataFunction<JobData, HandleResponseFunction<JobData> >( w )
|
||||
{
|
||||
// empty
|
||||
}
|
||||
|
||||
virtual ~HandleResponseFunction() {} // for inherited classes
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Functor (worker side) implementing the processing to do.
|
||||
*
|
||||
* This is where the real computation happen.
|
||||
* Whenever the master sends the command "Continue" to workers, which indicates the worker will receive a task,
|
||||
* the worker calls this functor. The user has to explicitly retrieve the data, handle it and transmit it,
|
||||
* processed, back to the master. Data sent back needs to be transmitted via channel (= MPI tag)
|
||||
* eo::mpi::Channel::Messages, and no one else. If the worker does not send any data back to the master, the latter will
|
||||
* consider the worker isn't done and a deadlock could occur.
|
||||
*
|
||||
* This is a functor implementing void operator()(), and also a shared data function, containing wrapper on its
|
||||
* own type.
|
||||
*
|
||||
* @ingroup MPI
|
||||
*/
|
||||
template< typename JobData >
|
||||
struct ProcessTaskFunction : public eoF<void>, public SharedDataFunction< JobData, ProcessTaskFunction<JobData> >
|
||||
{
|
||||
public:
|
||||
|
||||
ProcessTaskFunction( ProcessTaskFunction<JobData>* w = 0 ) : SharedDataFunction<JobData, ProcessTaskFunction<JobData> >( w )
|
||||
{
|
||||
// empty
|
||||
}
|
||||
|
||||
virtual ~ProcessTaskFunction() {} // for inherited classes
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Functor (master side) indicating whether the job is done or not.
|
||||
*
|
||||
* The master loops on this functor to know when to stop. When this functor returns true, the master will wait
|
||||
* for the last responses and properly stops the job. Whenever this functor returns false, the master will send
|
||||
* tasks, until this functor returns true.
|
||||
*
|
||||
* This is a functor implementing bool operator()(), and also a shared function, containing wrapper on its own
|
||||
* type.
|
||||
*
|
||||
* @ingroup MPI
|
||||
*/
|
||||
template< typename JobData >
|
||||
struct IsFinishedFunction : public eoF<bool>, public SharedDataFunction< JobData, IsFinishedFunction<JobData> >
|
||||
{
|
||||
public:
|
||||
|
||||
IsFinishedFunction( IsFinishedFunction<JobData>* w = 0 ) : SharedDataFunction<JobData, IsFinishedFunction<JobData> >( w )
|
||||
{
|
||||
// empty
|
||||
}
|
||||
|
||||
virtual ~IsFinishedFunction() {} // for inherited classes
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Contains all the required data and the functors to launch a job.
|
||||
*
|
||||
* Splitting the functors and data from the job in itself allows to use the same functors and data for multiples
|
||||
* instances of the same job. You define your store once and can use it a lot of times during your program. If
|
||||
* the store was included in the job, you'd have to give again all the functors and all the datas to each
|
||||
* invokation of the job.
|
||||
*
|
||||
* Job store contains the 4 functors (pointers, so as to be able to wrap them ; references couldn't have
|
||||
* permitted that) described above and the JobData used by all these functors. It contains
|
||||
* also helpers to easily wrap the functors, getters and setters on all of them.
|
||||
*
|
||||
* The user has to implement data(), which is the getter for retrieving JobData. We don't have any idea of who
|
||||
* owns the data, moreover it is impossible to initialize it in this generic JobStore, as we don't know its
|
||||
* form. As a matter of fact, the user has to define this in the JobStore subclasses.
|
||||
*
|
||||
* @ingroup MPI
|
||||
*/
|
||||
template< typename JobData >
|
||||
struct JobStore
|
||||
{
|
||||
/**
|
||||
* @brief Default ctor with the 4 functors.
|
||||
*/
|
||||
JobStore(
|
||||
SendTaskFunction<JobData>* stf,
|
||||
HandleResponseFunction<JobData>* hrf,
|
||||
ProcessTaskFunction<JobData>* ptf,
|
||||
IsFinishedFunction<JobData>* iff
|
||||
) :
|
||||
_stf( stf ), _hrf( hrf ), _ptf( ptf ), _iff( iff )
|
||||
{
|
||||
// empty
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Empty ctor, useful for not forcing users to call the other constructor.
|
||||
*
|
||||
* When using this constructor, the user have to care about the 4 functors pointers, otherwise null pointer
|
||||
* segfaults have to be expected.
|
||||
*/
|
||||
JobStore()
|
||||
{
|
||||
// empty
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Default destructor.
|
||||
*
|
||||
* JobStore is the highest layer which calls needDelete on its functors.
|
||||
*/
|
||||
~JobStore()
|
||||
{
|
||||
if( _stf->needDelete() ) delete _stf;
|
||||
if( _hrf->needDelete() ) delete _hrf;
|
||||
if( _ptf->needDelete() ) delete _ptf;
|
||||
if( _iff->needDelete() ) delete _iff;
|
||||
}
|
||||
|
||||
// Getters
|
||||
SendTaskFunction<JobData> & sendTask() { return *_stf; }
|
||||
HandleResponseFunction<JobData> & handleResponse() { return *_hrf; }
|
||||
ProcessTaskFunction<JobData> & processTask() { return *_ptf; }
|
||||
IsFinishedFunction<JobData> & isFinished() { return *_iff; }
|
||||
|
||||
// Setters
|
||||
void sendTask( SendTaskFunction<JobData>* stf )
|
||||
{
|
||||
if( !stf )
|
||||
return;
|
||||
|
||||
if( _stf && _stf->needDelete() )
|
||||
{
|
||||
delete _stf;
|
||||
}
|
||||
_stf = stf;
|
||||
}
|
||||
|
||||
void handleResponse( HandleResponseFunction<JobData>* hrf )
|
||||
{
|
||||
if( !hrf )
|
||||
return;
|
||||
|
||||
if( _hrf && _hrf->needDelete() )
|
||||
{
|
||||
delete _hrf;
|
||||
}
|
||||
_hrf = hrf;
|
||||
}
|
||||
|
||||
void processTask( ProcessTaskFunction<JobData>* ptf )
|
||||
{
|
||||
if( !ptf )
|
||||
return;
|
||||
|
||||
if( _ptf && _ptf->needDelete() )
|
||||
{
|
||||
delete _ptf;
|
||||
}
|
||||
_ptf = ptf;
|
||||
}
|
||||
|
||||
void isFinished( IsFinishedFunction<JobData>* iff )
|
||||
{
|
||||
if( !iff )
|
||||
return;
|
||||
|
||||
if( _iff && _iff->needDelete() )
|
||||
{
|
||||
delete _iff;
|
||||
}
|
||||
_iff = iff;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Helpers for wrapping send task functor.
|
||||
*/
|
||||
void wrapSendTask( SendTaskFunction<JobData>* stf )
|
||||
{
|
||||
if( stf )
|
||||
{
|
||||
stf->wrapped( _stf );
|
||||
_stf = stf;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Helpers for wrapping handle response functor.
|
||||
*/
|
||||
void wrapHandleResponse( HandleResponseFunction<JobData>* hrf )
|
||||
{
|
||||
if( hrf )
|
||||
{
|
||||
hrf->wrapped( _hrf );
|
||||
_hrf = hrf;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Helpers for wrapping process task functor.
|
||||
*/
|
||||
void wrapProcessTask( ProcessTaskFunction<JobData>* ptf )
|
||||
{
|
||||
if( ptf )
|
||||
{
|
||||
ptf->wrapped( _ptf );
|
||||
_ptf = ptf;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Helpers for wrapping is finished functor.
|
||||
*/
|
||||
void wrapIsFinished( IsFinishedFunction<JobData>* iff )
|
||||
{
|
||||
if( iff )
|
||||
{
|
||||
iff->wrapped( _iff );
|
||||
_iff = iff;
|
||||
}
|
||||
}
|
||||
|
||||
virtual JobData* data() = 0;
|
||||
|
||||
protected:
|
||||
|
||||
SendTaskFunction< JobData >* _stf;
|
||||
HandleResponseFunction< JobData >* _hrf;
|
||||
ProcessTaskFunction< JobData >* _ptf;
|
||||
IsFinishedFunction< JobData >* _iff;
|
||||
};
|
||||
|
||||
/**
|
||||
* @example t-mpi-wrapper.cpp
|
||||
*/
|
||||
|
||||
/**
|
||||
* @brief Class implementing the centralized job algorithm.
|
||||
*
|
||||
* This class handles all the job algorithm. With its store and its assignment (scheduling) algorithm, it
|
||||
* executes the general algorithm described above, adding some networking, so as to make the global process
|
||||
* work. It initializes all the functors with the data, then launches the main loop, indicating to workers when
|
||||
* they will have to work and when they will finish, by sending them a termination message (integer that can be
|
||||
* customized). As the algorithm is centralized, it is also mandatory to indicate what is the MPI rank of the
|
||||
* master process, hence the workers will know from who they should receive their commands.
|
||||
*
|
||||
* Any of the 3 master functors can launch exception, it will be catched and rethrown as a std::runtime_exception
|
||||
* to the higher layers.
|
||||
*
|
||||
* @ingroup MPI
|
||||
*/
|
||||
template< class JobData >
|
||||
class Job
|
||||
{
|
||||
public:
|
||||
/**
|
||||
* @brief Main constructor for Job.
|
||||
*
|
||||
* @param _algo The used assignment (scheduling) algorithm. It has to be initialized, with its maximum
|
||||
* possible number of workers (some workers referenced in this algorithm shouldn't be busy). See
|
||||
* AssignmentAlgorithm for more details.
|
||||
*
|
||||
* @param _masterRank The MPI rank of the master.
|
||||
*
|
||||
* @param _workerStopCondition Number of the message which will cause the workers to terminate. It could
|
||||
* be one of the constants defined in eo::mpi::Commands, or any other integer. The user has to be sure
|
||||
* that a message containing this integer will be sent to each worker on the Commands channel, otherwise
|
||||
* deadlock will happen. Master sends Finish messages at the end of a simple job, but as a job can
|
||||
* happen multiples times (multi job), workers don't have to really finish on these messages but on
|
||||
* another message. This is here where you can configurate it. See also OneShotJob and MultiJob.
|
||||
*
|
||||
* @param store The JobStore containing functors and data for this job.
|
||||
*/
|
||||
Job( AssignmentAlgorithm& _algo,
|
||||
int _masterRank,
|
||||
int _workerStopCondition,
|
||||
JobStore<JobData> & _store
|
||||
) :
|
||||
assignmentAlgo( _algo ),
|
||||
masterRank( _masterRank ),
|
||||
workerStopCondition( _workerStopCondition ),
|
||||
comm( Node::comm() ),
|
||||
// Functors
|
||||
store( _store ),
|
||||
sendTask( _store.sendTask() ),
|
||||
handleResponse( _store.handleResponse() ),
|
||||
processTask( _store.processTask() ),
|
||||
isFinished( _store.isFinished() )
|
||||
{
|
||||
_isMaster = Node::comm().rank() == _masterRank;
|
||||
|
||||
sendTask.data( _store.data() );
|
||||
handleResponse.data( _store.data() );
|
||||
processTask.data( _store.data() );
|
||||
isFinished.data( _store.data() );
|
||||
}
|
||||
|
||||
protected:
|
||||
|
||||
/**
|
||||
* @brief Finally block of the main algorithm
|
||||
*
|
||||
* Herb Sutter's trick for having a finally block, in a try/catch section: invoke a class at the
|
||||
* beginning of the try, its destructor will be called in every cases.
|
||||
*
|
||||
* This implements the end of the master algorithm:
|
||||
* - sends to all available workers that they are free,
|
||||
* - waits for last responses, handles them and sends termination messages to last workers.
|
||||
*/
|
||||
struct FinallyBlock
|
||||
{
|
||||
FinallyBlock(
|
||||
int _totalWorkers,
|
||||
AssignmentAlgorithm& _algo,
|
||||
Job< JobData > & _that
|
||||
) :
|
||||
totalWorkers( _totalWorkers ),
|
||||
assignmentAlgo( _algo ),
|
||||
that( _that ),
|
||||
// global field
|
||||
comm( Node::comm() )
|
||||
{
|
||||
// empty
|
||||
}
|
||||
|
||||
~FinallyBlock()
|
||||
{
|
||||
eo::log << eo::debug << "[M" << comm.rank() << "] Frees all the idle." << std::endl;
|
||||
|
||||
// frees all the idle workers
|
||||
timerStat.start("master_wait_for_idles");
|
||||
std::vector<int> idles = assignmentAlgo.idles();
|
||||
for(unsigned int i = 0; i < idles.size(); ++i)
|
||||
{
|
||||
comm.send( idles[i], Channel::Commands, Message::Finish );
|
||||
}
|
||||
timerStat.stop("master_wait_for_idles");
|
||||
|
||||
eo::log << eo::debug << "[M" << comm.rank() << "] Waits for all responses." << std::endl;
|
||||
|
||||
// wait for all responses
|
||||
timerStat.start("master_wait_for_all_responses");
|
||||
while( assignmentAlgo.availableWorkers() != totalWorkers )
|
||||
{
|
||||
bmpi::status status = comm.probe( bmpi::any_source, eo::mpi::Channel::Messages );
|
||||
int wrkRank = status.source();
|
||||
that.handleResponse( wrkRank );
|
||||
comm.send( wrkRank, Channel::Commands, Message::Finish );
|
||||
assignmentAlgo.confirm( wrkRank );
|
||||
}
|
||||
timerStat.stop("master_wait_for_all_responses");
|
||||
|
||||
eo::log << eo::debug << "[M" << comm.rank() << "] Leaving master task." << std::endl;
|
||||
}
|
||||
|
||||
protected:
|
||||
|
||||
int totalWorkers;
|
||||
AssignmentAlgorithm& assignmentAlgo;
|
||||
Job< JobData > & that;
|
||||
|
||||
bmpi::communicator & comm;
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Master part of the job.
|
||||
*
|
||||
* Launches the parallelized job algorithm : while there is something to do (! IsFinished ), get a
|
||||
* worker who will be the assignee ; if no worker is available, wait for a response, handle it and reask
|
||||
* for an assignee. Then send the command and the task.
|
||||
* Once there is no more to do (IsFinished), indicate to all available workers that they're free, wait
|
||||
* for all the responses and send termination messages (see also FinallyBlock).
|
||||
*/
|
||||
void master( )
{
    // Snapshot of the worker count: no task has been sent yet, so every
    // worker the scheduler knows about is still available.
    int totalWorkers = assignmentAlgo.availableWorkers();
    eo::log << eo::debug << "[M" << comm.rank() << "] Have " << totalWorkers << " workers." << std::endl;

    try {
        // RAII guard: whether we leave normally or by exception, its
        // destructor frees the idle workers and drains pending responses.
        FinallyBlock finally( totalWorkers, assignmentAlgo, *this );
        while( ! isFinished() )
        {
            timerStat.start("master_wait_for_assignee");
            int assignee = assignmentAlgo.get( );
            // No worker available: block until one answers, handle its
            // response, then ask the scheduler again.
            while( assignee <= 0 )
            {
                eo::log << eo::debug << "[M" << comm.rank() << "] Waitin' for node..." << std::endl;

                bmpi::status status = comm.probe( bmpi::any_source, eo::mpi::Channel::Messages );
                int wrkRank = status.source();

                eo::log << eo::debug << "[M" << comm.rank() << "] Node " << wrkRank << " just terminated." << std::endl;

                handleResponse( wrkRank );
                assignmentAlgo.confirm( wrkRank );
                assignee = assignmentAlgo.get( );
            }
            timerStat.stop("master_wait_for_assignee");

            eo::log << eo::debug << "[M" << comm.rank() << "] Assignee : " << assignee << std::endl;

            // Order the assignee to keep working, then ship the task itself.
            timerStat.start("master_wait_for_send");
            comm.send( assignee, Channel::Commands, Message::Continue );
            sendTask( assignee );
            timerStat.stop("master_wait_for_send");
        }
    } catch( const std::exception & e )
    {
        // Add context before rethrowing. Note the concrete exception type
        // is replaced by std::runtime_error.
        std::string s = e.what();
        s.append( " in eoMpi loop");
        throw std::runtime_error( s );
    }
}
|
||||
|
||||
/**
|
||||
* @brief Worker part of the algorithm.
|
||||
*
|
||||
* The algorithm is much simpler: wait for an order; if it's a termination message, leave. Otherwise,
|
||||
* prepare to work.
|
||||
*/
|
||||
void worker( )
{
    int order;

    // Blocking wait for the first command from the master.
    timerStat.start("worker_wait_for_order");
    comm.recv( masterRank, Channel::Commands, order );
    timerStat.stop("worker_wait_for_order");

    while( true )
    {
        eo::log << eo::debug << "[W" << comm.rank() << "] Waiting for an order..." << std::endl;

        // workerStopCondition is the termination message the Job subclass
        // chose (Message::Finish for OneShotJob, Message::Kill for MultiJob).
        if ( order == workerStopCondition )
        {
            eo::log << eo::debug << "[W" << comm.rank() << "] Leaving worker task." << std::endl;
            return;
        } else if( order == Message::Continue )
        {
            eo::log << eo::debug << "[W" << comm.rank() << "] Processing task..." << std::endl;
            processTask( );
        }

        // Fetch the next command; any unknown order is silently ignored.
        timerStat.start("worker_wait_for_order");
        comm.recv( masterRank, Channel::Commands, order );
        timerStat.stop("worker_wait_for_order");
    }
}
|
||||
|
||||
public:
|
||||
|
||||
/**
|
||||
* @brief Launches the job algorithm, according to the role of the host (roles are deduced from the
|
||||
* master rank indicated in the constructor).
|
||||
*/
|
||||
void run( )
|
||||
{
|
||||
( _isMaster ) ? master( ) : worker( );
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Returns true if the current host is the master, false otherwise.
|
||||
*/
|
||||
bool isMaster( )
|
||||
{
|
||||
return _isMaster;
|
||||
}
|
||||
|
||||
protected:
|
||||
|
||||
AssignmentAlgorithm& assignmentAlgo;   // decides which worker receives the next task
int masterRank;                        // MPI rank of the master host
const int workerStopCondition;         // command value that makes a worker leave (Finish or Kill)
bmpi::communicator& comm;              // communicator shared by every host of the job

JobStore<JobData>& store;                           // holds the job data and the functors below
SendTaskFunction<JobData> & sendTask;               // master side: sends a task to a worker
HandleResponseFunction<JobData> & handleResponse;   // master side: processes a worker's answer
ProcessTaskFunction<JobData> & processTask;         // worker side: performs the task itself
IsFinishedFunction<JobData> & isFinished;           // master side: true when nothing remains to do

bool _isMaster;                        // true iff this host is the master
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Job that will be launched only once.
|
||||
*
|
||||
* As explained in eo::mpi documentation, jobs can happen either a well known amount of times or an unknown
|
||||
* amount of times. This class implements the general case when the job is launched a well known amount of
|
||||
* times. The job will be terminated on both sides (master and worker) once the master would have said it.
|
||||
*
|
||||
* It uses the message Message::Finish as the termination message.
|
||||
*
|
||||
* @ingroup MPI
|
||||
*/
|
||||
template< class JobData >
|
||||
class OneShotJob : public Job< JobData >
|
||||
{
|
||||
public:
|
||||
OneShotJob( AssignmentAlgorithm& algo,
|
||||
int masterRank,
|
||||
JobStore<JobData> & store )
|
||||
: Job<JobData>( algo, masterRank, Message::Finish, store )
|
||||
{
|
||||
// empty
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Job that will be launched an unknown amount of times, in worker side.
|
||||
*
|
||||
* As explained in eo::mpi documentation, jobs can happen either a well known amount of times or an unknown
|
||||
* amount of times. This class implements the general case when the job is launched an unknown amount of times, for
|
||||
* instance in a while loop. The master will run many jobs (or the same job many times), but the workers will
|
||||
* launch it only once.
|
||||
*
|
||||
* It uses the message Message::Kill as the termination message. This message can be launched with an EmptyJob,
|
||||
* launched only by the master. If no Message::Kill is sent on the Channels::Commands, the worker will wait
|
||||
* forever, which will cause a deadlock.
|
||||
*
|
||||
* @ingroup MPI
|
||||
*/
|
||||
template< class JobData >
|
||||
class MultiJob : public Job< JobData >
|
||||
{
|
||||
public:
|
||||
MultiJob ( AssignmentAlgorithm& algo,
|
||||
int masterRank,
|
||||
JobStore<JobData> & store )
|
||||
: Job<JobData>( algo, masterRank, Message::Kill, store )
|
||||
{
|
||||
// empty
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* @}
|
||||
*/
|
||||
}
|
||||
# endif // __EO_MPI_H__
|
||||
|
||||
224
eompi/src/eoMpiAssignmentAlgorithm.cpp
Normal file
224
eompi/src/eoMpiAssignmentAlgorithm.cpp
Normal file
|
|
@ -0,0 +1,224 @@
|
|||
# include "eoMpiAssignmentAlgorithm.h"
|
||||
/*
|
||||
(c) Thales group, 2012
|
||||
|
||||
This library is free software; you can redistribute it and/or
|
||||
modify it under the terms of the GNU Lesser General Public
|
||||
License as published by the Free Software Foundation;
|
||||
version 2 of the License.
|
||||
|
||||
This library is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
Lesser General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU Lesser General Public
|
||||
License along with this library; if not, write to the Free Software
|
||||
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
||||
Contact: http://eodev.sourceforge.net
|
||||
|
||||
Authors:
|
||||
Benjamin Bouvier <benjamin.bouvier@gmail.com>
|
||||
*/
|
||||
# include "eoMpiNode.h"
|
||||
|
||||
namespace eo
|
||||
{
|
||||
namespace mpi
|
||||
{
|
||||
const int REST_OF_THE_WORLD = -1;
|
||||
|
||||
/********************************************************
|
||||
* DYNAMIC ASSIGNMENT ALGORITHM *************************
|
||||
*******************************************************/
|
||||
|
||||
// Every host except the master (rank 0) becomes a worker.
DynamicAssignmentAlgorithm::DynamicAssignmentAlgorithm( )
{
    const int worldSize = Node::comm().size();
    for( int rank = 1; rank < worldSize; ++rank )
    {
        availableWrk.push_back( rank );
    }
}

// A single worker, identified by its MPI rank.
DynamicAssignmentAlgorithm::DynamicAssignmentAlgorithm( int unique )
{
    availableWrk.assign( 1, unique );
}

// Workers are exactly the given ranks.
DynamicAssignmentAlgorithm::DynamicAssignmentAlgorithm( const std::vector<int> & workers )
    : availableWrk( workers )
{
}

// Workers are the ranks in [first, last]; REST_OF_THE_WORLD means
// "up to the highest existing rank".
DynamicAssignmentAlgorithm::DynamicAssignmentAlgorithm( int first, int last )
{
    if( last == REST_OF_THE_WORLD )
    {
        last = Node::comm().size() - 1;
    }

    for( int rank = first; rank <= last; ++rank )
    {
        availableWrk.push_back( rank );
    }
}
|
||||
|
||||
int DynamicAssignmentAlgorithm::get( )
|
||||
{
|
||||
int assignee = -1;
|
||||
if (! availableWrk.empty() )
|
||||
{
|
||||
assignee = availableWrk.back();
|
||||
availableWrk.pop_back();
|
||||
}
|
||||
return assignee;
|
||||
}
|
||||
|
||||
int DynamicAssignmentAlgorithm::availableWorkers()
|
||||
{
|
||||
return availableWrk.size();
|
||||
}
|
||||
|
||||
void DynamicAssignmentAlgorithm::confirm( int rank )
|
||||
{
|
||||
availableWrk.push_back( rank );
|
||||
}
|
||||
|
||||
std::vector<int> DynamicAssignmentAlgorithm::idles( )
|
||||
{
|
||||
return availableWrk;
|
||||
}
|
||||
|
||||
void DynamicAssignmentAlgorithm::reinit( int _ )
|
||||
{
|
||||
++_;
|
||||
// nothing to do
|
||||
}
|
||||
|
||||
/********************************************************
|
||||
* STATIC ASSIGNMENT ALGORITHM **************************
|
||||
*******************************************************/
|
||||
|
||||
// Workers are exactly the given ranks.
StaticAssignmentAlgorithm::StaticAssignmentAlgorithm( const std::vector<int>& workers, int runs )
{
    init( workers, runs );
}

// Workers are the ranks in [first, last]; REST_OF_THE_WORLD means
// "up to the highest existing rank".
StaticAssignmentAlgorithm::StaticAssignmentAlgorithm( int first, int last, int runs )
{
    if( last == REST_OF_THE_WORLD )
    {
        last = Node::comm().size() - 1;
    }

    std::vector<int> workers;
    for( int rank = first; rank <= last; ++rank )
    {
        workers.push_back( rank );
    }

    init( workers, runs );
}

// Every host except the master (rank 0) becomes a worker.
StaticAssignmentAlgorithm::StaticAssignmentAlgorithm( int runs )
{
    std::vector<int> workers;
    const int worldSize = Node::comm().size();
    for( int rank = 1; rank < worldSize; ++rank )
    {
        workers.push_back( rank );
    }

    init( workers, runs );
}

// A single worker, identified by its MPI rank.
StaticAssignmentAlgorithm::StaticAssignmentAlgorithm( int unique, int runs )
{
    init( std::vector<int>( 1, unique ), runs );
}
|
||||
|
||||
void StaticAssignmentAlgorithm::init( const std::vector<int> & workers, int runs )
|
||||
{
|
||||
unsigned int nbWorkers = workers.size();
|
||||
freeWorkers = nbWorkers;
|
||||
|
||||
busy.clear();
|
||||
busy.resize( nbWorkers, false );
|
||||
realRank = workers;
|
||||
|
||||
if( runs <= 0 )
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
attributions.clear();
|
||||
attributions.reserve( nbWorkers );
|
||||
|
||||
// Let be the euclidean division of runs by nbWorkers :
|
||||
// runs == q * nbWorkers + r, 0 <= r < nbWorkers
|
||||
// This one liner affects q requests to each worker
|
||||
for (unsigned int i = 0; i < nbWorkers; attributions[i++] = runs / nbWorkers) ;
|
||||
// The first line computes r and the one liner affects the remaining
|
||||
// r requests to workers, in ascending order
|
||||
unsigned int diff = runs - (runs / nbWorkers) * nbWorkers;
|
||||
for (unsigned int i = 0; i < diff; ++attributions[i++]);
|
||||
}
|
||||
|
||||
int StaticAssignmentAlgorithm::get( )
|
||||
{
|
||||
int assignee = -1;
|
||||
for( unsigned i = 0; i < busy.size(); ++i )
|
||||
{
|
||||
if( !busy[i] && attributions[i] > 0 )
|
||||
{
|
||||
busy[i] = true;
|
||||
--freeWorkers;
|
||||
assignee = realRank[ i ];
|
||||
break;
|
||||
}
|
||||
}
|
||||
return assignee;
|
||||
}
|
||||
|
||||
int StaticAssignmentAlgorithm::availableWorkers( )
|
||||
{
|
||||
return freeWorkers;
|
||||
}
|
||||
|
||||
std::vector<int> StaticAssignmentAlgorithm::idles()
|
||||
{
|
||||
std::vector<int> ret;
|
||||
for(unsigned int i = 0; i < busy.size(); ++i)
|
||||
{
|
||||
if( !busy[i] )
|
||||
{
|
||||
ret.push_back( realRank[i] );
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
void StaticAssignmentAlgorithm::confirm( int rank )
|
||||
{
|
||||
int i = -1; // i is the real index in table
|
||||
for( unsigned int j = 0; j < realRank.size(); ++j )
|
||||
{
|
||||
if( realRank[j] == rank )
|
||||
{
|
||||
i = j;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
--attributions[ i ];
|
||||
busy[ i ] = false;
|
||||
++freeWorkers;
|
||||
}
|
||||
|
||||
// Redistributes a new amount of runs over the same set of workers
// (required between two uses of a MultiJob — see the base class @todo).
void StaticAssignmentAlgorithm::reinit( int runs )
{
    init( realRank, runs );
}
|
||||
}
|
||||
}
|
||||
239
eompi/src/eoMpiAssignmentAlgorithm.h
Normal file
239
eompi/src/eoMpiAssignmentAlgorithm.h
Normal file
|
|
@ -0,0 +1,239 @@
|
|||
/*
|
||||
(c) Thales group, 2012
|
||||
|
||||
This library is free software; you can redistribute it and/or
|
||||
modify it under the terms of the GNU Lesser General Public
|
||||
License as published by the Free Software Foundation;
|
||||
version 2 of the License.
|
||||
|
||||
This library is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
Lesser General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU Lesser General Public
|
||||
License along with this library; if not, write to the Free Software
|
||||
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
||||
Contact: http://eodev.sourceforge.net
|
||||
|
||||
Authors:
|
||||
Benjamin Bouvier <benjamin.bouvier@gmail.com>
|
||||
*/
|
||||
# ifndef __MPI_ASSIGNMENT_ALGORITHM_H__
|
||||
# define __MPI_ASSIGNMENT_ALGORITHM_H__
|
||||
|
||||
# include <vector> // std::vector
|
||||
|
||||
namespace eo
|
||||
{
|
||||
namespace mpi
|
||||
{
|
||||
/**
|
||||
* @brief Constant indicating to use all the resting available workers, in assignment algorithms constructor
|
||||
* using an interval.
|
||||
*
|
||||
* @ingroup MPI
|
||||
*/
|
||||
extern const int REST_OF_THE_WORLD;
|
||||
|
||||
/**
|
||||
* @brief Contains informations on the available workers and allows to find assignees for jobs.
|
||||
*
|
||||
* Available workers are workers who aren't processing anything. When they've received an order, workers switch
|
||||
* from the state "available" to the state "busy", and the master has to wait for their response for considering
|
||||
* them available again.
|
||||
*
|
||||
* @ingroup MPI
|
||||
*/
|
||||
struct AssignmentAlgorithm
{
    /**
     * @brief Gets the rank of an available worker, so as to send it a task.
     *
     * @return The MPI rank of an available worker, or -1 if there is no available worker.
     */
    virtual int get( ) = 0;

    /**
     * @brief Gets the number of total available workers.
     *
     * Before the first call, it is equal to the total number of present workers, as specified in the
     * specific assignment algorithm constructor. It allows the Job class to know when all the responses have
     * been received, by comparing this number to the total number of workers.
     *
     * @return Integer indicating how many workers are available.
     */
    virtual int availableWorkers( ) = 0;

    /**
     * @brief Reinjects the worker of indicated rank in the available state.
     *
     * @param wrkRank The MPI rank of the worker who has finished its job.
     */
    virtual void confirm( int wrkRank ) = 0;

    /**
     * @brief Indicates which workers are doing nothing.
     *
     * At the end of the algorithm, the master has to warn all the workers that it's done: both the
     * workers which are currently processing data, and the ones which could be waiting — the idles.
     * This function indicates to the master which workers aren't doing anything.
     *
     * @return A std::vector containing all the MPI ranks of the idle workers.
     */
    virtual std::vector<int> idles( ) = 0;

    /**
     * @brief Reinitializes the assignment algorithm with the right number of runs.
     *
     * In fact, this is only useful for the static assignment algorithm, which has to be reinitialized every
     * time it's used, in the case of a Multi Job. It's the user's responsibility to call this function.
     *
     * @todo Not really clean. Find a better way to do it.
     */
    virtual void reinit( int runs ) = 0;

    /**
     * @brief Virtual destructor (added): AssignmentAlgorithm is a polymorphic
     * base class, so destroying a derived object through a base pointer must
     * invoke the derived destructor.
     */
    virtual ~AssignmentAlgorithm() {}
};
|
||||
|
||||
/**
|
||||
* @brief Assignment (scheduling) algorithm which handles workers in a queue.
|
||||
*
|
||||
* With this assignment algorithm, workers are put in a queue and may be called an unlimited number of times.
|
||||
* Whenever a worker returns, it is added to the queue, and it becomes available for the next call to get().
|
||||
* The available workers are all located in the queue at any time, so the number of available workers is
|
||||
* directly equal to the size of the queue.
|
||||
*
|
||||
* This kind of assignment is adapted for tasks whose execution time is stochastic or unknown, but without any
|
||||
* warranty to be faster than other assignments.
|
||||
*
|
||||
* @ingroup MPI
|
||||
*/
|
||||
struct DynamicAssignmentAlgorithm : public AssignmentAlgorithm
{
    public:

        /**
         * @brief Uses all the hosts whose rank is greater than or equal to 1 as workers.
         */
        DynamicAssignmentAlgorithm( );

        /**
         * @brief Uses the unique host with given rank as a worker.
         *
         * @param unique MPI rank of the unique worker.
         */
        DynamicAssignmentAlgorithm( int unique );

        /**
         * @brief Uses the workers whose ranks are present in the argument as workers.
         *
         * @param workers std::vector containing MPI ranks of workers.
         */
        DynamicAssignmentAlgorithm( const std::vector<int> & workers );

        /**
         * @brief Uses a range of ranks as workers.
         *
         * @param first The first worker to be included (inclusive)
         * @param last The last worker to be included (inclusive). If last == eo::mpi::REST_OF_THE_WORLD, all
         * hosts whose rank is higher than first are taken.
         */
        DynamicAssignmentAlgorithm( int first, int last );

        // AssignmentAlgorithm interface — see the base class for contracts.
        virtual int get( );

        int availableWorkers();

        void confirm( int rank );

        std::vector<int> idles( );

        // No-op for this scheduler: workers are re-enqueued as they answer.
        void reinit( int _ );

    protected:
        // LIFO queue of the ranks of the currently available (idle) workers.
        std::vector< int > availableWrk;
};
|
||||
|
||||
/**
|
||||
* @brief Assignment algorithm which gives to each worker a precise number of tasks to do, in a round robin
|
||||
* fashion.
|
||||
*
|
||||
* This scheduling algorithm attributes, at initialization or when calling reinit(), a fixed amount of runs to
|
||||
* distribute to the workers. The amount of runs is then equally distributed between all workers ; if total
|
||||
* number of runs is not a direct multiple of workers number, then remainding unaffected runs are distributed to
|
||||
* workers from the first to the last, in a round-robin fashion.
|
||||
*
|
||||
* This scheduling should be used when the amount of runs can be computed or is fixed and when we guess that the
|
||||
* duration of processing task will be the same for each run. There is no warranty that this algorithm is more
|
||||
* or less efficient that another one. When having a doubt, use DynamicAssignmentAlgorithm.
|
||||
*
|
||||
* @ingroup MPI
|
||||
*/
|
||||
struct StaticAssignmentAlgorithm : public AssignmentAlgorithm
{
    public:
        /**
         * @brief Uses a given precise set of workers.
         *
         * @param workers std::vector of MPI ranks of workers which will be used.
         * @param runs Fixed amount of runs, strictly positive.
         */
        StaticAssignmentAlgorithm( const std::vector<int>& workers, int runs );

        /**
         * @brief Uses a range of workers.
         *
         * @param first The first MPI rank of worker to use
         * @param last The last MPI rank of worker to use. If it's equal to REST_OF_THE_WORLD, then all the
         * workers from the first one are taken as workers.
         * @param runs Fixed amount of runs, strictly positive.
         */
        StaticAssignmentAlgorithm( int first, int last, int runs );

        /**
         * @brief Uses all the hosts whose rank is greater than or equal to 1 as workers.
         *
         * @param runs Fixed amount of runs, strictly positive. If it's not set, you'll have to call reinit()
         * later.
         *
         * NOTE(review): this overload and StaticAssignmentAlgorithm(int, int) below form an
         * error-prone pair — a call with a single int means "runs", while a call with two
         * ints means "unique worker rank, then runs". Double-check call sites.
         */
        StaticAssignmentAlgorithm( int runs = 0 );

        /**
         * @brief Uses an unique host as worker.
         *
         * @param unique The MPI rank of the host which will be the worker.
         * @param runs Fixed amount of runs, strictly positive.
         */
        StaticAssignmentAlgorithm( int unique, int runs );

    private:
        /**
         * @brief Initializes the static scheduling.
         *
         * Gives to each worker an equal attribution number, equal to runs / workers.size(), eventually plus one
         * if number of workers is not a divisor of runs.
         *
         * @param workers Vector of hosts' ranks
         * @param runs Fixed amount of runs, strictly positive.
         */
        void init( const std::vector<int> & workers, int runs );

    public:
        // AssignmentAlgorithm interface — see the base class for contracts.
        int get( );

        int availableWorkers( );

        std::vector<int> idles();

        void confirm( int rank );

        void reinit( int runs );

    private:
        std::vector<int> attributions;  // attributions[i]: runs still to perform by the i-th worker
        std::vector<int> realRank;      // realRank[i]: MPI rank of the i-th worker
        std::vector<bool> busy;         // busy[i]: true while the i-th worker is processing a task
        unsigned int freeWorkers;       // number of workers currently not busy
};
|
||||
}
|
||||
}
|
||||
# endif // __MPI_ASSIGNMENT_ALGORITHM_H__
|
||||
40
eompi/src/eoMpiNode.cpp
Normal file
40
eompi/src/eoMpiNode.cpp
Normal file
|
|
@ -0,0 +1,40 @@
|
|||
/*
|
||||
(c) Thales group, 2012
|
||||
|
||||
This library is free software; you can redistribute it and/or
|
||||
modify it under the terms of the GNU Lesser General Public
|
||||
License as published by the Free Software Foundation;
|
||||
version 2 of the License.
|
||||
|
||||
This library is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
Lesser General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU Lesser General Public
|
||||
License along with this library; if not, write to the Free Software
|
||||
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
||||
Contact: http://eodev.sourceforge.net
|
||||
|
||||
Authors:
|
||||
Benjamin Bouvier <benjamin.bouvier@gmail.com>
|
||||
*/
|
||||
# include "eoMpiNode.h"
|
||||
|
||||
namespace eo
|
||||
{
|
||||
namespace mpi
|
||||
{
|
||||
void Node::init( int argc, char** argv )
{
    // Function-local static: the MPI environment is created on the first
    // call and destroyed at program exit. Subsequent calls are no-ops.
    static bmpi::environment env( argc, argv );
}

// Accessor to the single, program-wide communicator.
bmpi::communicator& Node::comm()
{
    return _comm;
}

// Definition of the static communicator declared in eoMpiNode.h.
bmpi::communicator Node::_comm;
|
||||
}
|
||||
}
|
||||
65
eompi/src/eoMpiNode.h
Normal file
65
eompi/src/eoMpiNode.h
Normal file
|
|
@ -0,0 +1,65 @@
|
|||
/*
|
||||
(c) Thales group, 2012
|
||||
|
||||
This library is free software; you can redistribute it and/or
|
||||
modify it under the terms of the GNU Lesser General Public
|
||||
License as published by the Free Software Foundation;
|
||||
version 2 of the License.
|
||||
|
||||
This library is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
Lesser General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU Lesser General Public
|
||||
License along with this library; if not, write to the Free Software
|
||||
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
||||
Contact: http://eodev.sourceforge.net
|
||||
|
||||
Authors:
|
||||
Benjamin Bouvier <benjamin.bouvier@gmail.com>
|
||||
*/
|
||||
# ifndef __MPI_NODE_H__
|
||||
# define __MPI_NODE_H__
|
||||
|
||||
# include "implMpi.h"
|
||||
namespace bmpi = mpi;
|
||||
|
||||
namespace eo
|
||||
{
|
||||
namespace mpi
|
||||
{
|
||||
/**
|
||||
* @brief Global object used to reach mpi::communicator everywhere.
|
||||
*
|
||||
* mpi::communicator is the main object used to send and receive messages between the different hosts of
|
||||
* a MPI algorithm.
|
||||
*
|
||||
* @ingroup MPI
|
||||
*/
|
||||
class Node
{
    public:

        /**
         * @brief Initializes the MPI environment with argc and argv.
         *
         * Should be called at the beginning of every parallel program.
         *
         * @param argc Main's argc
         * @param argv Main's argv
         */
        static void init( int argc, char** argv );

        /**
         * @brief Returns the global mpi::communicator
         */
        static bmpi::communicator& comm();

    protected:
        // The single, program-wide communicator (defined in eoMpiNode.cpp).
        static bmpi::communicator _comm;
};
|
||||
}
|
||||
}
|
||||
# endif // __MPI_NODE_H__
|
||||
|
||||
515
eompi/src/eoMultiStart.h
Normal file
515
eompi/src/eoMultiStart.h
Normal file
|
|
@ -0,0 +1,515 @@
|
|||
# ifndef __EO_MULTISTART_H__
|
||||
# define __EO_MULTISTART_H__
|
||||
|
||||
# include <eo>
|
||||
# include "eoMpi.h"
|
||||
|
||||
/**
|
||||
* @ingroup MPI
|
||||
* @{
|
||||
*/
|
||||
|
||||
/**
|
||||
* @file eoMultiStart.h
|
||||
*
|
||||
* Contains implementation of a MPI job which consists in a multi start, which basically consists in the following:
|
||||
* the same eoAlgo is launched on computers of a clusters, with different seeds for each. As the eoAlgo are most of
|
||||
* the time stochastics, the results won't be the same. It is fully equivalent to launch the same program but with
|
||||
* different seeds.
|
||||
*
|
||||
* It follows the structure of a MPI job, as described in eoMpi.h. The basic algorithm is trivial:
|
||||
* - Loop while we have a run to perform.
|
||||
* - Worker performs runs and send their best solution (individual with best fitness) to the master.
|
||||
* - Master retrieves the best solution and adds it to a eoPop of best solutions (the user can chooses what he does
|
||||
* with this population, for instance: retrieve the best element, etc.)
|
||||
*
|
||||
* The principal concerns about this algorithm are:
|
||||
* - How do we reinitialize the algorithm? An eoAlgo can have several forms, and initializations have to be performed
|
||||
* before each "start". We can hence decide whether we reinits the population or keep the same population obtained
|
||||
* after the previous start, we have to reinitialize continuator, etc. This is customizable in the store.
|
||||
*
|
||||
* - Which seeds should be chosen? If we want the run to be re-runnable with the same results, we need to be sure that
|
||||
* the seeds are the same. But user can not care about this, and just want random seeds. This is customizable in the
|
||||
* store.
|
||||
*
|
||||
* These concerns are handled by functors, inheriting from MultiStartStore<EOT>::ResetAlgo (for the first concern), and
|
||||
* MultiStartStore<EOT>::GetSeeds (for the second one). There are default implementations, but there is no problem about
|
||||
* specializing them or coding your own, by directly inheriting from them.
|
||||
*
|
||||
* @ingroup MPI
|
||||
*/
|
||||
|
||||
namespace eo
|
||||
{
|
||||
namespace mpi
|
||||
{
|
||||
/**
|
||||
* @brief Data used by the Multi Start job.
|
||||
*
|
||||
* This data is shared between the different Job functors. More details are given for each attribute.
|
||||
*/
|
||||
template< class EOT >
struct MultiStartData
{
    // Functor type used to reset the algorithm's state (population,
    // continuator, ...) before each new start.
    typedef eoUF< eoPop<EOT>&, void> ResetAlgo;

    MultiStartData(
        bmpi::communicator& _comm,
        eoAlgo<EOT>& _algo,
        int _masterRank,
        ResetAlgo & _resetAlgo )
        :
        runs( 0 ), bests(), pop(),
        comm( _comm ), algo( _algo ), resetAlgo( _resetAlgo ), masterRank( _masterRank )
    {
        // empty
    }

    // dynamic parameters
    /**
     * @brief Total remaining number of runs.
     *
     * It's decremented as the runs are performed.
     */
    int runs;

    /**
     * @brief eoPop of the best individuals, which are the ones sent by the workers.
     */
    eoPop< EOT > bests;

    /**
     * @brief eoPop on which the worker is working.
     */
    eoPop< EOT > pop;

    // static parameters
    /**
     * @brief Communicator, used to send and retrieve messages.
     */
    bmpi::communicator& comm;

    /**
     * @brief Algorithm which will be performed by the worker.
     */
    eoAlgo<EOT>& algo;

    /**
     * @brief Reset Algo functor, which defines how to reset the algo (above) before re running it.
     */
    ResetAlgo& resetAlgo;

    // MPI rank of the master host.
    int masterRank;
};
|
||||
|
||||
/**
|
||||
* @brief Send task (master side) in the Multi Start job.
|
||||
*
|
||||
* It only consists in decrementing the number of runs, as the worker already have the population and
|
||||
* all the necessary parameters to run the eoAlgo.
|
||||
*/
|
||||
template< class EOT >
|
||||
class SendTaskMultiStart : public SendTaskFunction< MultiStartData< EOT > >
|
||||
{
|
||||
public:
|
||||
using SendTaskFunction< MultiStartData< EOT > >::_data;
|
||||
|
||||
void operator()( int wrkRank )
|
||||
{
|
||||
wrkRank++; // unused
|
||||
--(_data->runs);
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Handle Response (master side) in the Multi Start job.
|
||||
*
|
||||
* It consists in retrieving the best solution sent by the worker and adds it to a population of best
|
||||
* solutions.
|
||||
*/
|
||||
template< class EOT >
class HandleResponseMultiStart : public HandleResponseFunction< MultiStartData< EOT > >
{
    public:
    using HandleResponseFunction< MultiStartData< EOT > >::_data;

    // Receives the best individual found by worker wrkRank and archives
    // it in the population of best solutions.
    void operator()( int wrkRank )
    {
        EOT individual;
        MultiStartData< EOT >& d = *_data;
        // Blocking receive of the individual sent by ProcessTaskMultiStart.
        d.comm.recv( wrkRank, eo::mpi::Channel::Messages, individual );
        d.bests.push_back( individual );
    }
};
|
||||
|
||||
/**
|
||||
* @brief Process Task (worker side) in the Multi Start job.
|
||||
*
|
||||
* Consists in resetting the algorithm and launching it on the population, then
|
||||
* send the best individual (the one with the best fitness) to the master.
|
||||
*/
|
||||
template< class EOT >
class ProcessTaskMultiStart : public ProcessTaskFunction< MultiStartData< EOT > >
{
    public:
    using ProcessTaskFunction< MultiStartData<EOT > >::_data;

    // Worker side: resets the algorithm's state, runs it on the local
    // population, then sends the best individual found to the master.
    void operator()()
    {
        _data->resetAlgo( _data->pop );
        _data->algo( _data->pop );
        // Blocking send of the population's best element to the master.
        _data->comm.send( _data->masterRank, eo::mpi::Channel::Messages, _data->pop.best_element() );
    }
};
|
||||
|
||||
/**
|
||||
* @brief Is Finished (master side) in the Multi Start job.
|
||||
*
|
||||
* The job is finished if and only if all the runs have been performed.
|
||||
*/
|
||||
template< class EOT >
class IsFinishedMultiStart : public IsFinishedFunction< MultiStartData< EOT > >
{
    public:
    using IsFinishedFunction< MultiStartData< EOT > >::_data;

    // The job is done once every scheduled run has been attributed
    // (runs is decremented by SendTaskMultiStart).
    bool operator()()
    {
        return _data->runs <= 0;
    }
};
|
||||
|
||||
/**
|
||||
* @brief Store for the Multi Start job.
|
||||
*
|
||||
* Contains the data used by the workers (algo,...) and functor to
|
||||
* send the seeds.
|
||||
*/
|
||||
template< class EOT >
|
||||
class MultiStartStore : public JobStore< MultiStartData< EOT > >
|
||||
{
|
||||
public:
|
||||
|
||||
/**
|
||||
* @brief Generic functor to reset an algorithm before it's launched by
|
||||
* the worker.
|
||||
*
|
||||
* This reset algorithm should reinits population (if necessary), continuator, etc.
|
||||
*/
|
||||
typedef typename MultiStartData<EOT>::ResetAlgo ResetAlgo;
|
||||
|
||||
/**
|
||||
* @brief Generic functor which returns a vector of seeds for the workers.
|
||||
*
|
||||
* If this vector hasn't enough seeds to send, random ones are generated and
|
||||
* sent to the workers.
|
||||
*/
|
||||
typedef eoUF< int, std::vector<int> > GetSeeds;
|
||||
|
||||
/**
|
||||
* @brief Default ctor for MultiStartStore.
|
||||
*
|
||||
* @param algo The algorithm to launch in parallel
|
||||
* @param masterRank The MPI rank of the master
|
||||
* @param resetAlgo The ResetAlgo functor
|
||||
* @param getSeeds The GetSeeds functor
|
||||
*/
|
||||
MultiStartStore(
        eoAlgo<EOT> & algo,
        int masterRank,
        ResetAlgo & resetAlgo,
        GetSeeds & getSeeds
        )
    : _data( eo::mpi::Node::comm(), algo, masterRank, resetAlgo ),
    _getSeeds( getSeeds ),
    _masterRank( masterRank )
{
    // Install the default job functors of the multi-start job.
    // needDelete(true) flags each functor so it gets deleted by the
    // framework — presumably by the JobStore base; confirm in eoMpi.h.
    this->_iff = new IsFinishedMultiStart< EOT >;
    this->_iff->needDelete(true);
    this->_stf = new SendTaskMultiStart< EOT >;
    this->_stf->needDelete(true);
    this->_hrf = new HandleResponseMultiStart< EOT >;
    this->_hrf->needDelete(true);
    this->_ptf = new ProcessTaskMultiStart< EOT >;
    this->_ptf->needDelete(true);
}
|
||||
|
||||
/**
|
||||
* @brief Send new seeds to the workers before a job.
|
||||
*
|
||||
* Uses the GetSeeds functor given in constructor. If there's not
|
||||
* enough seeds to send, random seeds are sent to the workers.
|
||||
*
|
||||
* @param workers Vector of MPI ranks of the workers
|
||||
* @param runs The number of runs to perform
|
||||
*/
|
||||
void init( const std::vector<int>& workers, int runs )
|
||||
{
|
||||
_data.runs = runs;
|
||||
|
||||
unsigned nbWorkers = workers.size();
|
||||
std::vector< int > seeds = _getSeeds( nbWorkers );
|
||||
if( eo::mpi::Node::comm().rank() == _masterRank )
|
||||
{
|
||||
if( seeds.size() < nbWorkers )
|
||||
{
|
||||
// Random seeds
|
||||
for( unsigned i = seeds.size(); i < nbWorkers; ++i )
|
||||
{
|
||||
seeds.push_back( eo::rng.rand() );
|
||||
}
|
||||
}
|
||||
|
||||
for( unsigned i = 0 ; i < nbWorkers ; ++i )
|
||||
{
|
||||
int wrkRank = workers[i];
|
||||
eo::mpi::Node::comm().send( wrkRank, eo::mpi::Channel::Commands, seeds[ i ] );
|
||||
}
|
||||
} else
|
||||
{
|
||||
int seed;
|
||||
eo::mpi::Node::comm().recv( _masterRank, eo::mpi::Channel::Commands, seed );
|
||||
eo::log << eo::debug << eo::mpi::Node::comm().rank() << "- Seed: " << seed << std::endl;
|
||||
eo::rng.reseed( seed );
|
||||
}
|
||||
}
|
||||
|
||||
MultiStartData<EOT>* data()
|
||||
{
|
||||
return &_data;
|
||||
}
|
||||
|
||||
private:
|
||||
MultiStartData< EOT > _data;
|
||||
GetSeeds & _getSeeds;
|
||||
int _masterRank;
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief MultiStart job, created for convenience.
|
||||
*
|
||||
* This is an OneShotJob, which means workers leave it along with
|
||||
* the master.
|
||||
*/
|
||||
template< class EOT >
|
||||
class MultiStart : public OneShotJob< MultiStartData< EOT > >
|
||||
{
|
||||
public:
|
||||
|
||||
MultiStart( AssignmentAlgorithm & algo,
|
||||
int masterRank,
|
||||
MultiStartStore< EOT > & store,
|
||||
// dynamic parameters
|
||||
int runs ) :
|
||||
OneShotJob< MultiStartData< EOT > >( algo, masterRank, store )
|
||||
{
|
||||
store.init( algo.idles(), runs );
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Returns the best solution, at the end of the job.
|
||||
*
|
||||
* Warning: if you call this function from a worker, or from the master before the
|
||||
* launch of the job, you will only get an empty population!
|
||||
*
|
||||
* @return Population of best individuals retrieved by the master.
|
||||
*/
|
||||
eoPop<EOT>& best_individuals()
|
||||
{
|
||||
return this->store.data()->bests;
|
||||
}
|
||||
};
|
||||
|
||||
/*************************************
|
||||
* DEFAULT GET SEEDS IMPLEMENTATIONS *
|
||||
************************************/
|
||||
|
||||
/**
|
||||
* @brief Uses the internal default seed generator to get seeds,
|
||||
* which means: random seeds are sent.
|
||||
*/
|
||||
template<class EOT>
|
||||
struct DummyGetSeeds : public MultiStartStore<EOT>::GetSeeds
|
||||
{
|
||||
std::vector<int> operator()( int n )
|
||||
{
|
||||
return std::vector<int>();
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Sends seeds to the workers, which are multiple of a number
|
||||
* given by the master. If no number is given, a random one is used.
|
||||
*
|
||||
* This functor ensures that even if the same store is used with
|
||||
* different jobs, the seeds will be different.
|
||||
*/
|
||||
template<class EOT>
|
||||
struct MultiplesOfNumber : public MultiStartStore<EOT>::GetSeeds
|
||||
{
|
||||
MultiplesOfNumber ( int n = 0 )
|
||||
{
|
||||
while( n == 0 )
|
||||
{
|
||||
n = eo::rng.rand();
|
||||
}
|
||||
_seed = n;
|
||||
_i = 0;
|
||||
}
|
||||
|
||||
std::vector<int> operator()( int n )
|
||||
{
|
||||
std::vector<int> ret;
|
||||
for( unsigned int i = 0; i < n; ++i )
|
||||
{
|
||||
ret.push_back( (++_i) * _seed );
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
private:
|
||||
|
||||
unsigned int _seed;
|
||||
unsigned int _i;
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Returns random seeds to the workers. We can controle which seeds are generated
|
||||
* by precising the seed of the master.
|
||||
*/
|
||||
template<class EOT>
|
||||
struct GetRandomSeeds : public MultiStartStore<EOT>::GetSeeds
|
||||
{
|
||||
GetRandomSeeds( int seed )
|
||||
{
|
||||
eo::rng.reseed( seed );
|
||||
}
|
||||
|
||||
std::vector<int> operator()( int n )
|
||||
{
|
||||
std::vector<int> ret;
|
||||
for(int i = 0; i < n; ++i)
|
||||
{
|
||||
ret.push_back( eo::rng.rand() );
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
};
|
||||
|
||||
/**************************************
|
||||
* DEFAULT RESET ALGO IMPLEMENTATIONS *
|
||||
**************************************/
|
||||
|
||||
/**
|
||||
* @brief For a Genetic Algorithm, reinits the population by copying the original one
|
||||
* given in constructor, and reinits the continuator.
|
||||
*
|
||||
* The evaluator should also be given, as the population needs to be evaluated
|
||||
* before each run.
|
||||
*/
|
||||
template<class EOT>
|
||||
struct ReuseOriginalPopEA: public MultiStartStore<EOT>::ResetAlgo
|
||||
{
|
||||
ReuseOriginalPopEA(
|
||||
eoCountContinue<EOT> & continuator,
|
||||
const eoPop<EOT>& originalPop,
|
||||
eoEvalFunc<EOT>& eval) :
|
||||
_continuator( continuator ),
|
||||
_originalPop( originalPop ),
|
||||
_pop_eval( eval )
|
||||
{
|
||||
// empty
|
||||
}
|
||||
|
||||
ReuseOriginalPopEA(
|
||||
eoCountContinue<EOT> & continuator,
|
||||
const eoPop<EOT>& originalPop,
|
||||
eoPopEvalFunc<EOT>& pop_eval
|
||||
) :
|
||||
_continuator( continuator ),
|
||||
_originalPop( originalPop ),
|
||||
_pop_eval( pop_eval )
|
||||
{
|
||||
// empty
|
||||
}
|
||||
|
||||
void operator()( eoPop<EOT>& pop )
|
||||
{
|
||||
pop = _originalPop; // copies the original population
|
||||
_pop_eval( pop, pop );
|
||||
_continuator.reset();
|
||||
}
|
||||
|
||||
private:
|
||||
eoCountContinue<EOT> & _continuator;
|
||||
const eoPop<EOT>& _originalPop;
|
||||
eoPopEvalFunc<EOT>& _pop_eval;
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief For a Genetic Algorithm, reuses the same population without
|
||||
* modifying it after a run.
|
||||
*
|
||||
* This means, if you launch a run after another one, you'll make evolve
|
||||
* the same population.
|
||||
*
|
||||
* The evaluator should also be sent, as the population needs to be evaluated
|
||||
* at the first time.
|
||||
*/
|
||||
template< class EOT >
|
||||
struct ReuseSamePopEA : public MultiStartStore<EOT>::ResetAlgo
|
||||
{
|
||||
ReuseSamePopEA(
|
||||
eoCountContinue<EOT>& continuator,
|
||||
const eoPop<EOT>& originalPop,
|
||||
eoEvalFunc<EOT>& eval
|
||||
) :
|
||||
_continuator( continuator ),
|
||||
_originalPop( originalPop ),
|
||||
_firstTime( true )
|
||||
{
|
||||
for( unsigned i = 0, size = originalPop.size();
|
||||
i < size; ++i )
|
||||
{
|
||||
eval(_originalPop[i]);
|
||||
}
|
||||
}
|
||||
|
||||
ReuseSamePopEA(
|
||||
eoCountContinue<EOT>& continuator,
|
||||
const eoPop<EOT>& originalPop,
|
||||
eoPopEvalFunc<EOT>& pop_eval
|
||||
) :
|
||||
_continuator( continuator ),
|
||||
_originalPop( originalPop ),
|
||||
_firstTime( true )
|
||||
{
|
||||
pop_eval( _originalPop, _originalPop );
|
||||
}
|
||||
|
||||
void operator()( eoPop<EOT>& pop )
|
||||
{
|
||||
if( _firstTime )
|
||||
{
|
||||
pop = _originalPop;
|
||||
_firstTime = false;
|
||||
}
|
||||
_continuator.reset();
|
||||
}
|
||||
|
||||
protected:
|
||||
|
||||
eoCountContinue<EOT>& _continuator;
|
||||
eoPop<EOT> _originalPop;
|
||||
bool _firstTime;
|
||||
};
|
||||
} // namespace mpi
|
||||
} // namespace eo
|
||||
|
||||
/**
|
||||
* @}
|
||||
*/
|
||||
|
||||
# endif // __EO_MULTISTART_H__
|
||||
387
eompi/src/eoParallelApply.h
Normal file
387
eompi/src/eoParallelApply.h
Normal file
|
|
@ -0,0 +1,387 @@
|
|||
/*
|
||||
(c) Thales group, 2012
|
||||
|
||||
This library is free software; you can redistribute it and/or
|
||||
modify it under the terms of the GNU Lesser General Public
|
||||
License as published by the Free Software Foundation;
|
||||
version 2 of the License.
|
||||
|
||||
This library is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
Lesser General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU Lesser General Public
|
||||
License along with this library; if not, write to the Free Software
|
||||
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
||||
Contact: http://eodev.sourceforge.net
|
||||
|
||||
Authors:
|
||||
Benjamin Bouvier <benjamin.bouvier@gmail.com>
|
||||
*/
|
||||
# ifndef __EO_PARALLEL_APPLY_H__
|
||||
# define __EO_PARALLEL_APPLY_H__
|
||||
|
||||
# include "eoMpi.h"
|
||||
|
||||
# include <eoFunctor.h> // eoUF
|
||||
# include <vector> // std::vector population
|
||||
|
||||
/**
|
||||
* @file eoParallelApply.h
|
||||
*
|
||||
* @brief Applies a functor with single parameter to elements of a table, in a parallel fashion.
|
||||
*
|
||||
* This file contains all the required classes to do a parallel apply of a table, in a parallel fashion. This can be
|
||||
* very useful when applying the function can be made without any dependances within the data. In EO, it occurs in
|
||||
* particular during the evaluation: the number of individuals to evaluate can be really high, and the evaluation of one
|
||||
* individual is independant from the evaluation of other individuals.
|
||||
*
|
||||
* Elements in the table are directly replaced, as the table is given by reference. No new table is made during the
|
||||
* process.
|
||||
*
|
||||
* User can tune this job, indicating how many elements of the table should be sent and evaluated by a worker, at a
|
||||
* time: this is called the "packet size", as individuals are groupped into a packet of individuals which are sent to
|
||||
* the worker before evaluation. The problem of choosing the optimal packet size is beyond the purposes of this documentation
|
||||
* and deserves a theoritical study.
|
||||
*
|
||||
* This job is the parallel equivalent to the function apply<EOT>, defined in apply.h. It just applies the function to
|
||||
* every element of a table. In Python or Javascript, it's the equivalent of the function Map.
|
||||
*/
|
||||
|
||||
namespace eo
|
||||
{
|
||||
namespace mpi
|
||||
{
|
||||
/**
 * @brief Records which slice of the table has been assigned to a worker.
 *
 * A slice is identified by the index of its first element and its length.
 */
struct ParallelApplyAssignment
{
    int index; // index of the first element of the slice
    int size;  // number of elements in the slice
};
||||
|
||||
/**
|
||||
* @brief Data useful for a parallel apply (map).
|
||||
*
|
||||
* A parallel apply needs at least the functor to apply to every element of the table, and the table itself,
|
||||
* whereas it can be set later with the function init(). Master rank is also needed, to send it informations and
|
||||
* receive informations from it, inside the functors (the job knows these values, but the functors don't). The
|
||||
* size of a packet can be tuned here.
|
||||
*
|
||||
* Internal attributes contain:
|
||||
* - (useful for master) the index of the next element to be evaluated.
|
||||
* - (useful for master) a map containing links between MPI ranks and slices of the table which the worker with
|
||||
* this rank has evaluated. Without this map, when receiving results from a worker, the master couldn't be
|
||||
* able to replace the right elements in the table.
|
||||
*
|
||||
* @ingroup MPI
|
||||
*/
|
||||
template<class EOT>
|
||||
struct ParallelApplyData
|
||||
{
|
||||
/**
|
||||
* @brief Ctor for Parallel apply (map) data.
|
||||
*
|
||||
* @param _proc The functor to apply on each element in the table
|
||||
* @param _masterRank The MPI rank of the master
|
||||
* @param _packetSize The number of elements on which the function will be applied by the worker, at a time.
|
||||
* @param table The table to apply. If this value is NULL, user will have to call init() before launching the
|
||||
* job.
|
||||
*/
|
||||
ParallelApplyData(
|
||||
eoUF<EOT&, void> & _proc,
|
||||
int _masterRank,
|
||||
int _packetSize,
|
||||
std::vector<EOT> * table = 0
|
||||
) :
|
||||
_table( table ), func( _proc ), index( 0 ), packetSize( _packetSize ), masterRank( _masterRank ), comm( Node::comm() )
|
||||
{
|
||||
if ( _packetSize <= 0 )
|
||||
{
|
||||
throw std::runtime_error("Packet size should not be negative.");
|
||||
}
|
||||
|
||||
if( table )
|
||||
{
|
||||
size = table->size();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Reinitializes the data for a new table to evaluate.
|
||||
*/
|
||||
void init( std::vector<EOT>& table )
|
||||
{
|
||||
index = 0;
|
||||
size = table.size();
|
||||
_table = &table;
|
||||
assignedTasks.clear();
|
||||
}
|
||||
|
||||
std::vector<EOT>& table()
|
||||
{
|
||||
return *_table;
|
||||
}
|
||||
|
||||
// All elements are public since functors will often use them.
|
||||
std::vector<EOT> * _table;
|
||||
eoUF<EOT&, void> & func;
|
||||
int index;
|
||||
int size;
|
||||
std::map< int /* worker rank */, ParallelApplyAssignment /* last assignment */> assignedTasks;
|
||||
int packetSize;
|
||||
std::vector<EOT> tempArray;
|
||||
|
||||
int masterRank;
|
||||
bmpi::communicator& comm;
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Send task functor implementation for the parallel apply (map) job.
|
||||
*
|
||||
* Master side: Sends a slice of the table to evaluate to the worker.
|
||||
*
|
||||
* Implementation details:
|
||||
* Finds the next slice of data to send to the worker, sends first the size and then the data, and memorizes
|
||||
* that this slice has been distributed to the worker, then updates the next position of element to evaluate.
|
||||
*/
|
||||
template< class EOT >
|
||||
class SendTaskParallelApply : public SendTaskFunction< ParallelApplyData<EOT> >
|
||||
{
|
||||
public:
|
||||
using SendTaskFunction< ParallelApplyData<EOT> >::_data;
|
||||
|
||||
SendTaskParallelApply( SendTaskParallelApply<EOT> * w = 0 ) : SendTaskFunction< ParallelApplyData<EOT> >( w )
|
||||
{
|
||||
// empty
|
||||
}
|
||||
|
||||
void operator()(int wrkRank)
|
||||
{
|
||||
int futureIndex;
|
||||
|
||||
if( _data->index + _data->packetSize < _data->size )
|
||||
{
|
||||
futureIndex = _data->index + _data->packetSize;
|
||||
} else {
|
||||
futureIndex = _data->size;
|
||||
}
|
||||
|
||||
int sentSize = futureIndex - _data->index ;
|
||||
|
||||
_data->comm.send( wrkRank, eo::mpi::Channel::Messages, sentSize );
|
||||
|
||||
eo::log << eo::debug << "Evaluating individual " << _data->index << std::endl;
|
||||
|
||||
_data->assignedTasks[ wrkRank ].index = _data->index;
|
||||
_data->assignedTasks[ wrkRank ].size = sentSize;
|
||||
|
||||
_data->comm.send( wrkRank, eo::mpi::Channel::Messages, & ( (_data->table())[ _data->index ] ) , sentSize );
|
||||
_data->index = futureIndex;
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Handle response functor implementation for the parallel apply (map) job.
|
||||
*
|
||||
* Master side: Replaces the slice of data attributed to the worker in the table.
|
||||
*/
|
||||
template< class EOT >
|
||||
class HandleResponseParallelApply : public HandleResponseFunction< ParallelApplyData<EOT> >
|
||||
{
|
||||
public:
|
||||
using HandleResponseFunction< ParallelApplyData<EOT> >::_data;
|
||||
|
||||
HandleResponseParallelApply( HandleResponseParallelApply<EOT> * w = 0 ) : HandleResponseFunction< ParallelApplyData<EOT> >( w )
|
||||
{
|
||||
// empty
|
||||
}
|
||||
|
||||
void operator()(int wrkRank)
|
||||
{
|
||||
_data->comm.recv( wrkRank, eo::mpi::Channel::Messages, & (_data->table()[ _data->assignedTasks[wrkRank].index ] ), _data->assignedTasks[wrkRank].size );
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Process task functor implementation for the parallel apply (map) job.
|
||||
*
|
||||
* Worker side: apply the function to the given slice of data.
|
||||
*
|
||||
* Implementation details: retrieves the number of elements to evaluate, retrieves them, applies the function
|
||||
* and then returns the results.
|
||||
*/
|
||||
template< class EOT >
|
||||
class ProcessTaskParallelApply : public ProcessTaskFunction< ParallelApplyData<EOT> >
|
||||
{
|
||||
public:
|
||||
using ProcessTaskFunction< ParallelApplyData<EOT> >::_data;
|
||||
|
||||
ProcessTaskParallelApply( ProcessTaskParallelApply<EOT> * w = 0 ) : ProcessTaskFunction< ParallelApplyData<EOT> >( w )
|
||||
{
|
||||
// empty
|
||||
}
|
||||
|
||||
void operator()()
|
||||
{
|
||||
int recvSize;
|
||||
|
||||
_data->comm.recv( _data->masterRank, eo::mpi::Channel::Messages, recvSize );
|
||||
_data->tempArray.resize( recvSize );
|
||||
_data->comm.recv( _data->masterRank, eo::mpi::Channel::Messages, & _data->tempArray[0] , recvSize );
|
||||
timerStat.start("worker_processes");
|
||||
for( int i = 0; i < recvSize ; ++i )
|
||||
{
|
||||
_data->func( _data->tempArray[ i ] );
|
||||
}
|
||||
timerStat.stop("worker_processes");
|
||||
_data->comm.send( _data->masterRank, eo::mpi::Channel::Messages, & _data->tempArray[0], recvSize );
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Is finished functor implementation for the parallel apply (map) job.
|
||||
*
|
||||
* Master side: returns true if and only if the whole table has been evaluated. The job is also terminated only
|
||||
* when the whole table has been evaluated.
|
||||
*/
|
||||
template< class EOT >
|
||||
class IsFinishedParallelApply : public IsFinishedFunction< ParallelApplyData<EOT> >
|
||||
{
|
||||
public:
|
||||
using IsFinishedFunction< ParallelApplyData<EOT> >::_data;
|
||||
|
||||
IsFinishedParallelApply( IsFinishedParallelApply<EOT> * w = 0 ) : IsFinishedFunction< ParallelApplyData<EOT> >( w )
|
||||
{
|
||||
// empty
|
||||
}
|
||||
|
||||
bool operator()()
|
||||
{
|
||||
return _data->index == _data->size;
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Store containing all the datas and the functors for the parallel apply (map) job.
|
||||
*
|
||||
* User can tune functors when constructing the object. For each functor which is not given, a default one is
|
||||
* generated.
|
||||
*
|
||||
* @ingroup MPI
|
||||
*/
|
||||
template< class EOT >
|
||||
struct ParallelApplyStore : public JobStore< ParallelApplyData<EOT> >
|
||||
{
|
||||
using JobStore< ParallelApplyData<EOT> >::_stf;
|
||||
using JobStore< ParallelApplyData<EOT> >::_hrf;
|
||||
using JobStore< ParallelApplyData<EOT> >::_ptf;
|
||||
using JobStore< ParallelApplyData<EOT> >::_iff;
|
||||
|
||||
/**
|
||||
* @brief Main constructor for the parallel apply (map) job.
|
||||
*
|
||||
* @param _proc The procedure to apply to each element of the table.
|
||||
* @param _masterRank The rank of the master process.
|
||||
* @param _packetSize The number of elements of the table to be evaluated at a time, by the worker.
|
||||
* @param stpa Pointer to Send Task parallel apply functor descendant. If null, a default one is used.
|
||||
* @param hrpa Pointer to Handle Response parallel apply functor descendant. If null, a default one is used.
|
||||
* @param ptpa Pointer to Process Task parallel apply functor descendant. If null, a default one is used.
|
||||
* @param ifpa Pointer to Is Finished parallel apply functor descendant. If null, a default one is used.
|
||||
*/
|
||||
ParallelApplyStore(
|
||||
eoUF<EOT&, void> & _proc,
|
||||
int _masterRank,
|
||||
int _packetSize = 1,
|
||||
// JobStore functors
|
||||
SendTaskParallelApply<EOT> * stpa = 0,
|
||||
HandleResponseParallelApply<EOT>* hrpa = 0,
|
||||
ProcessTaskParallelApply<EOT>* ptpa = 0,
|
||||
IsFinishedParallelApply<EOT>* ifpa = 0
|
||||
) :
|
||||
_data( _proc, _masterRank, _packetSize )
|
||||
{
|
||||
if( stpa == 0 ) {
|
||||
stpa = new SendTaskParallelApply<EOT>;
|
||||
stpa->needDelete( true );
|
||||
}
|
||||
|
||||
if( hrpa == 0 ) {
|
||||
hrpa = new HandleResponseParallelApply<EOT>;
|
||||
hrpa->needDelete( true );
|
||||
}
|
||||
|
||||
if( ptpa == 0 ) {
|
||||
ptpa = new ProcessTaskParallelApply<EOT>;
|
||||
ptpa->needDelete( true );
|
||||
}
|
||||
|
||||
if( ifpa == 0 ) {
|
||||
ifpa = new IsFinishedParallelApply<EOT>;
|
||||
ifpa->needDelete( true );
|
||||
}
|
||||
|
||||
_stf = stpa;
|
||||
_hrf = hrpa;
|
||||
_ptf = ptpa;
|
||||
_iff = ifpa;
|
||||
}
|
||||
|
||||
ParallelApplyData<EOT>* data() { return &_data; }
|
||||
|
||||
/**
|
||||
* @brief Reinits the store with a new table to evaluate.
|
||||
*
|
||||
* @param _pop The table of elements to be evaluated.
|
||||
*/
|
||||
void data( std::vector<EOT>& _pop )
|
||||
{
|
||||
_data.init( _pop );
|
||||
}
|
||||
|
||||
virtual ~ParallelApplyStore() // for inheritance purposes only
|
||||
{
|
||||
}
|
||||
|
||||
protected:
|
||||
ParallelApplyData<EOT> _data;
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Parallel apply job. Present for convenience only.
|
||||
*
|
||||
* A typedef wouldn't have been working, as typedef on templates don't work in C++. Traits would be a
|
||||
* disgraceful overload for the user.
|
||||
*
|
||||
* @ingroup MPI
|
||||
* @see eoParallelApply.h
|
||||
*/
|
||||
template< typename EOT >
|
||||
class ParallelApply : public MultiJob< ParallelApplyData<EOT> >
|
||||
{
|
||||
public:
|
||||
|
||||
ParallelApply(
|
||||
AssignmentAlgorithm & algo,
|
||||
int _masterRank,
|
||||
ParallelApplyStore<EOT> & store
|
||||
) :
|
||||
MultiJob< ParallelApplyData<EOT> >( algo, _masterRank, store )
|
||||
{
|
||||
// empty
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* @example t-mpi-parallelApply.cpp
|
||||
* @example t-mpi-multipleRoles.cpp
|
||||
*/
|
||||
}
|
||||
}
|
||||
# endif // __EO_PARALLEL_APPLY_H__
|
||||
|
||||
|
||||
140
eompi/src/eoTerminateJob.h
Normal file
140
eompi/src/eoTerminateJob.h
Normal file
|
|
@ -0,0 +1,140 @@
|
|||
/*
|
||||
(c) Thales group, 2012
|
||||
|
||||
This library is free software; you can redistribute it and/or
|
||||
modify it under the terms of the GNU Lesser General Public
|
||||
License as published by the Free Software Foundation;
|
||||
version 2 of the License.
|
||||
|
||||
This library is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
Lesser General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU Lesser General Public
|
||||
License along with this library; if not, write to the Free Software
|
||||
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
||||
Contact: http://eodev.sourceforge.net
|
||||
|
||||
Authors:
|
||||
Benjamin Bouvier <benjamin.bouvier@gmail.com>
|
||||
*/
|
||||
# ifndef __EO_TERMINATE_H__
|
||||
# define __EO_TERMINATE_H__
|
||||
|
||||
# include "eoMpi.h"
|
||||
|
||||
namespace eo
|
||||
{
|
||||
namespace mpi
|
||||
{
|
||||
/**
|
||||
* @ingroup MPI
|
||||
* @{
|
||||
*/
|
||||
|
||||
/**
|
||||
* @brief Send task functor which does nothing.
|
||||
*/
|
||||
struct DummySendTaskFunction : public SendTaskFunction<void>
|
||||
{
|
||||
void operator()( int _ )
|
||||
{
|
||||
++_;
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Handle response functor which does nothing.
|
||||
*/
|
||||
struct DummyHandleResponseFunction : public HandleResponseFunction<void>
|
||||
{
|
||||
void operator()( int _ )
|
||||
{
|
||||
++_;
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Process task functor which does nothing.
|
||||
*/
|
||||
struct DummyProcessTaskFunction : public ProcessTaskFunction<void>
|
||||
{
|
||||
void operator()()
|
||||
{
|
||||
// nothing!
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Is finished functor which returns true everytime.
|
||||
*/
|
||||
struct DummyIsFinishedFunction : public IsFinishedFunction<void>
|
||||
{
|
||||
bool operator()()
|
||||
{
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Job store containing all dummy functors and containing no data.
|
||||
*/
|
||||
struct DummyJobStore : public JobStore<void>
|
||||
{
|
||||
using JobStore<void>::_stf;
|
||||
using JobStore<void>::_hrf;
|
||||
using JobStore<void>::_ptf;
|
||||
using JobStore<void>::_iff;
|
||||
|
||||
DummyJobStore()
|
||||
{
|
||||
_stf = new DummySendTaskFunction;
|
||||
_stf->needDelete( true );
|
||||
_hrf = new DummyHandleResponseFunction;
|
||||
_hrf->needDelete( true );
|
||||
_ptf = new DummyProcessTaskFunction;
|
||||
_ptf->needDelete( true );
|
||||
_iff = new DummyIsFinishedFunction;
|
||||
_iff->needDelete( true );
|
||||
}
|
||||
|
||||
void* data() { return 0; }
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Job to run after a Multi Job, so as to indicate that every workers should terminate.
|
||||
*/
|
||||
struct EmptyJob : public OneShotJob<void>
|
||||
{
|
||||
/**
|
||||
* @brief Main EmptyJob ctor
|
||||
*
|
||||
* @param algo Assignment (scheduling) algorithm used.
|
||||
* @param masterRank The rank of the master process.
|
||||
*/
|
||||
EmptyJob( AssignmentAlgorithm& algo, int masterRank ) :
|
||||
OneShotJob<void>( algo, masterRank, *(new DummyJobStore) )
|
||||
// the job store is deleted on destructor
|
||||
{
|
||||
// empty
|
||||
}
|
||||
|
||||
~EmptyJob()
|
||||
{
|
||||
std::vector< int > idles = assignmentAlgo.idles();
|
||||
for(unsigned i = 0, size = idles.size(); i < size; ++i)
|
||||
{
|
||||
comm.send( idles[i], Channel::Commands, Message::Kill );
|
||||
}
|
||||
delete & this->store;
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* @}
|
||||
*/
|
||||
}
|
||||
}
|
||||
|
||||
# endif // __EO_TERMINATE_H__
|
||||
166
eompi/src/implMpi.cpp
Normal file
166
eompi/src/implMpi.cpp
Normal file
|
|
@ -0,0 +1,166 @@
|
|||
/*
|
||||
(c) Thales group, 2012
|
||||
|
||||
This library is free software; you can redistribute it and/or
|
||||
modify it under the terms of the GNU Lesser General Public
|
||||
License as published by the Free Software Foundation;
|
||||
version 2 of the License.
|
||||
|
||||
This library is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
Lesser General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU Lesser General Public
|
||||
License along with this library; if not, write to the Free Software
|
||||
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
||||
Contact: http://eodev.sourceforge.net
|
||||
|
||||
Authors:
|
||||
Benjamin Bouvier <benjamin.bouvier@gmail.com>
|
||||
*/
|
||||
# include "implMpi.h"
|
||||
|
||||
namespace mpi
|
||||
{
|
||||
const int any_source = MPI_ANY_SOURCE;
|
||||
const int any_tag = MPI_ANY_TAG;
|
||||
|
||||
// Initializes the MPI runtime with the program's command-line arguments.
environment::environment(int argc, char**argv)
{
    MPI_Init( &argc, &argv );
}
|
||||
|
||||
// Shuts the MPI runtime down when the environment object dies (RAII).
environment::~environment()
{
    MPI_Finalize();
}
|
||||
|
||||
// Copies the relevant fields out of a raw MPI_Status.
status::status( const MPI_Status & s )
{
    _source = s.MPI_SOURCE;
    _tag    = s.MPI_TAG;
    _error  = s.MPI_ERROR;
}
|
||||
|
||||
// Rank and size are fetched lazily; -1 marks "not queried yet".
// The string receive buffer is allocated on demand.
communicator::communicator( )
{
    _rank = -1;
    _size = -1;

    _buf = 0;
    _bufsize = -1;
}
|
||||
|
||||
// Releases the internal string receive buffer.
communicator::~communicator()
{
    // FIX: _buf is allocated with new char[], so it must be released with
    // delete[] (the original used plain `delete`, which is undefined
    // behaviour for array allocations). delete[] on null is a no-op, so
    // the null check is unnecessary.
    delete [] _buf;
    _buf = 0;
}
|
||||
|
||||
int communicator::rank()
|
||||
{
|
||||
if ( _rank == -1 )
|
||||
{
|
||||
MPI_Comm_rank( MPI_COMM_WORLD, &_rank );
|
||||
}
|
||||
return _rank;
|
||||
}
|
||||
|
||||
int communicator::size()
|
||||
{
|
||||
if ( _size == -1 )
|
||||
{
|
||||
MPI_Comm_size( MPI_COMM_WORLD, &_size );
|
||||
}
|
||||
return _size;
|
||||
}
|
||||
|
||||
/*
|
||||
* SEND / RECV INT
|
||||
*/
|
||||
void communicator::send( int dest, int tag, int n )
|
||||
{
|
||||
MPI_Send( &n, 1, MPI_INT, dest, tag, MPI_COMM_WORLD );
|
||||
}
|
||||
|
||||
void communicator::recv( int src, int tag, int& n )
|
||||
{
|
||||
MPI_Status stat;
|
||||
MPI_Recv( &n, 1, MPI_INT, src, tag, MPI_COMM_WORLD , &stat );
|
||||
}
|
||||
|
||||
/*
|
||||
* SEND / RECV STRING
|
||||
*/
|
||||
void communicator::send( int dest, int tag, const std::string& str )
|
||||
{
|
||||
int size = str.size() + 1;
|
||||
send( dest, tag, size );
|
||||
MPI_Send( (char*)str.c_str(), size, MPI_CHAR, dest, tag, MPI_COMM_WORLD);
|
||||
}
|
||||
|
||||
void communicator::recv( int src, int tag, std::string& str )
|
||||
{
|
||||
int size = -1;
|
||||
MPI_Status stat;
|
||||
recv( src, tag, size );
|
||||
|
||||
if( _buf == 0 )
|
||||
{
|
||||
_buf = new char[ size ];
|
||||
_bufsize = size;
|
||||
} else if( _bufsize < size )
|
||||
{
|
||||
delete [] _buf;
|
||||
_buf = new char[ size ];
|
||||
_bufsize = size;
|
||||
}
|
||||
MPI_Recv( _buf, size, MPI_CHAR, src, tag, MPI_COMM_WORLD, &stat );
|
||||
str.assign( _buf );
|
||||
}
|
||||
|
||||
/*
|
||||
* SEND / RECV Objects
|
||||
*/
|
||||
// Sends a serializable object: serializes it to text, then reuses the
// string send.
void communicator::send( int dest, int tag, const eoserial::Persistent & persistent )
{
    std::stringstream serialized;
    eoserial::Object* obj = persistent.pack();
    obj->print( serialized );
    delete obj;

    send( dest, tag, serialized.str() );
}
|
||||
|
||||
void communicator::recv( int src, int tag, eoserial::Persistent & persistent )
|
||||
{
|
||||
std::string asText;
|
||||
recv( src, tag, asText );
|
||||
eoserial::Object* obj = eoserial::Parser::parse( asText );
|
||||
persistent.unpack( obj );
|
||||
delete obj;
|
||||
}
|
||||
|
||||
/*
|
||||
* Other methods
|
||||
*/
|
||||
// Blocks until a message from `src` with tag `tag` is available, and
// returns its status (source, tag, error) without consuming it.
status communicator::probe( int src, int tag )
{
    MPI_Status raw;
    MPI_Probe( src, tag, MPI_COMM_WORLD, &raw );
    return status( raw );
}
|
||||
|
||||
void communicator::barrier()
|
||||
{
|
||||
MPI_Barrier( MPI_COMM_WORLD );
|
||||
}
|
||||
|
||||
// Broadcasts `value` from `root` to every process.
// NOTE(review): `value` is passed by value, so non-root callers cannot
// observe the broadcast result — only the root's value is meaningful
// here. Fixing this would require changing the signature to `int&`,
// which would break existing callers; flagged for a future API revision.
void broadcast( communicator & comm, int value, int root )
{
    MPI_Bcast( &value, 1, MPI_INT, root, MPI_COMM_WORLD );
}
|
||||
}
|
||||
322
eompi/src/implMpi.h
Normal file
322
eompi/src/implMpi.h
Normal file
|
|
@ -0,0 +1,322 @@
|
|||
/*
|
||||
(c) Thales group, 2012
|
||||
|
||||
This library is free software; you can redistribute it and/or
|
||||
modify it under the terms of the GNU Lesser General Public
|
||||
License as published by the Free Software Foundation;
|
||||
version 2 of the License.
|
||||
|
||||
This library is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
Lesser General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU Lesser General Public
|
||||
License along with this library; if not, write to the Free Software
|
||||
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
||||
Contact: http://eodev.sourceforge.net
|
||||
|
||||
Authors:
|
||||
Benjamin Bouvier <benjamin.bouvier@gmail.com>
|
||||
*/
|
||||
# ifndef __EO_IMPL_MPI_HPP__
|
||||
# define __EO_IMPL_MPI_HPP__
|
||||
|
||||
# include <mpi.h>
|
||||
# include <serial/eoSerial.h>
|
||||
|
||||
/**
|
||||
* This namespace contains reimplementations of some parts of the Boost::MPI API in C++, so as to be used in
|
||||
* EO without any dependance to Boost. Historically, EO's parallelization module used the
|
||||
* boost library to add a layer over MPI. After having noticed that just some functions
|
||||
* were really used, we decided to reimplement our own C++-like implementation of MPI.
|
||||
*
|
||||
* Because the Boost::MPI API is really clean, we reused it in this module. However, all
|
||||
* the functions of Boost::MPI were not used, hence a subset of the API is reused. For
|
||||
* instance, users can just send integer, std::string or eoserial::Persistent objects;
|
||||
* furthermore, only eoserial::Persistent objects can sent in a table.
|
||||
*
|
||||
* The documentation of the functions is exactly the same as the official Boost::MPI
|
||||
* documentation. You can find it on www.boost.org/doc/libs/1_49_0/doc/html/mpi/
|
||||
* The entities are here shortly described, if you need further details, don't hesitate
|
||||
* to visit the boost URL.
|
||||
*/
|
||||
|
||||
namespace mpi
|
||||
{
|
||||
/**
|
||||
* @ingroup Parallel
|
||||
* @{
|
||||
*/
|
||||
|
||||
/**
|
||||
* @brief Constant indicating that a message can come from any process.
|
||||
*/
|
||||
extern const int any_source;
|
||||
|
||||
/**
|
||||
* @brief Constant indicating that a message can come from any tag (channel).
|
||||
*/
|
||||
extern const int any_tag;
|
||||
|
||||
    /**
     * @brief Wrapper class to have a MPI environment (RAII).
     *
     * Instead of calling MPI_Init and MPI_Finalize, it is only necessary to instantiate
     * this class once, in the global context: construction initializes MPI and
     * destruction finalizes it.
     *
     * NOTE(review): only one instance should exist per process — MPI_Init must not
     * be called twice; confirm the implementation guards against double init.
     */
    class environment
    {
        public:

            /**
             * @brief Inits MPI context.
             *
             * @param argc Number of params in command line (same as one in main)
             * @param argv Strings containing params (same as one in main)
             */
            environment(int argc, char**argv);

            /**
             * @brief Closes MPI context.
             */
            ~environment();
    };
|
||||
|
||||
/**
|
||||
* @brief Wrapper class for MPI_Status
|
||||
*
|
||||
* Consists only in a C++ wrapper class, giving getters on status attributes.
|
||||
*/
|
||||
class status
|
||||
{
|
||||
public:
|
||||
|
||||
/**
|
||||
* @brief Converts a MPI_Status into a status.
|
||||
*/
|
||||
status( const MPI_Status & s );
|
||||
|
||||
/**
|
||||
* @brief Returns the tag of the associated communication.
|
||||
*/
|
||||
int tag() { return _tag; }
|
||||
|
||||
/**
|
||||
* @brief Indicates which error number we obtained in the associated communication.
|
||||
*/
|
||||
int error() { return _error; }
|
||||
|
||||
/**
|
||||
* @brief Returns the MPI rank of the source of the associated communication.
|
||||
*/
|
||||
int source() { return _source; }
|
||||
|
||||
private:
|
||||
int _source;
|
||||
int _tag;
|
||||
int _error;
|
||||
};
|
||||
|
||||
    /**
     * @brief Main object, used to send / receive messages, get information about the rank and the size of the world,
     * etc.
     */
    class communicator
    {
        public:

            /**
             * Creates the communicator, using the whole world as a MPI_Comm.
             *
             * @todo Allow the user to precise which MPI_Comm to use
             */
            communicator( );

            ~communicator();

            /**
             * @brief Returns the MPI rank of the current process.
             */
            int rank();

            /**
             * @brief Returns the size of the MPI cluster.
             */
            int size();

            /*
             * SEND / RECV INT
             */

            /**
             * @brief Sends an integer to dest on channel "tag".
             *
             * @param dest MPI rank of the receiver
             * @param tag MPI tag of message
             * @param n The integer to send
             */
            void send( int dest, int tag, int n );

            /**
             * @brief Receives an integer from src on channel "tag".
             *
             * @param src MPI rank of the sender
             * @param tag MPI tag of message
             * @param n Where to save the received integer
             */
            void recv( int src, int tag, int& n );

            /*
             * SEND / RECV STRING
             */

            /**
             * @brief Sends a string to dest on channel "tag".
             *
             * @param dest MPI rank of the receiver
             * @param tag MPI tag of message
             * @param str The std::string to send
             */
            void send( int dest, int tag, const std::string& str );

            /**
             * @brief Receives a string from src on channel "tag".
             *
             * @param src MPI rank of the sender
             * @param tag MPI tag of message
             * @param str Where to save the received string
             */
            void recv( int src, int tag, std::string& str );

            /*
             * SEND / RECV Objects
             */

            /**
             * @brief Sends an eoserial::Persistent to dest on channel "tag".
             *
             * @param dest MPI rank of the receiver
             * @param tag MPI tag of message
             * @param persistent The object to send (it must absolutely implement eoserial::Persistent)
             */
            void send( int dest, int tag, const eoserial::Persistent & persistent );

            /**
             * @brief Sends an array of eoserial::Persistent to dest on channel "tag".
             *
             * Each element is serialized via T::pack(), the whole table is wrapped in a
             * single eoserial object, and that object is sent as one string message.
             *
             * @param dest MPI rank of the receiver
             * @param tag MPI tag of message
             * @param table The array of eoserial::Persistent objects
             * @param size The number of elements to send (no check is done, the user has to be sure that the size won't
             * overflow!)
             */
            template< class T >
            void send( int dest, int tag, T* table, int size )
            {
                // Puts all the values into an array
                // NOTE(review): raw new — ownership of `array` is presumably transferred
                // to `obj` via add() and released by obj's destructor; confirm in eoserial.
                eoserial::Array* array = new eoserial::Array;

                for( int i = 0; i < size; ++i )
                {
                    array->push_back( table[i].pack() );
                }

                // Encapsulates the array into an object
                eoserial::Object* obj = new eoserial::Object;
                obj->add( "array", array );
                std::stringstream ss;
                obj->print( ss );
                delete obj;

                // Sends the object as a string
                send( dest, tag, ss.str() );
            }

            /**
             * @brief Receives an eoserial::Persistent object from src on channel "tag".
             *
             * @param src MPI rank of the sender
             * @param tag MPI tag of message
             * @param persistent Where to unpack the serialized object
             */
            void recv( int src, int tag, eoserial::Persistent & persistent );

            /**
             * @brief Receives an array of eoserial::Persistent from src on channel "tag".
             *
             * Mirror of the templated send(): receives one string message, parses it,
             * and unpacks each element of the embedded "array" into table.
             *
             * @param src MPI rank of the sender
             * @param tag MPI tag of message
             * @param table The table in which we're saving the received objects. It must have been allocated by the user,
             * as no allocation is performed here.
             * @param size The number of elements to receive (no check is done, the user has to be sure that the size won't
             * overflow!)
             */
            template< class T >
            void recv( int src, int tag, T* table, int size )
            {
                // Receives the string which contains the object
                std::string asText;
                recv( src, tag, asText );

                // Parses the object and retrieves the table
                // NOTE(review): parse() result is dereferenced without a null check —
                // presumably the peer always sends well-formed data; confirm parse()'s
                // failure behaviour in eoserial.
                eoserial::Object* obj = eoserial::Parser::parse( asText );
                eoserial::Array* array = static_cast<eoserial::Array*>( (*obj)["array"] );

                // Retrieves all the values from the array
                for( int i = 0; i < size; ++i )
                {
                    eoserial::unpackObject( *array, i, table[i] );
                }
                delete obj;
            }

            /*
             * Other methods
             */

            /**
             * @brief Wrapper for MPI_Probe
             *
             * Waits for a message to come from process having rank src, on the channel
             * tag.
             *
             * @param src MPI rank of the sender (any_source if it can be any sender)
             * @param tag MPI tag of the expected message (any_tag if it can be any tag)
             */
            status probe( int src = any_source, int tag = any_tag );

            /**
             * @brief Wrapper for MPI_Barrier
             *
             * Blocks until all processes of the communicator have reached this call.
             */
            void barrier();

        private:
            int _rank;
            int _size;

            char* _buf; // temporary buffer for sending and receiving strings. Avoids reallocations
            int _bufsize; // size of the above temporary buffer
    };
|
||||
|
||||
    /**
     * @brief Wrapper for MPI_Bcast
     *
     * Broadcasts an integer value on the communicator comm, from the process having the MPI rank root.
     *
     * @param comm The communicator on which to broadcast
     * @param value The integer value to send
     * @param root The MPI rank of the broadcaster
     *
     * @todo Actually comm isn't used and broadcast is performed on the whole MPI_COMM_WORLD. TODO: Use comm instead
     */
    void broadcast( communicator & comm, int value, int root );
|
||||
|
||||
/**
|
||||
* @}
|
||||
*/
|
||||
} // namespace mpi
|
||||
|
||||
# endif //__EO_IMPL_MPI_HPP__
|
||||
Loading…
Add table
Add a link
Reference in a new issue