00001
00002
00003
00004
00005
00006
00007
00008
00009 #include <queue>
00010
00011 #include "scheduler.h"
00012 #include "tags.h"
00013 #include "mess.h"
00014 #include "../../core/peo_debug.h"
00015
00016 static std :: queue <SCHED_RESOURCE> resources;
00017
00018 static std :: queue <SCHED_REQUEST> requests;
00019
00020 static unsigned initNumberOfRes = 0;
00021
00022 void initScheduler () {
00023
00024 for (unsigned i = 0; i < the_schema.size (); i ++) {
00025
00026 const Node & node = the_schema [i];
00027
00028 if (node.rk_sched == my_node -> rk)
00029 for (unsigned j = 0; j < node.num_workers; j ++)
00030 resources.push (std :: pair <RANK_ID, WORKER_ID> (i, j + 1));
00031 }
00032 initNumberOfRes = resources.size ();
00033 }
00034
00035 bool allResourcesFree () {
00036
00037 return resources.size () == initNumberOfRes;
00038 }
00039
00040 static void update () {
00041
00042 unsigned num_alloc = std :: min (resources.size (), requests.size ());
00043
00044 for (unsigned i = 0; i < num_alloc; i ++) {
00045
00046 SCHED_REQUEST req = requests.front ();
00047 requests.pop ();
00048
00049 SCHED_RESOURCE res = resources.front ();
00050 resources.pop ();
00051
00052 printDebugMessage ("allocating a resource.");
00053 initMessage ();
00054 pack (req.second);
00055 pack (res);
00056 sendMessage (req.first, SCHED_RESULT_TAG);
00057 }
00058 }
00059
00060 void unpackResourceRequest () {
00061
00062 printDebugMessage ("queuing a resource request.");
00063 SCHED_REQUEST req;
00064 unpack (req);
00065 requests.push (req);
00066 update ();
00067 }
00068
00069 void unpackTaskDone () {
00070
00071 printDebugMessage ("I'm notified a worker is now idle.");
00072 SCHED_RESOURCE res;
00073 unpack (res);
00074 resources.push (res);
00075 if (resources.size () == initNumberOfRes)
00076 printDebugMessage ("all the resources are now free.");
00077 update ();
00078 }