Policies system using tag-dispatching method (again)

git-svn-id: svn://scm.gforge.inria.fr/svnroot/paradiseo@2723 331e1502-861f-0410-8da2-ba01fb791d7f
This commit is contained in:
quemy 2012-08-24 16:55:23 +00:00
commit a7cc3dab81
10 changed files with 266 additions and 66 deletions

View file

@ -27,29 +27,70 @@ ParadisEO WebSite : http://paradiseo.gforge.inria.fr
Contact: paradiseo-help@lists.gforge.inria.fr
*/
template<class EOT>
paradiseo::smp::Scheduler<EOT>::Scheduler(unsigned workersNb) :
#include <typeinfo>
template<class EOT, class Policy>
paradiseo::smp::Scheduler<EOT,Policy>::Scheduler(unsigned workersNb) :
workers(workersNb),
popPackages(workersNb),
done(false),
planning(workersNb),
idWaitingThread(-1)
m(workersNb)
{ }
template<class EOT>
paradiseo::smp::Scheduler<EOT>::~Scheduler()
template<class EOT, class Policy>
paradiseo::smp::Scheduler<EOT,Policy>::~Scheduler()
{ }
template<class EOT>
void paradiseo::smp::Scheduler<EOT>::operator()(eoUF<EOT&, void>& func, eoPop<EOT>& pop)
template<class EOT, class Policy>
void paradiseo::smp::Scheduler<EOT,Policy>::operator()(eoUF<EOT&, void>& func, eoPop<EOT>& pop)
{
// Call the tag dispatcher
operator()(func, pop,typename policyTraits<Policy>::type());
}
template<class EOT, class Policy>
void paradiseo::smp::Scheduler<EOT,Policy>::operator()(eoUF<EOT&, void>& func, eoPop<EOT>& pop, const LinearPolicy&)
{
// Determine number of packages according to the number of workers
unsigned nbPackages = workers.size();
// Fill packages
unsigned nbIndi = pop.size() / nbPackages;
unsigned remaining = pop.size() % nbPackages;
unsigned indice = 0;
for(unsigned i = 0; i < nbPackages; i++)
{
popPackages[i].clear();
for(unsigned j = 0; j < nbIndi; j++)
{
popPackages[i].push_back(&pop[i*nbIndi+j]);
indice = i*nbIndi+j;
}
}
for(unsigned i = 0; i < remaining; i++)
popPackages[i].push_back(&pop[indice+i+1]);
// Starting threads
for(unsigned i = 0; i < workers.size(); i++)
workers[i].start(&Scheduler<EOT,Policy>::applyLinearPolicy, this, std::ref(func), std::ref(popPackages[i]));
// Wait the end of tasks
for(unsigned i = 0; i < workers.size(); i++)
workers[i].join();
}
template<class EOT, class Policy>
void paradiseo::smp::Scheduler<EOT,Policy>::operator()(eoUF<EOT&, void>& func, eoPop<EOT>& pop, const ProgressivePolicy&)
{
done = false;
idWaitingThread = -1;
for(unsigned i = 0; i < workers.size(); i++)
{
planning[i] = 2;
workers[i].start(&Scheduler<EOT>::apply, this, std::ref(func), std::ref(popPackages[i]), i);
workers[i].start(&Scheduler<EOT,Policy>::applyProgressivePolicy, this, std::ref(func), std::ref(popPackages[i]), i);
}
unsigned counter = 0;
@ -57,41 +98,51 @@ void paradiseo::smp::Scheduler<EOT>::operator()(eoUF<EOT&, void>& func, eoPop<EO
while(counter < pop.size())
{
std::unique_lock<std::mutex> lock(m);
cvt.wait(lock, [this]() -> bool {return (int)idWaitingThread != -1;});
j = 0;
while (j < planning[idWaitingThread] && counter < pop.size())
for(unsigned i = 0; i < workers.size(); i++)
{
popPackages[idWaitingThread].push_back(&pop[counter]);
counter++;
j++;
if(popPackages[i].empty() && counter < pop.size())
{
j = 0;
std::unique_lock<std::mutex> lock(m[i]);
while (j < planning[i] && counter < pop.size())
{
//std::cout << counter << std::endl;
popPackages[i].push_back(&pop[counter]);
counter++;
j++;
}
planning[i] *= 2;
}
}
planning[idWaitingThread] *= 2;
idWaitingThread = -1;
cv.notify_one();
/* A nanosleep can increase performances by 10% but
as it is not supported by Fedora atm, I deactivate it. */
//std::this_thread::sleep_for(std::chrono::nanoseconds(10));
}
done = true;
idWaitingThread = -1;
cv.notify_all();
for(unsigned i = 0; i < workers.size(); i++)
workers[i].join();
}
template<class EOT>
void paradiseo::smp::Scheduler<EOT>::apply(eoUF<EOT&, void>& func, std::vector<EOT*>& pop, int id)
template<class EOT, class Policy>
void paradiseo::smp::Scheduler<EOT,Policy>::applyLinearPolicy(eoUF<EOT&, void>& func, std::vector<EOT*>& pop)
{
for(unsigned i = 0; i < pop.size(); i++)
func(*pop[i]);
}
template<class EOT, class Policy>
void paradiseo::smp::Scheduler<EOT,Policy>::applyProgressivePolicy(eoUF<EOT&, void>& func, std::vector<EOT*>& pop, int id)
{
while(!done || !pop.empty())
{
for(unsigned i = 0; i < pop.size(); i++)
std::unique_lock<std::mutex> lock(m[id]);
for(unsigned i = 0; i < pop.size(); i++) {
//std::cout << "." << id << "." << std::endl;
func(*pop[i]);
}
pop.clear();
std::unique_lock<std::mutex> lock(m);
idWaitingThread = id;
// We notify the scheduler we finished the package
cvt.notify_one();
// We wait for a new package
cv.wait(lock, [this]() -> bool {return (int)idWaitingThread == -1 || (bool)done;});
lock.unlock();
}
}