00001
00002
00003
00004
00005
00006
00007
00008
00009 #include <mpi.h>
00010 #include <semaphore.h>
00011 #include <queue>
00012
00013 #include "tags.h"
00014 #include "comm.h"
00015 #include "worker.h"
00016 #include "scheduler.h"
00017 #include "mess.h"
00018 #include "node.h"
00019 #include "../../core/cooperative.h"
00020 #include "../../core/peo_debug.h"
00021
00022 #define TO_ALL -1
00023
00024 typedef struct {
00025
00026 Communicable * comm;
00027 int to;
00028 int tag;
00029
00030 } SEND_REQUEST;
00031
00032 static std :: queue <SEND_REQUEST> mess;
00033
00034 static sem_t sem_send;
00035
00036 void initSending () {
00037
00038 sem_init (& sem_send, 0, 1);
00039 }
00040
00041 void send (Communicable * __comm, int __to, int __tag) {
00042
00043 SEND_REQUEST req;
00044 req.comm = __comm;
00045 req.to = __to;
00046 req.tag = __tag;
00047
00048 sem_wait (& sem_send);
00049 mess.push (req);
00050 sem_post (& sem_send);
00051 wakeUpCommunicator ();
00052 }
00053
00054 void sendToAll (Communicable * __comm, int __tag) {
00055
00056 send (__comm, TO_ALL, __tag);
00057 }
00058
00059 void sendMessages () {
00060
00061 sem_wait (& sem_send);
00062
00063 while (! mess.empty ()) {
00064
00065 SEND_REQUEST req = mess.front ();
00066
00067
00068
00069
00070
00071
00072 Communicable * comm = req.comm;
00073
00074 initMessage ();
00075
00076 switch (req.tag) {
00077
00078 case RUNNER_STOP_TAG:
00079 dynamic_cast <Runner *> (comm) -> packTermination ();
00080 dynamic_cast <Runner *> (comm) -> notifySendingTermination ();
00081 break;
00082
00083 case COOP_TAG:
00084 dynamic_cast <Cooperative *> (comm) -> pack ();
00085 dynamic_cast <Cooperative *> (comm) -> notifySending ();
00086 break;
00087
00088 case SCHED_REQUEST_TAG:
00089 dynamic_cast <Service *> (comm) -> packResourceRequest ();
00090 dynamic_cast <Service *> (comm) -> notifySendingResourceRequest ();
00091 break;
00092
00093 case TASK_RESULT_TAG:
00094 dynamic_cast <Worker *> (comm) -> packResult ();
00095 dynamic_cast <Worker *> (comm) -> notifySendingResult ();
00096 break;
00097
00098 case TASK_DONE_TAG:
00099 dynamic_cast <Worker *> (comm) -> packTaskDone ();
00100 dynamic_cast <Worker *> (comm) -> notifySendingTaskDone ();
00101 break;
00102
00103 default :
00104 break;
00105
00106 };
00107
00108 if (req.to == TO_ALL)
00109 sendMessageToAll (req.tag);
00110 else
00111 sendMessage (req.to, req.tag);
00112 mess.pop ();
00113 }
00114
00115 sem_post (& sem_send);
00116 }