Renamed new meta model branch

git-svn-id: svn://scm.gforge.inria.fr/svnroot/paradiseo@609 331e1502-861f-0410-8da2-ba01fb791d7f
This commit is contained in:
legrand 2007-09-20 14:33:54 +00:00
commit a0f7039b27
413 changed files with 31937 additions and 0 deletions

View file

@ -0,0 +1,9 @@
######################################################################################
### 1) Where must cmake go now ?
######################################################################################
SUBDIRS(mpi)
######################################################################################

View file

@ -0,0 +1,60 @@
######################################################################################
### 0) Set the compiler
######################################################################################
SET (CMAKE_CXX_COMPILER mpicxx)
######################################################################################
######################################################################################
### 1) Include the sources
######################################################################################
INCLUDE_DIRECTORIES(${EO_SRC_DIR})
INCLUDE_DIRECTORIES(${XML2_CFLAGS_WITH_WHITESPACE})
######################################################################################
######################################################################################
### 2) Define your target(s): just the core library here
######################################################################################
SET(RMC_MPI_LIB_OUTPUT_PATH ${ParadisEO-PEO_BINARY_DIR}/lib)
SET(LIBRARY_OUTPUT_PATH ${RMC_MPI_LIB_OUTPUT_PATH})
SET (RMC_MPI_SOURCES node.cpp
param.cpp
comm.cpp
coop.cpp
mess.cpp
rmc.cpp
scheduler.cpp
worker.cpp
send.cpp
recv.cpp
xml_parser.cpp
schema.cpp
runner.cpp
service.cpp)
ADD_LIBRARY(rmc_mpi STATIC ${RMC_MPI_SOURCES})
ADD_DEPENDENCIES(rmc_mpi peo)
######################################################################################
######################################################################################
### 3) Optionnal: define your lib version:
######################################################################################
SET(RMC_MPI_VERSION "1.0.beta")
SET_TARGET_PROPERTIES(rmc_mpi PROPERTIES VERSION "${RMC_MPI_VERSION}")
######################################################################################

View file

@ -0,0 +1,67 @@
// "comm.cpp"
// (c) OPAC Team, LIFL, August 2005
/*
Contact: paradiseo-help@lists.gforge.inria.fr
*/
#include <mpi.h>
#include "comm.h"
#include "mess.h"
#include "node.h"
#include "param.h"
#include "../../core/peo_debug.h"
#include "../../core/runner.h"
#include "send.h"
#include "recv.h"
#include "scheduler.h"
static sem_t sem_comm_init;
static Communicator * the_thread;
Communicator :: Communicator (int * __argc, char * * * __argv) {
the_thread = this;
initNode (__argc, __argv);
loadRMCParameters (* __argc, * __argv);
sem_post (& sem_comm_init);
}
void Communicator :: start () {
while (true) {
/* Zzz Zzz Zzz :-))) */
sleep ();
sendMessages ();
if (! atLeastOneActiveRunner ())
break;
receiveMessages ();
}
waitBuffers ();
printDebugMessage ("finalizing");
MPI_Finalize ();
}
void initCommunication () {
sem_init (& sem_comm_init, 0, 0);
}
void waitNodeInitialization () {
sem_wait (& sem_comm_init);
}
void wakeUpCommunicator () {
the_thread -> wakeUp ();
}

View file

@ -0,0 +1,31 @@
// "comm.h"
// (c) OPAC Team, LIFL, August 2005
/*
Contact: paradiseo-help@lists.gforge.inria.fr
*/
#ifndef __comm_mpi_h
#define __comm_mpi_h
#include "../../core/communicable.h"
#include "../../core/reac_thread.h"
class Communicator : public ReactiveThread {
public :
/* Ctor */
Communicator (int * __argc, char * * * __argv);
void start ();
};
extern void initCommunication ();
extern void waitNodeInitialization ();
extern void wakeUpCommunicator ();
#endif

View file

@ -0,0 +1,42 @@
// "coop.cpp"
// (c) OPAC Team, LIFL, August 2005
/*
Contact: paradiseo-help@lists.gforge.inria.fr
*/
#include "../../core/cooperative.h"
#include "send.h"
#include "tags.h"
#include "schema.h"
#include "mess.h"
#include "../../core/peo_debug.h"
Runner * Cooperative :: getOwner () {
return owner;
}
void Cooperative :: setOwner (Runner & __runner) {
owner = & __runner;
}
void Cooperative :: send (Cooperative * __coop) {
:: send (this, getRankOfRunner (__coop -> getOwner () -> getID ()), COOP_TAG);
// stop ();
}
Cooperative * getCooperative (COOP_ID __key) {
return dynamic_cast <Cooperative *> (getCommunicable (__key));
}
void Cooperative :: notifySending () {
//getOwner -> setPassive ();
// resume ();
// printDebugMessage (b);
}

