00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include <mpi.h>
00025 #include <semaphore.h>
00026 #include <queue>
00027
00028 #include "tags.h"
00029 #include "comm.h"
00030 #include "worker.h"
00031 #include "scheduler.h"
00032 #include "mess.h"
00033 #include "node.h"
00034 #include "../../core/cooperative.h"
00035 #include "../../core/peo_debug.h"
00036
/* Sentinel destination stored in SEND_REQUEST::to to request a broadcast
   (sendMessageToAll) instead of a point-to-point send. Parenthesised so the
   negative literal stays a single operand wherever the macro is expanded. */
#define TO_ALL (-1)
00038
00039 typedef struct {
00040
00041 Communicable * comm;
00042 int to;
00043 int tag;
00044
00045 } SEND_REQUEST;
00046
00047 static std :: queue <SEND_REQUEST> mess;
00048
00049 static sem_t sem_send;
00050
00051 void initSending () {
00052
00053 sem_init (& sem_send, 0, 1);
00054 }
00055
00056 void send (Communicable * __comm, int __to, int __tag) {
00057
00058 SEND_REQUEST req;
00059 req.comm = __comm;
00060 req.to = __to;
00061 req.tag = __tag;
00062
00063 sem_wait (& sem_send);
00064 mess.push (req);
00065 sem_post (& sem_send);
00066 wakeUpCommunicator ();
00067 }
00068
00069 void sendToAll (Communicable * __comm, int __tag) {
00070
00071 send (__comm, TO_ALL, __tag);
00072 }
00073
00074 void sendMessages () {
00075
00076 sem_wait (& sem_send);
00077
00078 while (! mess.empty ()) {
00079
00080 SEND_REQUEST req = mess.front ();
00081
00082
00083
00084
00085
00086
00087 Communicable * comm = req.comm;
00088
00089 initMessage ();
00090
00091 switch (req.tag) {
00092
00093 case RUNNER_STOP_TAG:
00094 dynamic_cast <Runner *> (comm) -> packTermination ();
00095 dynamic_cast <Runner *> (comm) -> notifySendingTermination ();
00096 break;
00097
00098 case COOP_TAG:
00099 dynamic_cast <Cooperative *> (comm) -> pack ();
00100 dynamic_cast <Cooperative *> (comm) -> notifySending ();
00101 break;
00102
00103 case SCHED_REQUEST_TAG:
00104 dynamic_cast <Service *> (comm) -> packResourceRequest ();
00105 dynamic_cast <Service *> (comm) -> notifySendingResourceRequest ();
00106 break;
00107
00108 case TASK_RESULT_TAG:
00109 dynamic_cast <Worker *> (comm) -> packResult ();
00110 dynamic_cast <Worker *> (comm) -> notifySendingResult ();
00111 break;
00112
00113 case TASK_DONE_TAG:
00114 dynamic_cast <Worker *> (comm) -> packTaskDone ();
00115 dynamic_cast <Worker *> (comm) -> notifySendingTaskDone ();
00116 break;
00117
00118 default :
00119 break;
00120
00121 };
00122
00123 if (req.to == TO_ALL)
00124 sendMessageToAll (req.tag);
00125 else
00126 sendMessage (req.to, req.tag);
00127 mess.pop ();
00128 }
00129
00130 sem_post (& sem_send);
00131 }