diff --git a/eo/src/mpi/eoMpi.h b/eo/src/mpi/eoMpi.h index 5f430be2..a2dcebf7 100644 --- a/eo/src/mpi/eoMpi.h +++ b/eo/src/mpi/eoMpi.h @@ -221,6 +221,63 @@ namespace eo ProcessTaskFunction & processTask; IsFinishedFunction & isFinished; + struct FinallyBlock + { + FinallyBlock( + int _totalWorkers, + AssignmentAlgorithm& _algo, + Job< JobData > & _that + ) : + totalWorkers( _totalWorkers ), + assignmentAlgo( _algo ), + comm( Node::comm() ), + that( _that ) + { + // empty + } + + ~FinallyBlock() + { +# ifndef NDEBUG + eo::log << "[M" << comm.rank() << "] Frees all the idle." << std::endl; +# endif + // frees all the idle workers + timerStat.start("master_wait_for_idles"); + std::vector idles = assignmentAlgo.idles(); + for(unsigned int i = 0; i < idles.size(); ++i) + { + comm.send( idles[i], Channel::Commands, Message::Finish ); + } + timerStat.stop("master_wait_for_idles"); + +# ifndef NDEBUG + eo::log << "[M" << comm.rank() << "] Waits for all responses." << std::endl; +# endif + // wait for all responses + timerStat.start("master_wait_for_all_responses"); + while( assignmentAlgo.availableWorkers() != totalWorkers ) + { + bmpi::status status = comm.probe( bmpi::any_source, bmpi::any_tag ); + int wrkRank = status.source(); + that.handleResponse( wrkRank ); + comm.send( wrkRank, Channel::Commands, Message::Finish ); + assignmentAlgo.confirm( wrkRank ); + } + timerStat.stop("master_wait_for_all_responses"); + +# ifndef NDEBUG + eo::log << "[M" << comm.rank() << "] Leaving master task." << std::endl; +# endif + } + + protected: + + int totalWorkers; + bmpi::communicator & comm; + Job< JobData > & that; + AssignmentAlgorithm& assignmentAlgo; + }; + void master( ) { int totalWorkers = assignmentAlgo.availableWorkers(); @@ -228,65 +285,43 @@ namespace eo eo::log << eo::debug; eo::log << "[M" << comm.rank() << "] Have " << totalWorkers << " workers." << std::endl; # endif - while( ! isFinished() ) - { - timerStat.start("master_wait_for_assignee"); - int assignee = assignmentAlgo.get( ); - while( assignee <= 0 ) + try { + FinallyBlock finally( totalWorkers, assignmentAlgo, *this ); + while( ! isFinished() ) { + timerStat.start("master_wait_for_assignee"); + int assignee = assignmentAlgo.get( ); + while( assignee <= 0 ) + { # ifndef NDEBUG - eo::log << "[M" << comm.rank() << "] Waitin' for node..." << std::endl; + eo::log << "[M" << comm.rank() << "] Waitin' for node..." << std::endl; # endif - bmpi::status status = comm.probe( bmpi::any_source, bmpi::any_tag ); - int wrkRank = status.source(); + bmpi::status status = comm.probe( bmpi::any_source, bmpi::any_tag ); + int wrkRank = status.source(); # ifndef NDEBUG - eo::log << "[M" << comm.rank() << "] Node " << wrkRank << " just terminated." << std::endl; + eo::log << "[M" << comm.rank() << "] Node " << wrkRank << " just terminated." << std::endl; # endif - handleResponse( wrkRank ); - assignmentAlgo.confirm( wrkRank ); - assignee = assignmentAlgo.get( ); + handleResponse( wrkRank ); + assignmentAlgo.confirm( wrkRank ); + assignee = assignmentAlgo.get( ); + } + timerStat.stop("master_wait_for_assignee"); +# ifndef NDEBUG + eo::log << "[M" << comm.rank() << "] Assignee : " << assignee << std::endl; +# endif + + timerStat.start("master_wait_for_send"); + comm.send( assignee, Channel::Commands, Message::Continue ); + sendTask( assignee ); + timerStat.stop("master_wait_for_send"); } - timerStat.stop("master_wait_for_assignee"); -# ifndef NDEBUG - eo::log << "[M" << comm.rank() << "] Assignee : " << assignee << std::endl; -# endif - - timerStat.start("master_wait_for_send"); - comm.send( assignee, Channel::Commands, Message::Continue ); - sendTask( assignee ); - timerStat.stop("master_wait_for_send"); - } - -# ifndef NDEBUG - eo::log << "[M" << comm.rank() << "] Frees all the idle." << std::endl; -# endif - // frees all the idle workers - timerStat.start("master_wait_for_idles"); - std::vector idles = assignmentAlgo.idles(); - for(unsigned int i = 0; i < idles.size(); ++i) + } catch( const std::exception & e ) { - comm.send( idles[i], Channel::Commands, Message::Finish ); + std::string s = e.what(); + s.append( " in eoMpi loop"); + throw std::runtime_error( s ); } - timerStat.stop("master_wait_for_idles"); -# ifndef NDEBUG - eo::log << "[M" << comm.rank() << "] Waits for all responses." << std::endl; -# endif - // wait for all responses - timerStat.start("master_wait_for_all_responses"); - while( assignmentAlgo.availableWorkers() != totalWorkers ) - { - bmpi::status status = comm.probe( bmpi::any_source, bmpi::any_tag ); - int wrkRank = status.source(); - handleResponse( wrkRank ); - comm.send( wrkRank, Channel::Commands, Message::Finish ); - assignmentAlgo.confirm( wrkRank ); - } - timerStat.stop("master_wait_for_all_responses"); - -# ifndef NDEBUG - eo::log << "[M" << comm.rank() << "] Leaving master task." << std::endl; -# endif } void worker( )