View file

@ -0,0 +1,237 @@
// "mess.cpp"
// (c) OPAC Team, LIFL, August 2005
/*
Contact: paradiseo-help@lists.gforge.inria.fr
*/
#include <mpi.h>
#include <vector>
#include "mess.h"
#include "../../core/peo_debug.h"
#include "node.h"
#define MPI_BUF_SIZE 1024*64
static char mpi_buf [MPI_BUF_SIZE];
static int pos_buf ;
static std :: vector <char *> act_buf; /* Active buffers */
static std :: vector <MPI_Request *> act_req; /* Active requests */
void cleanBuffers () {
for (unsigned i = 0; i < act_req.size ();) {
MPI_Status stat ;
int flag ;
MPI_Test (act_req [i], & flag, & stat) ;
if (flag) {
delete act_buf [i] ;
delete act_req [i] ;
act_buf [i] = act_buf.back () ;
act_buf.pop_back () ;
act_req [i] = act_req.back () ;
act_req.pop_back () ;
}
else
i ++;
}
}
void waitBuffers () {
printDebugMessage ("waiting the termination of the asynchronous operations to complete");
for (unsigned i = 0; i < act_req.size (); i ++) {
MPI_Status stat ;
MPI_Wait (act_req [i], & stat) ;
delete act_buf [i] ;
delete act_req [i] ;
}
}
bool probeMessage (int & __src, int & __tag) {
int flag;
MPI_Status stat;
MPI_Iprobe (MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, & flag, & stat);
__src = stat.MPI_SOURCE;
__tag = stat.MPI_TAG;
return flag;
}
void waitMessage () {
MPI_Status stat;
MPI_Probe (MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, & stat);
}
void initMessage () {
pos_buf = 0;
}
void sendMessage (int __to, int __tag) {
cleanBuffers ();
act_buf.push_back (new char [pos_buf]);
act_req.push_back (new MPI_Request);
memcpy (act_buf.back (), mpi_buf, pos_buf);
MPI_Isend (act_buf.back (), pos_buf, MPI_PACKED, __to, __tag, MPI_COMM_WORLD, act_req.back ());
}
void sendMessageToAll (int __tag) {
for (int i = 0; i < getNumberOfNodes (); i ++)
sendMessage (i, __tag);
}
void receiveMessage (int __from, int __tag) {
MPI_Status stat;
MPI_Request req;
MPI_Irecv (mpi_buf, MPI_BUF_SIZE, MPI_PACKED, __from, __tag, MPI_COMM_WORLD, & req) ;
MPI_Wait (& req, & stat) ;
}
/* Char */
void pack (const char & __c) {
MPI_Pack ((void *) & __c, 1, MPI_CHAR, mpi_buf, MPI_BUF_SIZE, & pos_buf, MPI_COMM_WORLD);
}
/* Float */
void pack (const float & __f, int __nitem) {
MPI_Pack ((void *) & __f, __nitem, MPI_FLOAT, mpi_buf, MPI_BUF_SIZE, & pos_buf, MPI_COMM_WORLD);
}
/* Double */
void pack (const double & __d, int __nitem) {
MPI_Pack ((void *) & __d, __nitem, MPI_DOUBLE, mpi_buf, MPI_BUF_SIZE, & pos_buf, MPI_COMM_WORLD);
}
/* Integer */
void pack (const int & __i, int __nitem) {
MPI_Pack ((void *) & __i, __nitem, MPI_INT, mpi_buf, MPI_BUF_SIZE, & pos_buf, MPI_COMM_WORLD);
}
/* Unsigned int. */
void pack (const unsigned int & __ui, int __nitem) {
MPI_Pack ((void *) & __ui, __nitem, MPI_UNSIGNED, mpi_buf, MPI_BUF_SIZE, & pos_buf, MPI_COMM_WORLD);
}
/* Short int. */
void pack (const short & __sh, int __nitem) {
MPI_Pack ((void *) & __sh, __nitem, MPI_SHORT, mpi_buf, MPI_BUF_SIZE, & pos_buf, MPI_COMM_WORLD);
}
/* Unsigned short */
void pack (const unsigned short & __ush, int __nitem) {
MPI_Pack ((void *) & __ush, __nitem, MPI_UNSIGNED_SHORT, mpi_buf, MPI_BUF_SIZE, & pos_buf, MPI_COMM_WORLD);
}
/* Long */
void pack (const long & __l, int __nitem) {
MPI_Pack ((void *) & __l, __nitem, MPI_LONG, mpi_buf, MPI_BUF_SIZE, & pos_buf, MPI_COMM_WORLD);
}
/* Unsigned long */
void pack (const unsigned long & __ul, int __nitem) {
MPI_Pack ((void *) & __ul, __nitem, MPI_UNSIGNED_LONG, mpi_buf, MPI_BUF_SIZE, & pos_buf, MPI_COMM_WORLD);
}
/* String */
void pack (const char * __str) {
int len = strlen (__str) + 1;
MPI_Pack (& len, 1, MPI_INT, mpi_buf, MPI_BUF_SIZE, & pos_buf, MPI_COMM_WORLD);
MPI_Pack ((void *) __str, len, MPI_CHAR, mpi_buf, MPI_BUF_SIZE, & pos_buf, MPI_COMM_WORLD);
}
/* Char */
void unpack (char & __c) {
MPI_Unpack (mpi_buf, MPI_BUF_SIZE, & pos_buf, & __c, 1, MPI_CHAR, MPI_COMM_WORLD);
}
/* Float */
void unpack (float & __f, int __nitem) {
MPI_Unpack (mpi_buf, MPI_BUF_SIZE, & pos_buf, & __f, __nitem, MPI_FLOAT, MPI_COMM_WORLD);
}
/* Double */
void unpack (double & __d, int __nitem) {
MPI_Unpack (mpi_buf, MPI_BUF_SIZE, & pos_buf, & __d, __nitem, MPI_DOUBLE, MPI_COMM_WORLD);
}
/* Integer */
void unpack (int & __i, int __nitem) {
MPI_Unpack (mpi_buf, MPI_BUF_SIZE, & pos_buf, & __i, __nitem, MPI_INT, MPI_COMM_WORLD);
}
/* Unsigned int. */
void unpack (unsigned int & __ui, int __nitem) {
MPI_Unpack (mpi_buf, MPI_BUF_SIZE, & pos_buf, & __ui, __nitem, MPI_UNSIGNED, MPI_COMM_WORLD);
}
/* Short int. */
void unpack (short & __sh, int __nitem) {
MPI_Unpack (mpi_buf, MPI_BUF_SIZE, & pos_buf, & __sh, __nitem, MPI_SHORT, MPI_COMM_WORLD);
}
/* Unsigned short */
void unpack (unsigned short & __ush, int __nitem) {
MPI_Unpack (mpi_buf, MPI_BUF_SIZE, & pos_buf, & __ush, __nitem, MPI_UNSIGNED_SHORT, MPI_COMM_WORLD);
}
/* Long */
void unpack (long & __l, int __nitem) {
MPI_Unpack (mpi_buf, MPI_BUF_SIZE, & pos_buf, & __l, __nitem, MPI_LONG, MPI_COMM_WORLD);
}
/* Unsigned long */
void unpack (unsigned long & __ul, int __nitem) {
MPI_Unpack (mpi_buf, MPI_BUF_SIZE, & pos_buf, & __ul, __nitem, MPI_UNSIGNED_LONG, MPI_COMM_WORLD);
}
/* String */
void unpack (char * __str) {
int len;
MPI_Unpack (mpi_buf, MPI_BUF_SIZE, & pos_buf, & len, 1, MPI_INT, MPI_COMM_WORLD);
MPI_Unpack (mpi_buf, MPI_BUF_SIZE, & pos_buf, __str, len, MPI_CHAR, MPI_COMM_WORLD);
}

