00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include <queue>
00025
00026 #include "scheduler.h"
00027 #include "tags.h"
00028 #include "mess.h"
00029 #include "../../core/peo_debug.h"
00030
00031 static std :: queue <SCHED_RESOURCE> resources;
00032
00033 static std :: queue <SCHED_REQUEST> requests;
00034
00035 static unsigned initNumberOfRes = 0;
00036
00037 void initScheduler () {
00038
00039 for (unsigned i = 0; i < the_schema.size (); i ++) {
00040
00041 const Node & node = the_schema [i];
00042
00043 if (node.rk_sched == my_node -> rk)
00044 for (unsigned j = 0; j < node.num_workers; j ++)
00045 resources.push (std :: pair <RANK_ID, WORKER_ID> (i, j + 1));
00046 }
00047 initNumberOfRes = resources.size ();
00048 }
00049
00050 bool allResourcesFree () {
00051
00052 return resources.size () == initNumberOfRes;
00053 }
00054
00055 static void update () {
00056
00057 unsigned num_alloc = std :: min (resources.size (), requests.size ());
00058
00059 for (unsigned i = 0; i < num_alloc; i ++) {
00060
00061 SCHED_REQUEST req = requests.front ();
00062 requests.pop ();
00063
00064 SCHED_RESOURCE res = resources.front ();
00065 resources.pop ();
00066
00067 printDebugMessage ("allocating a resource.");
00068 initMessage ();
00069 pack (req.second);
00070 pack (res);
00071 sendMessage (req.first, SCHED_RESULT_TAG);
00072 }
00073 }
00074
00075 void unpackResourceRequest () {
00076
00077 printDebugMessage ("queuing a resource request.");
00078 SCHED_REQUEST req;
00079 unpack (req);
00080 requests.push (req);
00081 update ();
00082 }
00083
00084 void unpackTaskDone () {
00085
00086 printDebugMessage ("I'm notified a worker is now idle.");
00087 SCHED_RESOURCE res;
00088 unpack (res);
00089 resources.push (res);
00090 if (resources.size () == initNumberOfRes)
00091 printDebugMessage ("all the resources are now free.");
00092 update ();
00093 }