[MPI Distrib exp: serializable class Experiment added.

This commit is contained in:
Benjamin BOUVIER 2012-10-01 21:50:23 -04:00
commit 6503f61521

View file

@ -281,10 +281,120 @@ class ExponentialDistribution : public Distribution
} exponentialDistribution;
class Experiment : public eoserial::Persistent
{
public:
Experiment() : _distribution(0)
{
// empty
}
Experiment( Distribution* distrib, unsigned size, unsigned packet_size, bool print_waiting_time ) :
_distribution( distrib ),
_size( size ),
_packet_size( packet_size ),
_worker_print_waiting_time( print_waiting_time )
{
// empty
}
eoserial::Object* pack( void ) const
{
eoserial::Object* obj = new eoserial::Object;
obj->add( "size", eoserial::make( _size ) );
obj->add( "packet-size", eoserial::make( _packet_size ) );
obj->add( "worker-print-waiting-time", eoserial::make( _worker_print_waiting_time ) );
if( _distribution )
{
obj->add( "distribution", _distribution );
}
return obj;
}
void unpack( const eoserial::Object* obj )
{
eoserial::unpack( *obj, "size", _size );
eoserial::unpack( *obj, "packet-size", _packet_size );
eoserial::unpack( *obj, "worker-print-waiting-time", _worker_print_waiting_time );
eoserial::Object* distribObject = static_cast<eoserial::Object*>( obj->find("distribution")->second );
std::string distribName = *static_cast<eoserial::String*>( distribObject->find("name")->second );
// TODO find a better design...
if( distribName == "normal" ) {
_distribution = & normalDistribution;
} else if( distribName == "uniform" ) {
_distribution = & uniformDistribution;
} else if( distribName == "exponential" ) {
_distribution = & exponentialDistribution;
} else {
throw std::runtime_error("When unpacking experience, no distribution found.");
}
eoserial::unpackObject( *obj, "distribution", *_distribution );
}
void run()
{
mpi::communicator& comm = eo::mpi::Node::comm();
timerStat.start("run");
_distribution->clear();
_distribution->fill( _size );
Wait wait( _worker_print_waiting_time );
ParallelApplyStore< type > store( wait, DEFAULT_MASTER, _packet_size );
store.data( *_distribution );
DynamicAssignmentAlgorithm scheduling;
ParallelApply< type > job( scheduling, DEFAULT_MASTER, store );
job.run();
timerStat.stop("run");
if( job.isMaster() )
{
EmptyJob( scheduling, DEFAULT_MASTER ); // to terminate parallel apply
// Receive statistics
typedef std::map< std::string, eoTimerStat::Stat > typeStats;
// TODO put that in a file instead
std::cout << std::fixed << std::setprecision( 5 );
for( int i = 1, s = comm.size(); i < s; ++i )
{
eoTimerStat timerStat;
comm.recv( i, eo::mpi::Channel::Commands, timerStat );
typeStats stats = timerStat.stats();
for( typeStats::iterator it = stats.begin(),
end = stats.end();
it != end;
++it )
{
std::cout << "Worker " << i << ": Wallclock time of " << it->first << std::endl;
for( int j = 0, t = it->second.wtime.size(); j < t; ++j )
{
std::cout << it->second.wtime[j] << " ";
}
std::cout << std::endl;
}
std::cout << std::endl;
}
} else
{
// Send statistics
comm.send( DEFAULT_MASTER, eo::mpi::Channel::Commands, eo::mpi::timerStat );
}
timerStat.clear();
}
private:
Distribution* _distribution;
unsigned _size;
unsigned _packet_size;
bool _worker_print_waiting_time;
};
int main( int argc, char** argv )
{
Node::init( argc, argv );
mpi::communicator& comm = eo::mpi::Node::comm();
eoParser parser( argc, argv );
// forces the statistics to be retrieved
@ -324,56 +434,19 @@ int main( int argc, char** argv )
make_parallel( parser );
make_help( parser );
timerStat.start("main");
if( !isChosenDistrib )
{
throw std::runtime_error("No distribution chosen. One distribution should be chosen.");
}
// Fill distribution
Distribution& distrib = *pdistrib;
distrib.fill( size );
Experiment e( pdistrib, size, packet_size, worker_print_waiting_time );
eoserial::Object* obj = e.pack();
obj->print( std::cout );
delete obj;
std::cout << '\n' << std::endl;
Wait wait( worker_print_waiting_time );
ParallelApplyStore< type > store( wait, DEFAULT_MASTER, packet_size );
store.data( distrib );
DynamicAssignmentAlgorithm scheduling;
ParallelApply< type > job( scheduling, DEFAULT_MASTER, store );
job.run();
timerStat.stop("main");
if( job.isMaster() )
{
EmptyJob( scheduling, DEFAULT_MASTER ); // to terminate parallel apply
// Receive statistics
typedef std::map< std::string, eoTimerStat::Stat > typeStats;
std::cout << std::fixed << std::setprecision( 5 );
for( int i = 1, s = comm.size(); i < s; ++i )
{
eoTimerStat timerStat;
comm.recv( i, eo::mpi::Channel::Commands, timerStat );
typeStats stats = timerStat.stats();
for( typeStats::iterator it = stats.begin(),
end = stats.end();
it != end;
++it )
{
std::cout << "Worker " << i << ": Wallclock time of " << it->first << std::endl;
for( int j = 0, t = it->second.wtime.size(); j < t; ++j )
{
std::cout << it->second.wtime[j] << " ";
}
std::cout << std::endl;
}
std::cout << std::endl;
}
} else
{
// Send statistics
comm.send( DEFAULT_MASTER, eo::mpi::Channel::Commands, eo::mpi::timerStat );
}
e.run();
return 0;
}