View file

@ -0,0 +1,31 @@
// "mess.h"
// (c) OPAC Team, LIFL, August 2005
/*
Contact: paradiseo-help@lists.gforge.inria.fr
*/
#ifndef __mess_rmc_h
#define __mess_rmc_h
#include "../../core/messaging.h"
extern void initMessage ();
extern void sendMessage (int __to, int __tag);
extern void sendMessageToAll (int __tag);
extern void receiveMessage (int __from, int __tag);
extern void cleanBuffers ();
extern void waitBuffers ();
extern bool probeMessage (int & __src, int & __tag);
extern void waitMessage ();
#endif

View file

@ -0,0 +1,58 @@
// "node.cpp"
// (c) OPAC Team, LIFL, August 2005
/*
Contact: paradiseo-help@lists.gforge.inria.fr
*/
#include <mpi.h>
#include <vector>
#include <map>
#include <string>
#include <cassert>
static int rk, sz; /* Rank & size */
static std :: map <std :: string, int> name_to_rk;
static std :: vector <std :: string> rk_to_name;
int getNodeRank () {
return rk;
}
int getNumberOfNodes () {
return sz;
}
int getRankFromName (const std :: string & __name) {
return atoi (__name.c_str ());
}
void initNode (int * __argc, char * * * __argv) {
int provided;
MPI_Init_thread (__argc, __argv, MPI_THREAD_FUNNELED, & provided);
assert (provided == MPI_THREAD_FUNNELED); /* The MPI implementation must be multi-threaded.
Yet, only one thread performs the comm.
operations */
MPI_Comm_rank (MPI_COMM_WORLD, & rk); /* Who ? */
MPI_Comm_size (MPI_COMM_WORLD, & sz); /* How many ? */
char names [sz] [MPI_MAX_PROCESSOR_NAME];
int len;
/* Processor names */
MPI_Get_processor_name (names [0], & len); /* Me */
MPI_Allgather (names, MPI_MAX_PROCESSOR_NAME, MPI_CHAR, names, MPI_MAX_PROCESSOR_NAME, MPI_CHAR, MPI_COMM_WORLD); /* Broadcast */
for (int i = 0; i < sz; i ++) {
rk_to_name.push_back (names [i]);
name_to_rk [names [i]] = i;
}
}

