From 6503f61521126418b9bb0d56d19e619f53d149ce Mon Sep 17 00:00:00 2001 From: Benjamin BOUVIER Date: Mon, 1 Oct 2012 21:50:23 -0400 Subject: [PATCH] [MPI Distrib exp: serializable class Experiment added. --- eo/test/mpi/t-mpi-distrib-exp.cpp | 161 ++++++++++++++++++++++-------- 1 file changed, 117 insertions(+), 44 deletions(-) diff --git a/eo/test/mpi/t-mpi-distrib-exp.cpp b/eo/test/mpi/t-mpi-distrib-exp.cpp index 1b5afbf9e..43a7291aa 100644 --- a/eo/test/mpi/t-mpi-distrib-exp.cpp +++ b/eo/test/mpi/t-mpi-distrib-exp.cpp @@ -281,10 +281,120 @@ class ExponentialDistribution : public Distribution } exponentialDistribution; +class Experiment : public eoserial::Persistent +{ + public: + + Experiment() : _distribution(0) + { + // empty + } + + Experiment( Distribution* distrib, unsigned size, unsigned packet_size, bool print_waiting_time ) : + _distribution( distrib ), + _size( size ), + _packet_size( packet_size ), + _worker_print_waiting_time( print_waiting_time ) + { + // empty + } + + eoserial::Object* pack( void ) const + { + eoserial::Object* obj = new eoserial::Object; + obj->add( "size", eoserial::make( _size ) ); + obj->add( "packet-size", eoserial::make( _packet_size ) ); + obj->add( "worker-print-waiting-time", eoserial::make( _worker_print_waiting_time ) ); + if( _distribution ) + { + obj->add( "distribution", _distribution ); + } + return obj; + } + + void unpack( const eoserial::Object* obj ) + { + eoserial::unpack( *obj, "size", _size ); + eoserial::unpack( *obj, "packet-size", _packet_size ); + eoserial::unpack( *obj, "worker-print-waiting-time", _worker_print_waiting_time ); + + eoserial::Object* distribObject = static_cast( obj->find("distribution")->second ); + std::string distribName = *static_cast( distribObject->find("name")->second ); + + // TODO find a better design... + if( distribName == "normal" ) { + _distribution = & normalDistribution; + } else if( distribName == "uniform" ) { + _distribution = & uniformDistribution; + } else if( distribName == "exponential" ) { + _distribution = & exponentialDistribution; + } else { + throw std::runtime_error("When unpacking experience, no distribution found."); + } + + eoserial::unpackObject( *obj, "distribution", *_distribution ); + } + + void run() + { + mpi::communicator& comm = eo::mpi::Node::comm(); + timerStat.start("run"); + _distribution->clear(); + _distribution->fill( _size ); + + Wait wait( _worker_print_waiting_time ); + ParallelApplyStore< type > store( wait, DEFAULT_MASTER, _packet_size ); + store.data( *_distribution ); + DynamicAssignmentAlgorithm scheduling; + ParallelApply< type > job( scheduling, DEFAULT_MASTER, store ); + + job.run(); + timerStat.stop("run"); + if( job.isMaster() ) + { + EmptyJob( scheduling, DEFAULT_MASTER ); // to terminate parallel apply + // Receive statistics + typedef std::map< std::string, eoTimerStat::Stat > typeStats; + // TODO put that in a file instead + std::cout << std::fixed << std::setprecision( 5 ); + for( int i = 1, s = comm.size(); i < s; ++i ) + { + eoTimerStat timerStat; + comm.recv( i, eo::mpi::Channel::Commands, timerStat ); + typeStats stats = timerStat.stats(); + for( typeStats::iterator it = stats.begin(), + end = stats.end(); + it != end; + ++it ) + { + std::cout << "Worker " << i << ": Wallclock time of " << it->first << std::endl; + for( int j = 0, t = it->second.wtime.size(); j < t; ++j ) + { + std::cout << it->second.wtime[j] << " "; + } + std::cout << std::endl; + } + std::cout << std::endl; + } + } else + { + // Send statistics + comm.send( DEFAULT_MASTER, eo::mpi::Channel::Commands, eo::mpi::timerStat ); + } + timerStat.clear(); + } + + private: + + Distribution* _distribution; + unsigned _size; + unsigned _packet_size; + bool _worker_print_waiting_time; +}; + int main( int argc, char** argv ) { Node::init( argc, argv ); - mpi::communicator& comm = eo::mpi::Node::comm(); eoParser parser( argc, argv ); // forces the statistics to be retrieved @@ -324,56 +434,19 @@ int main( int argc, char** argv ) make_parallel( parser ); make_help( parser ); - timerStat.start("main"); if( !isChosenDistrib ) { throw std::runtime_error("No distribution chosen. One distribution should be chosen."); } - // Fill distribution - Distribution& distrib = *pdistrib; - distrib.fill( size ); + Experiment e( pdistrib, size, packet_size, worker_print_waiting_time ); + eoserial::Object* obj = e.pack(); + obj->print( std::cout ); + delete obj; + std::cout << '\n' << std::endl; - Wait wait( worker_print_waiting_time ); - ParallelApplyStore< type > store( wait, DEFAULT_MASTER, packet_size ); - store.data( distrib ); - DynamicAssignmentAlgorithm scheduling; - ParallelApply< type > job( scheduling, DEFAULT_MASTER, store ); - - job.run(); - - timerStat.stop("main"); - if( job.isMaster() ) - { - EmptyJob( scheduling, DEFAULT_MASTER ); // to terminate parallel apply - // Receive statistics - typedef std::map< std::string, eoTimerStat::Stat > typeStats; - std::cout << std::fixed << std::setprecision( 5 ); - for( int i = 1, s = comm.size(); i < s; ++i ) - { - eoTimerStat timerStat; - comm.recv( i, eo::mpi::Channel::Commands, timerStat ); - typeStats stats = timerStat.stats(); - for( typeStats::iterator it = stats.begin(), - end = stats.end(); - it != end; - ++it ) - { - std::cout << "Worker " << i << ": Wallclock time of " << it->first << std::endl; - for( int j = 0, t = it->second.wtime.size(); j < t; ++j ) - { - std::cout << it->second.wtime[j] << " "; - } - std::cout << std::endl; - } - std::cout << std::endl; - } - } else - { - // Send statistics - comm.send( DEFAULT_MASTER, eo::mpi::Channel::Commands, eo::mpi::timerStat ); - } + e.run(); return 0; }