00001
00002
00003
00004
00005
00006
00007
00008
00009 #include "comm.h"
00010 #include "tags.h"
00011 #include "worker.h"
00012 #include "scheduler.h"
00013 #include "mess.h"
00014 #include "node.h"
00015 #include "../../core/runner.h"
00016 #include "../../core/cooperative.h"
00017 #include "../../core/peo_debug.h"
00018
00019 void receiveMessages () {
00020
00021 cleanBuffers ();
00022
00023 do {
00024
00025 if (! atLeastOneActiveThread ()) {
00026
00027 waitMessage ();
00028
00029 }
00030
00031 int src, tag;
00032
00033 while (probeMessage (src, tag)) {
00034
00035 receiveMessage (src, tag);
00036 initMessage ();
00037
00038
00039
00040
00041
00042
00043 switch (tag) {
00044
00045 case RUNNER_STOP_TAG:
00046 unpackTerminationOfRunner ();
00047 wakeUpCommunicator ();
00048 break;
00049
00050 case COOP_TAG:
00051
00052 COOP_ID coop_id;
00053 unpack (coop_id);
00054 getCooperative (coop_id) -> unpack ();
00055 break;
00056
00057 case SCHED_REQUEST_TAG:
00058 unpackResourceRequest ();
00059 break;
00060
00061 case SCHED_RESULT_TAG:
00062 {
00063
00064 SERVICE_ID serv_id;
00065 unpack (serv_id);
00066 Service * serv = getService (serv_id);
00067 int dest;
00068 unpack (dest);
00069 WORKER_ID worker_id;
00070 unpack (worker_id);
00071
00072
00073 initMessage ();
00074 pack (worker_id);
00075 pack (serv_id);
00076 serv -> packData ();
00077 serv -> notifySendingData ();
00078 sendMessage (dest, TASK_DATA_TAG);
00079 break;
00080 }
00081
00082 case TASK_DATA_TAG:
00083 {
00084 WORKER_ID worker_id;
00085 unpack (worker_id);
00086 Worker * worker = getWorker (worker_id);
00087 worker -> setSource (src);
00088 worker -> unpackData ();
00089 worker -> wakeUp ();
00090 break;
00091 }
00092
00093 case TASK_RESULT_TAG:
00094 {
00095 SERVICE_ID serv_id;
00096 unpack (serv_id);
00097 Service * serv = getService (serv_id);
00098 serv -> unpackResult ();
00099 break;
00100 }
00101
00102 case TASK_DONE_TAG:
00103 unpackTaskDone ();
00104 break;
00105
00106 default:
00107 ;
00108 };
00109 }
00110
00111 } while (! atLeastOneActiveThread () && atLeastOneActiveRunner () );
00112 }