View file

@ -0,0 +1,24 @@
// "node.h"
// (c) OPAC Team, LIFL, August 2005
/*
Contact: paradiseo-help@lists.gforge.inria.fr
*/
#ifndef __node_h
#define __node_h
#include <string>
#include <cassert>
extern int getNodeRank (); /* It gives the rank of the calling process */
extern int getNumberOfNodes (); /* It gives the size of the environment (Total number of nodes) */
extern int getRankFromName (const std :: string & __name); /* It gives the rank of the process
expressed by its name */
extern void initNode (int * __argc, char * * * __argv);
#endif

View file

@ -0,0 +1,21 @@
// "param.cpp"
// (c) OPAC Team, LIFL, August 2005
/*
Contact: paradiseo-help@lists.gforge.inria.fr
*/
#include <utils/eoParser.h>
#include "schema.h"
void loadRMCParameters (int & __argc, char * * & __argv) {
eoParser parser (__argc, __argv);
/* Schema */
eoValueParam <std :: string> schema_param ("schema.xml", "schema", "?");
parser.processParam (schema_param);
loadSchema (schema_param.value ().c_str ());
}

View file

@ -0,0 +1,14 @@
// "param.h"
// (c) OPAC Team, LIFL, August 2005
/*
Contact: paradiseo-help@lists.gforge.inria.fr
*/
#ifndef __rmc_param_h
#define __rmc_param_h
extern void loadRMCParameters (int & __argc, char * * & __argv);
#endif

View file

@ -0,0 +1,112 @@
// "recv.cpp"
// (c) OPAC Team, LIFL, August 2005
/*
Contact: paradiseo-help@lists.gforge.inria.fr
*/
#include "comm.h"
#include "tags.h"
#include "worker.h"
#include "scheduler.h"
#include "mess.h"
#include "node.h"
#include "../../core/runner.h"
#include "../../core/cooperative.h"
#include "../../core/peo_debug.h"
void receiveMessages () {
cleanBuffers ();
do {
if (! atLeastOneActiveThread ()) {
// printDebugMessage ("debut wait");
waitMessage ();
//printDebugMessage ("fin wait");
}
int src, tag;
while (probeMessage (src, tag)) {
receiveMessage (src, tag);
initMessage ();
/*
char b [1000];
sprintf (b, "traitement recv %d\n", tag);
printDebugMessage (b);
*/
switch (tag) {
case RUNNER_STOP_TAG:
unpackTerminationOfRunner ();
wakeUpCommunicator ();
break;
case COOP_TAG:
// printDebugMessage ("reception de message de cooperation");
COOP_ID coop_id;
unpack (coop_id);
getCooperative (coop_id) -> unpack ();
break;
case SCHED_REQUEST_TAG:
unpackResourceRequest ();
break;
case SCHED_RESULT_TAG:
{
/* Unpacking the resource */
SERVICE_ID serv_id;
unpack (serv_id);
Service * serv = getService (serv_id);
int dest;
unpack (dest);
WORKER_ID worker_id;
unpack (worker_id);
/* Going back ... */
initMessage ();
pack (worker_id);
pack (serv_id);
serv -> packData ();
serv -> notifySendingData ();
sendMessage (dest, TASK_DATA_TAG);
break;
}
case TASK_DATA_TAG:
{
WORKER_ID worker_id;
unpack (worker_id);
Worker * worker = getWorker (worker_id);
worker -> setSource (src);
worker -> unpackData ();
worker -> wakeUp ();
break;
}
case TASK_RESULT_TAG:
{
SERVICE_ID serv_id;
unpack (serv_id);
Service * serv = getService (serv_id);
serv -> unpackResult ();
break;
}
case TASK_DONE_TAG:
unpackTaskDone ();
break;
default:
;
};
}
} while (! atLeastOneActiveThread () && atLeastOneActiveRunner () /*&& ! allResourcesFree ()*/);
}

