Fix critical memory issue. Sent messages are launched asynchronously and only -std::future- results are stocked. This enable to check if a thread has completed its task (update methods in Model and Island return always true in that purpose) unlike the std:🧵:joinable method.
This commit is contained in:
parent
cdeb1929e2
commit
ca44e95add
5 changed files with 25 additions and 13 deletions
|
|
@ -74,7 +74,7 @@ public:
|
||||||
* Update the island by adding population to send in the imigrants list.
|
* Update the island by adding population to send in the imigrants list.
|
||||||
* @param _data Population to integrate.
|
* @param _data Population to integrate.
|
||||||
*/
|
*/
|
||||||
virtual void update(eoPop<bEOT> _data) = 0;
|
virtual bool update(eoPop<bEOT> _data) = 0;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check if the algorithm is stopped.
|
* Check if the algorithm is stopped.
|
||||||
|
|
|
||||||
|
|
@ -67,7 +67,7 @@ void paradiseo::smp::Island<EOAlgo,EOT,bEOT>::operator()()
|
||||||
stopped = true;
|
stopped = true;
|
||||||
// Let's wait the end of communications with the island model
|
// Let's wait the end of communications with the island model
|
||||||
for(auto& message : sentMessages)
|
for(auto& message : sentMessages)
|
||||||
message.join();
|
message.wait();
|
||||||
|
|
||||||
// Clear the sentMessages container
|
// Clear the sentMessages container
|
||||||
sentMessages.clear();
|
sentMessages.clear();
|
||||||
|
|
@ -125,12 +125,12 @@ void paradiseo::smp::Island<EOAlgo,EOT,bEOT>::send(eoSelect<EOT>& _select)
|
||||||
|
|
||||||
// Delete delivered messages
|
// Delete delivered messages
|
||||||
sentMessages.erase(std::remove_if(sentMessages.begin(), sentMessages.end(),
|
sentMessages.erase(std::remove_if(sentMessages.begin(), sentMessages.end(),
|
||||||
[&](std::thread& i) -> bool
|
[&](std::shared_future<bool>& i) -> bool
|
||||||
{ return !i.joinable(); }
|
{ return i.wait_for(std::chrono::nanoseconds(0)) == std::future_status::ready; }
|
||||||
),
|
),
|
||||||
sentMessages.end());
|
sentMessages.end());
|
||||||
|
|
||||||
sentMessages.push_back(std::thread(&IslandModel<bEOT>::update, model, std::move(baseMigPop), this));
|
sentMessages.push_back(std::async(std::launch::async, &IslandModel<bEOT>::update, model, std::move(baseMigPop), this));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -159,9 +159,11 @@ void paradiseo::smp::Island<EOAlgo,EOT,bEOT>::receive(void)
|
||||||
}
|
}
|
||||||
|
|
||||||
template<template <class> class EOAlgo, class EOT, class bEOT>
|
template<template <class> class EOAlgo, class EOT, class bEOT>
|
||||||
void paradiseo::smp::Island<EOAlgo,EOT,bEOT>::update(eoPop<bEOT> _data)
|
bool paradiseo::smp::Island<EOAlgo,EOT,bEOT>::update(eoPop<bEOT> _data)
|
||||||
{
|
{
|
||||||
//std::cout << "On update dans l'île" << std::endl;
|
//std::cout << "On update dans l'île" << std::endl;
|
||||||
std::lock_guard<std::mutex> lock(this->m);
|
std::lock_guard<std::mutex> lock(this->m);
|
||||||
listImigrants.push(_data);
|
listImigrants.push(_data);
|
||||||
|
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -34,6 +34,7 @@ Contact: paradiseo-help@lists.gforge.inria.fr
|
||||||
#include <vector>
|
#include <vector>
|
||||||
#include <utility>
|
#include <utility>
|
||||||
#include <atomic>
|
#include <atomic>
|
||||||
|
#include <future>
|
||||||
#include <type_traits>
|
#include <type_traits>
|
||||||
#include <algorithm>
|
#include <algorithm>
|
||||||
|
|
||||||
|
|
@ -113,7 +114,7 @@ public:
|
||||||
* Update the list of imigrants.
|
* Update the list of imigrants.
|
||||||
* @param _data Elements to integrate in the main population.
|
* @param _data Elements to integrate in the main population.
|
||||||
*/
|
*/
|
||||||
void update(eoPop<bEOT> _data);
|
bool update(eoPop<bEOT> _data);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check if the algorithm is stopped.
|
* Check if the algorithm is stopped.
|
||||||
|
|
@ -148,7 +149,7 @@ protected:
|
||||||
IntPolicy<EOT>& intPolicy;
|
IntPolicy<EOT>& intPolicy;
|
||||||
MigPolicy<EOT>& migPolicy;
|
MigPolicy<EOT>& migPolicy;
|
||||||
std::atomic<bool> stopped;
|
std::atomic<bool> stopped;
|
||||||
std::vector<std::thread> sentMessages;
|
std::vector<std::shared_future<bool>> sentMessages;
|
||||||
IslandModel<bEOT>* model;
|
IslandModel<bEOT>* model;
|
||||||
std::function<EOT(bEOT&)> convertFromBase;
|
std::function<EOT(bEOT&)> convertFromBase;
|
||||||
std::function<bEOT(EOT&)> convertToBase;
|
std::function<bEOT(EOT&)> convertToBase;
|
||||||
|
|
|
||||||
|
|
@ -98,7 +98,7 @@ void paradiseo::smp::IslandModel<EOT>::operator()()
|
||||||
|
|
||||||
// Wait the end of messages sending
|
// Wait the end of messages sending
|
||||||
for(auto& message : sentMessages)
|
for(auto& message : sentMessages)
|
||||||
message.join();
|
message.wait();
|
||||||
|
|
||||||
// Clear the sentMessages container
|
// Clear the sentMessages container
|
||||||
sentMessages.clear();
|
sentMessages.clear();
|
||||||
|
|
@ -119,10 +119,12 @@ void paradiseo::smp::IslandModel<EOT>::operator()()
|
||||||
}
|
}
|
||||||
|
|
||||||
template<class EOT>
|
template<class EOT>
|
||||||
void paradiseo::smp::IslandModel<EOT>::update(eoPop<EOT> _data, AIsland<EOT>* _island)
|
bool paradiseo::smp::IslandModel<EOT>::update(eoPop<EOT> _data, AIsland<EOT>* _island)
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> lock(m);
|
std::lock_guard<std::mutex> lock(m);
|
||||||
listEmigrants.push(std::pair<eoPop<EOT>,AIsland<EOT>*>(_data, _island));
|
listEmigrants.push(std::pair<eoPop<EOT>,AIsland<EOT>*>(_data, _island));
|
||||||
|
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
template<class EOT>
|
template<class EOT>
|
||||||
|
|
@ -154,11 +156,17 @@ void paradiseo::smp::IslandModel<EOT>::send(void)
|
||||||
|
|
||||||
// Send elements to neighbors
|
// Send elements to neighbors
|
||||||
eoPop<EOT> migPop = std::move(listEmigrants.front().first);
|
eoPop<EOT> migPop = std::move(listEmigrants.front().first);
|
||||||
|
sentMessages.erase(std::remove_if(sentMessages.begin(), sentMessages.end(),
|
||||||
|
[&](std::shared_future<bool>& i) -> bool
|
||||||
|
{ return i.wait_for(std::chrono::nanoseconds(0)) == std::future_status::ready; }
|
||||||
|
),
|
||||||
|
sentMessages.end());
|
||||||
for (unsigned idTo : neighbors)
|
for (unsigned idTo : neighbors)
|
||||||
sentMessages.push_back(std::thread(&AIsland<EOT>::update, table.getRight()[idTo], std::move(migPop)));
|
sentMessages.push_back(std::async(std::launch::async, &AIsland<EOT>::update, table.getRight()[idTo], std::move(migPop)));
|
||||||
|
|
||||||
listEmigrants.pop();
|
listEmigrants.pop();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
template<class EOT>
|
template<class EOT>
|
||||||
|
|
|
||||||
|
|
@ -33,6 +33,7 @@ Contact: paradiseo-help@lists.gforge.inria.fr
|
||||||
#include <queue>
|
#include <queue>
|
||||||
#include <algorithm>
|
#include <algorithm>
|
||||||
#include <utility>
|
#include <utility>
|
||||||
|
#include <future>
|
||||||
#include <thread>
|
#include <thread>
|
||||||
|
|
||||||
#include <bimap.h>
|
#include <bimap.h>
|
||||||
|
|
@ -71,7 +72,7 @@ public:
|
||||||
/**
|
/**
|
||||||
* Update the island model by adding population to send in the emigrants list.
|
* Update the island model by adding population to send in the emigrants list.
|
||||||
*/
|
*/
|
||||||
void update(eoPop<EOT> _data, AIsland<EOT>* _island);
|
bool update(eoPop<EOT> _data, AIsland<EOT>* _island);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Change topology
|
* Change topology
|
||||||
|
|
@ -99,7 +100,7 @@ protected:
|
||||||
Bimap<unsigned, AIsland<EOT>*> table;
|
Bimap<unsigned, AIsland<EOT>*> table;
|
||||||
std::vector<std::pair<AIsland<EOT>*, bool>> islands;
|
std::vector<std::pair<AIsland<EOT>*, bool>> islands;
|
||||||
AbstractTopology& topo;
|
AbstractTopology& topo;
|
||||||
std::vector<std::thread> sentMessages;
|
std::vector<std::shared_future<bool>> sentMessages;
|
||||||
std::mutex m;
|
std::mutex m;
|
||||||
std::atomic<bool> running;
|
std::atomic<bool> running;
|
||||||
};
|
};
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue