Catching exceptions and sending them back in eoMpi loop.

This commit is contained in:
Benjamin Bouvier 2012-07-06 10:06:34 +02:00
commit 23acd1a633

View file

@ -221,6 +221,63 @@ namespace eo
ProcessTaskFunction<JobData> & processTask; ProcessTaskFunction<JobData> & processTask;
IsFinishedFunction<JobData> & isFinished; IsFinishedFunction<JobData> & isFinished;
struct FinallyBlock
{
FinallyBlock(
int _totalWorkers,
AssignmentAlgorithm& _algo,
Job< JobData > & _that
) :
totalWorkers( _totalWorkers ),
assignmentAlgo( _algo ),
comm( Node::comm() ),
that( _that )
{
// empty
}
~FinallyBlock()
{
# ifndef NDEBUG
eo::log << "[M" << comm.rank() << "] Frees all the idle." << std::endl;
# endif
// frees all the idle workers
timerStat.start("master_wait_for_idles");
std::vector<int> idles = assignmentAlgo.idles();
for(unsigned int i = 0; i < idles.size(); ++i)
{
comm.send( idles[i], Channel::Commands, Message::Finish );
}
timerStat.stop("master_wait_for_idles");
# ifndef NDEBUG
eo::log << "[M" << comm.rank() << "] Waits for all responses." << std::endl;
# endif
// wait for all responses
timerStat.start("master_wait_for_all_responses");
while( assignmentAlgo.availableWorkers() != totalWorkers )
{
bmpi::status status = comm.probe( bmpi::any_source, bmpi::any_tag );
int wrkRank = status.source();
that.handleResponse( wrkRank );
comm.send( wrkRank, Channel::Commands, Message::Finish );
assignmentAlgo.confirm( wrkRank );
}
timerStat.stop("master_wait_for_all_responses");
# ifndef NDEBUG
eo::log << "[M" << comm.rank() << "] Leaving master task." << std::endl;
# endif
}
protected:
int totalWorkers;
bmpi::communicator & comm;
Job< JobData > & that;
AssignmentAlgorithm& assignmentAlgo;
};
void master( ) void master( )
{ {
int totalWorkers = assignmentAlgo.availableWorkers(); int totalWorkers = assignmentAlgo.availableWorkers();
@ -228,65 +285,43 @@ namespace eo
eo::log << eo::debug; eo::log << eo::debug;
eo::log << "[M" << comm.rank() << "] Have " << totalWorkers << " workers." << std::endl; eo::log << "[M" << comm.rank() << "] Have " << totalWorkers << " workers." << std::endl;
# endif # endif
while( ! isFinished() ) try {
{ FinallyBlock finally( totalWorkers, assignmentAlgo, *this );
timerStat.start("master_wait_for_assignee"); while( ! isFinished() )
int assignee = assignmentAlgo.get( );
while( assignee <= 0 )
{ {
timerStat.start("master_wait_for_assignee");
int assignee = assignmentAlgo.get( );
while( assignee <= 0 )
{
# ifndef NDEBUG # ifndef NDEBUG
eo::log << "[M" << comm.rank() << "] Waitin' for node..." << std::endl; eo::log << "[M" << comm.rank() << "] Waitin' for node..." << std::endl;
# endif # endif
bmpi::status status = comm.probe( bmpi::any_source, bmpi::any_tag ); bmpi::status status = comm.probe( bmpi::any_source, bmpi::any_tag );
int wrkRank = status.source(); int wrkRank = status.source();
# ifndef NDEBUG # ifndef NDEBUG
eo::log << "[M" << comm.rank() << "] Node " << wrkRank << " just terminated." << std::endl; eo::log << "[M" << comm.rank() << "] Node " << wrkRank << " just terminated." << std::endl;
# endif # endif
handleResponse( wrkRank ); handleResponse( wrkRank );
assignmentAlgo.confirm( wrkRank ); assignmentAlgo.confirm( wrkRank );
assignee = assignmentAlgo.get( ); assignee = assignmentAlgo.get( );
}
timerStat.stop("master_wait_for_assignee");
# ifndef NDEBUG
eo::log << "[M" << comm.rank() << "] Assignee : " << assignee << std::endl;
# endif
timerStat.start("master_wait_for_send");
comm.send( assignee, Channel::Commands, Message::Continue );
sendTask( assignee );
timerStat.stop("master_wait_for_send");
} }
timerStat.stop("master_wait_for_assignee"); } catch( const std::exception & e )
# ifndef NDEBUG
eo::log << "[M" << comm.rank() << "] Assignee : " << assignee << std::endl;
# endif
timerStat.start("master_wait_for_send");
comm.send( assignee, Channel::Commands, Message::Continue );
sendTask( assignee );
timerStat.stop("master_wait_for_send");
}
# ifndef NDEBUG
eo::log << "[M" << comm.rank() << "] Frees all the idle." << std::endl;
# endif
// frees all the idle workers
timerStat.start("master_wait_for_idles");
std::vector<int> idles = assignmentAlgo.idles();
for(unsigned int i = 0; i < idles.size(); ++i)
{ {
comm.send( idles[i], Channel::Commands, Message::Finish ); std::string s = e.what();
s.append( " in eoMpi loop");
throw std::runtime_error( s );
} }
timerStat.stop("master_wait_for_idles");
# ifndef NDEBUG
eo::log << "[M" << comm.rank() << "] Waits for all responses." << std::endl;
# endif
// wait for all responses
timerStat.start("master_wait_for_all_responses");
while( assignmentAlgo.availableWorkers() != totalWorkers )
{
bmpi::status status = comm.probe( bmpi::any_source, bmpi::any_tag );
int wrkRank = status.source();
handleResponse( wrkRank );
comm.send( wrkRank, Channel::Commands, Message::Finish );
assignmentAlgo.confirm( wrkRank );
}
timerStat.stop("master_wait_for_all_responses");
# ifndef NDEBUG
eo::log << "[M" << comm.rank() << "] Leaving master task." << std::endl;
# endif
} }
void worker( ) void worker( )