View file

@ -0,0 +1,14 @@
// "recv.h"
// (c) OPAC Team, LIFL, August 2005
/*
Contact: paradiseo-help@lists.gforge.inria.fr
*/
#ifndef __recv_h
#define __recv_h
extern void receiveMessages ();
#endif

View file

@ -0,0 +1,47 @@
// "rmc.cpp"
// (c) OPAC Team, LIFL, August 2005
/*
Contact: paradiseo-help@lists.gforge.inria.fr
*/
#include "send.h"
#include "worker.h"
#include "schema.h"
#include "comm.h"
#include "scheduler.h"
#include "../../core/peo_debug.h"
static std :: vector <pthread_t *> ll_threads; /* Low level threads */
void runRMC () {
/* Worker(s) ? */
for (unsigned i = 0; i < my_node -> num_workers; i ++)
addThread (new Worker, ll_threads);
wakeUpCommunicator ();
}
void initRMC (int & __argc, char * * & __argv) {
/* Communication */
initCommunication ();
addThread (new Communicator (& __argc, & __argv), ll_threads);
waitNodeInitialization ();
initSending ();
/* Scheduler */
if (isScheduleNode ())
initScheduler ();
///
}
void finalizeRMC () {
printDebugMessage ("before join threads RMC");
joinThreads (ll_threads);
printDebugMessage ("after join threads RMC");
}

View file

@ -0,0 +1,33 @@
// "runner.cpp"
// (c) OPAC Team, LIFL, August 2005
/*
Contact: paradiseo-help@lists.gforge.inria.fr
*/
#include "../../core/messaging.h"
#include "../../core/runner.h"
#include "node.h"
#include "send.h"
#include "tags.h"
#include "schema.h"
bool Runner :: isLocal () {
for (unsigned i = 0; i < my_node -> id_run.size (); i ++)
if (my_node -> id_run [i] == id)
return true;
return false;
}
void Runner :: packTermination () {
pack (id);
}
void Runner :: terminate () {
sendToAll (this, RUNNER_STOP_TAG);
}

View file

@ -0,0 +1,78 @@
// "sched_thread.cpp"
// (c) OPAC Team, LIFL, August 2005
/*
Contact: paradiseo-help@lists.gforge.inria.fr
*/
#include <queue>
#include "scheduler.h"
#include "tags.h"
#include "mess.h"
#include "../../core/peo_debug.h"
static std :: queue <SCHED_RESOURCE> resources; /* Free resources */
static std :: queue <SCHED_REQUEST> requests; /* Requests */
static unsigned initNumberOfRes = 0;
void initScheduler () {
for (unsigned i = 0; i < the_schema.size (); i ++) {
const Node & node = the_schema [i];
if (node.rk_sched == my_node -> rk)
for (unsigned j = 0; j < node.num_workers; j ++)
resources.push (std :: pair <RANK_ID, WORKER_ID> (i, j + 1));
}
initNumberOfRes = resources.size ();
}
bool allResourcesFree () {
return resources.size () == initNumberOfRes;
}
static void update () {
unsigned num_alloc = std :: min (resources.size (), requests.size ());
for (unsigned i = 0; i < num_alloc; i ++) {
SCHED_REQUEST req = requests.front ();
requests.pop ();
SCHED_RESOURCE res = resources.front ();
resources.pop ();
printDebugMessage ("allocating a resource.");
initMessage ();
pack (req.second);
pack (res);
sendMessage (req.first, SCHED_RESULT_TAG);
}
}
void unpackResourceRequest () {
printDebugMessage ("queuing a resource request.");
SCHED_REQUEST req;
unpack (req);
requests.push (req);
update ();
}
void unpackTaskDone () {
printDebugMessage ("I'm notified a worker is now idle.");
SCHED_RESOURCE res;
unpack (res);
resources.push (res);
if (resources.size () == initNumberOfRes)
printDebugMessage ("all the resources are now free.");
update ();
}

View file

@ -0,0 +1,32 @@
// "scheduler.h"
// (c) OPAC Team, LIFL, August 2005
/*
Contact: paradiseo-help@lists.gforge.inria.fr
*/
#ifndef __scheduler_h
#define __scheduler_h
#include <utility>
#include "schema.h"
#include "worker.h"
typedef std :: pair <RANK_ID, WORKER_ID> SCHED_RESOURCE;
typedef std :: pair <RANK_ID, SERVICE_ID> SCHED_REQUEST;
/* Initializing the list of available workers */
extern void initScheduler ();
/* Processing a resource request from a service */
extern void unpackResourceRequest ();
/* Being known a worker is now idle :-) */
extern void unpackTaskDone ();
extern bool allResourcesFree ();
#endif

View file

@ -0,0 +1,135 @@
// "schema.cpp"
// (c) OPAC Team, LIFL, August 2005
/*
Contact: paradiseo-help@lists.gforge.inria.fr
*/
#include <iostream>
#include <assert.h>
#include "schema.h"
#include "xml_parser.h"
#include "comm.h"
#include "node.h"
#include "../../core/peo_debug.h"
std :: vector <Node> the_schema;
Node * my_node;
RANK_ID getRankOfRunner (RUNNER_ID __key) {
for (unsigned i = 0; i < the_schema.size (); i ++)
for (unsigned j = 0; j < the_schema [i].id_run.size (); j ++)
if (the_schema [i].id_run [j] == __key)
return the_schema [i].rk;
assert (false);
return 0;
}
static void loadNode (int __rk_sched) {
Node node;
node.rk_sched = __rk_sched;
/* ATT: name*/
node.rk = getRankFromName (getAttributeValue ("name"));
/* ATT: num_workers */
node.num_workers = atoi (getAttributeValue ("num_workers").c_str ());
while (true) {
/* TAG: <runner> | </node> */
std :: string name = getNextNode ();
assert (name == "runner" || name == "node");
if (name == "runner") {
/* TAG: </node> */
node.id_run.push_back (atoi (getNextNode ().c_str ()));
/* TAG: </runner> */
assert (getNextNode () == "runner");
}
else {
/* TAG: </node> */
the_schema.push_back (node);
break;
}
}
}
static void loadGroup () {
std :: string name;
/* ATT: scheduler*/
int rk_sched = getRankFromName (getAttributeValue ("scheduler"));
while (true) {
/* TAG: <node> | </group> */
name = getNextNode ();
assert (name == "node" || name == "group");
if (name == "node")
/* TAG: <node> */
loadNode (rk_sched);
else
/* TAG: </group> */
break;
}
}
bool isScheduleNode () {
return my_node -> rk == my_node -> rk_sched;
}
void loadSchema (const char * __filename) {
openXMLDocument (__filename);
std :: string name;
/* TAG: <schema> */
name = getNextNode ();
assert (name == "schema");
while (true) {
/* TAG: <group> | </schema> */
name = getNextNode ();
assert (name == "group" || name == "schema");
if (name == "group")
/* TAG: <group> */
loadGroup ();
else
/* TAG: </schema> */
break;
}
/* Looking for my node */
for (unsigned i = 0; i < the_schema.size (); i ++)
if (the_schema [i].rk == getNodeRank ())
my_node = & (the_schema [i]);
/* About me */
char mess [1000];
sprintf (mess, "my rank is %d", my_node -> rk);
printDebugMessage (mess);
if (isScheduleNode ())
printDebugMessage ("I'am a scheduler");
for (unsigned i = 0; i < my_node -> id_run.size (); i ++) {
sprintf (mess, "I manage the runner %d", my_node -> id_run [i]);
printDebugMessage (mess);
}
if (my_node -> num_workers) {
sprintf (mess, "I manage %d worker(s)", my_node -> num_workers);
printDebugMessage (mess);
}
closeXMLDocument ();
}

View file

@ -0,0 +1,39 @@
// "schema.h"
// (c) OPAC Team, LIFL, August 2005
/*
Contact: paradiseo-help@lists.gforge.inria.fr
*/
#ifndef __schema_h
#define __schema_h
#include <string>
#include <vector>
#include <cassert>
#include "../../core/runner.h"
typedef int RANK_ID;
struct Node {
RANK_ID rk; /* Rank */
std :: string name; /* Host name */
unsigned num_workers; /* Number of parallel workers */
int rk_sched; /* rank of the scheduler */
std :: vector <RUNNER_ID> id_run; /* List of runners */
};
extern std :: vector <Node> the_schema;
extern Node * my_node;
extern void loadSchema (const char * __filename);
extern RANK_ID getRankOfRunner (RUNNER_ID __key);
extern bool isScheduleNode ();
#endif

View file

@ -0,0 +1,116 @@
// "send.cpp"
// (c) OPAC Team, LIFL, August 2005
/*
Contact: paradiseo-help@lists.gforge.inria.fr
*/
#include <mpi.h>
#include <semaphore.h>
#include <queue>
#include "tags.h"
#include "comm.h"
#include "worker.h"
#include "scheduler.h"
#include "mess.h"
#include "node.h"
#include "../../core/cooperative.h"
#include "../../core/peo_debug.h"
#define TO_ALL -1
typedef struct {
Communicable * comm;
int to;
int tag;
} SEND_REQUEST;
static std :: queue <SEND_REQUEST> mess;
static sem_t sem_send;
void initSending () {
sem_init (& sem_send, 0, 1);
}
void send (Communicable * __comm, int __to, int __tag) {
SEND_REQUEST req;
req.comm = __comm;
req.to = __to;
req.tag = __tag;
sem_wait (& sem_send);
mess.push (req);
sem_post (& sem_send);
wakeUpCommunicator ();
}
void sendToAll (Communicable * __comm, int __tag) {
send (__comm, TO_ALL, __tag);
}
void sendMessages () {
sem_wait (& sem_send);
while (! mess.empty ()) {
SEND_REQUEST req = mess.front ();
/*
char b [1000];
sprintf (b, "traitement send %d\n", req.tag);
printDebugMessage (b);
*/
Communicable * comm = req.comm;
initMessage ();
switch (req.tag) {
case RUNNER_STOP_TAG:
dynamic_cast <Runner *> (comm) -> packTermination ();
dynamic_cast <Runner *> (comm) -> notifySendingTermination ();
break;
case COOP_TAG:
dynamic_cast <Cooperative *> (comm) -> pack ();
dynamic_cast <Cooperative *> (comm) -> notifySending ();
break;
case SCHED_REQUEST_TAG:
dynamic_cast <Service *> (comm) -> packResourceRequest ();
dynamic_cast <Service *> (comm) -> notifySendingResourceRequest ();
break;
case TASK_RESULT_TAG:
dynamic_cast <Worker *> (comm) -> packResult ();
dynamic_cast <Worker *> (comm) -> notifySendingResult ();
break;
case TASK_DONE_TAG:
dynamic_cast <Worker *> (comm) -> packTaskDone ();
dynamic_cast <Worker *> (comm) -> notifySendingTaskDone ();
break;
default :
break;
};
if (req.to == TO_ALL)
sendMessageToAll (req.tag);
else
sendMessage (req.to, req.tag);
mess.pop ();
}
sem_post (& sem_send);
}

View file

@ -0,0 +1,22 @@
// "send.h"
// (c) OPAC Team, LIFL, August 2005
/*
Contact: paradiseo-help@lists.gforge.inria.fr
*/
#ifndef __send_h
#define __send_h
#include "../../core/communicable.h"
extern void initSending ();
extern void send (Communicable * __comm, int __to, int __tag);
extern void sendToAll (Communicable * __comm, int __tag);
extern void sendMessages ();
#endif

View file

@ -0,0 +1,30 @@
// "service.h"
// (c) OPAC Team, LIFL, August 2005
/*
Contact: paradiseo-help@lists.gforge.inria.fr
*/
#include "../../core/service.h"
#include "../../core/messaging.h"
#include "node.h"
#include "tags.h"
#include "send.h"
#include "scheduler.h"
void Service :: requestResourceRequest (unsigned __how_many) {
num_sent_rr = __how_many;
for (unsigned i = 0; i < __how_many; i ++)
send (this, my_node -> rk_sched, SCHED_REQUEST_TAG);
}
void Service :: packResourceRequest () {
SCHED_REQUEST req;
req.first = getNodeRank ();
req.second = getKey ();
// printf ("demande de ressource pour %d\n", req.second);
:: pack (req);
}

View file

@ -0,0 +1,24 @@
// "tags.h"
// (c) OPAC Team, LIFL, August 2005
/*
Contact: paradiseo-help@lists.gforge.inria.fr
*/
#ifndef __tags_h
#define __tags_h
#define RUNNER_STOP_TAG 13
#define COOP_TAG 14
#define SCHED_REQUEST_TAG 16
#define SCHED_RESULT_TAG 17
#define TASK_DATA_TAG 18
#define TASK_RESULT_TAG 19
#define TASK_DONE_TAG 20
#endif

View file

@ -0,0 +1,93 @@
// "worker.cpp"
// (c) OPAC Team, LIFL, August 2005
/*
Contact: paradiseo-help@lists.gforge.inria.fr
*/
#include <vector>
#include "tags.h"
#include "send.h"
#include "node.h"
#include "schema.h"
#include "worker.h"
#include "mess.h"
#include "../../core/peo_debug.h"
static std :: vector <Worker *> key_to_worker (1); /* Vector of registered workers */
Worker * getWorker (WORKER_ID __key) {
return key_to_worker [__key];
}
Worker :: Worker () {
toto = false;
id = key_to_worker.size ();
key_to_worker.push_back (this);
}
void Worker :: packResult () {
pack (serv_id);
serv -> packResult ();
}
void Worker :: unpackData () {
printDebugMessage ("unpacking the ID. of the service.");
unpack (serv_id);
serv = getService (serv_id);
printDebugMessage ("found the service.");
serv -> unpackData ();
printDebugMessage ("unpacking the data.");
setActive ();
}
void Worker :: packTaskDone () {
pack (getNodeRank ());
pack (id);
}
void Worker :: notifySendingResult () {
/* Notifying the scheduler of the termination */
toto = true;
wakeUp ();
}
void Worker :: notifySendingTaskDone () {
setPassive ();
}
void Worker :: setSource (int __rank) {
src = __rank;
}
void Worker :: start () {
while (true) {
sleep ();
if (! atLeastOneActiveRunner ())
break;
if (toto) {
send (this, my_node -> rk_sched, TASK_DONE_TAG);
toto = false;
}
else {
printDebugMessage ("executing the task.");
serv -> execute ();
send (this, src, TASK_RESULT_TAG);
}
}
}

View file

@ -0,0 +1,50 @@
// "worker.h"
// (c) OPAC Team, LIFL, August 2005
/*
Contact: paradiseo-help@lists.gforge.inria.fr
*/
#ifndef __worker_h
#define __worker_h
#include "../../core/communicable.h"
#include "../../core/reac_thread.h"
#include "../../core/service.h"
typedef unsigned WORKER_ID;
class Worker : public Communicable, public ReactiveThread {
public :
Worker ();
void start ();
void packResult ();
void unpackData ();
void packTaskDone ();
void notifySendingResult ();
void notifySendingTaskDone ();
void setSource (int __rank);
private :
WORKER_ID id;
SERVICE_ID serv_id;
Service * serv;
int src;
bool toto;
};
extern Worker * getWorker (WORKER_ID __key);
#endif

View file

@ -0,0 +1,75 @@
// "xml_parser.h"
// (c) OPAC Team, LIFL, August 2005
/*
Contact: paradiseo-help@lists.gforge.inria.fr
*/
#include <libxml/xmlreader.h>
#include "xml_parser.h"
static xmlTextReaderPtr reader;
void openXMLDocument (const char * __filename) {
reader = xmlNewTextReaderFilename (__filename);
if (! reader) {
fprintf (stderr, "unable to open '%s'.\n", __filename);
exit (1);
}
}
void closeXMLDocument () {
xmlFreeTextReader (reader);
}
std :: string getAttributeValue (const std :: string & __attr) {
xmlChar * value = xmlTextReaderGetAttribute (reader, (const xmlChar *) __attr.c_str ());
std :: string str ((const char *) value);
xmlFree (value);
return str;
}
static bool isSep (const xmlChar * __text) {
for (unsigned i = 0; i < strlen ((char *) __text); i ++)
if (__text [i] != ' ' && __text [i] != '\t' && __text [i] != '\n')
return false;
return true;
}
std :: string getNextNode () {
xmlChar * name, * value;
do {
xmlTextReaderRead (reader);
name = xmlTextReaderName (reader);
value = xmlTextReaderValue (reader);
// printf ("value = %s\n", value);
} while (! strcmp ((char *) name, "#text") && isSep (value));
std :: string str;
if (strcmp ((char *) name, "#text"))
str.assign ((char *) name);
else
str.assign ((char *) value);
if (name)
xmlFree (name);
if (value)
xmlFree (value);
return str;
}

View file

@ -0,0 +1,22 @@
// "xml_parser.h"
// (c) OPAC Team, LIFL, August 2005
/*
Contact: paradiseo-help@lists.gforge.inria.fr
*/
#ifndef __xml_parser_h
#define __xml_parser_h
#include <string>
extern void openXMLDocument (const char * __filename);
extern void closeXMLDocument ();
extern std :: string getAttributeValue (const std :: string & __attr);
extern std :: string getNextNode ();
#endif