commit 9830cf304781f4aa3b2e931969b4ffb08329e882 Author: nojhan Date: Thu Feb 20 23:04:46 2020 +0100 initial commit diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..ef64cf6 --- /dev/null +++ b/Makefile @@ -0,0 +1,11 @@ +all: pcat service service2 + +pcat: pcat.cpp + clang++ pcat.cpp -o pcat + +service: service.cpp + clang++ service.cpp -o service + +service2: service2.cpp + clang++ service2.cpp -l pthread -o service2 + diff --git a/README.md b/README.md new file mode 100644 index 0000000..95f3bc9 --- /dev/null +++ b/README.md @@ -0,0 +1,5 @@ +Named pipes services +==================== + +Examples of how to design servicecs that use named pipes FIFO as I/O. + diff --git a/pcat.cpp b/pcat.cpp new file mode 100644 index 0000000..5dfbfae --- /dev/null +++ b/pcat.cpp @@ -0,0 +1,21 @@ +#include +#include +#include +#include + +int main(int argc, char** argv) +{ + while(true) { + + // Blocking call on named pipes. + std::ifstream ifs(argv[1]); + std::stringstream datas; + datas << ifs.rdbuf(); + ifs.close(); + + std::string data = datas.str(); + + std::cout << datas.str(); + std::cout.flush(); + } +} diff --git a/service.cpp b/service.cpp new file mode 100644 index 0000000..2209a75 --- /dev/null +++ b/service.cpp @@ -0,0 +1,41 @@ +#include +#include +#include +#include +#include +#include + +std::string strip(std::string s) +{ + s.erase(std::find_if( s.rbegin(), s.rend(), + [](int ch) { return !std::isspace(ch); } + ).base(), s.end()); + return s; +} + +int main(int argc, char** argv) +{ + std::clog << "Start server" << std::endl; + + while(true) { + + // Blocking call on named pipes. + std::ifstream ifs(argv[1]); + std::stringstream datas; + datas << ifs.rdbuf(); + ifs.close(); + + std::string data = strip(datas.str()); + + std::ofstream ofs(argv[2]); + std::clog << "Received: <" << data << ">" << std::endl; + ofs << data << std::endl; + ofs.close(); + + if(data == "exit") { + break; + } + } + + std::clog << "Stop server" << std::endl; +} diff --git a/service2.cpp b/service2.cpp new file mode 100644 index 0000000..b3bd08f --- /dev/null +++ b/service2.cpp @@ -0,0 +1,149 @@ +#include +#include +#include +#include +#include +#include +#include + +#include + +enum ERROR { NOT_FIFO=1, NO_CONTEXT }; + +class Service +{ +protected: + bool _has_current_context; + std::mutex _mutex; + std::string _file_current_context; + std::string _file_data; + std::string _out; + const unsigned int _sleep; + std::string _current_context; + + bool has_current_context() + { + std::lock_guard guarded_scope(_mutex); + return _has_current_context; + } + + void has_current_context(bool flag) + { + std::lock_guard guarded_scope(_mutex); + _has_current_context = flag; + } + +public: + + Service( + std::string context, + std::string data, + std::string out, + unsigned int sleep = 100 + ) : + _has_current_context(false), + _file_current_context(context), + _file_data(data), + _out(out), + _sleep(sleep) + {} + + std::string strip(std::string s) + { + s.erase(std::find_if( s.rbegin(), s.rend(), + [](int ch) { return !std::isspace(ch); } + ).base(), s.end()); + return s; + } + + void update_current_context() + { + while(true) { + std::clog << "Wait for context..." << std::endl; + bool has_error = false; + try { + std::ifstream if_current_context(_file_current_context); + std::stringstream current_context; + current_context << if_current_context.rdbuf(); + if_current_context.close(); + _current_context = strip(current_context.str()); + } catch(...) { + has_error = true; + } + if(not has_error) { + this->has_current_context(true); + std::clog << "\tReceived context: " << _current_context << std::endl; + } + std::this_thread::sleep_for(std::chrono::milliseconds(_sleep)); + } + } + + void handle_data() + { + while(true) { + if(this->has_current_context()) { + std::string data; + std::clog << "Wait for data..." << std::endl; + bool has_error = false; + try { + std::ifstream if_data(_file_data); + std::stringstream datas; + datas << if_data.rdbuf(); + if_data.close(); + data = strip(datas.str()); + } catch(...) { + has_error = true; + } + if(not has_error) { + std::clog << "\tReceived data: " << data << std::endl; + + std::clog << "Do stuff..." << std::endl; + std::ostringstream result; + result << _current_context << ": " << data; + std::clog << "\tdone" << std::endl; + + std::clog << "Output..." << std::endl; + std::ofstream out(_out); + out << result.str() << std::endl; + out.close(); + std::clog << "\tdone" << std::endl; + } // if not has_error + } + std::this_thread::sleep_for(std::chrono::milliseconds(_sleep)); + } // while true + } +}; + +bool is_named_pipe_fifo(char* filename) +{ + struct stat st; + stat(filename, &st); + if(not S_ISFIFO(st.st_mode) ) { + return false; + } + return true; +} + +int main(int argc, char** argv) +{ + assert(argc = 3); + + for(size_t i=1; i < 3; ++i) { + if( not is_named_pipe_fifo(argv[i]) ) { + std::cerr << "ERROR: " << argv[i] << " is not a named pipe FIFO" << std::endl; + exit(ERROR::NOT_FIFO); + } + } + + std::clog << "Start server" << std::endl; + std::clog.flush(); + Service server(argv[1], argv[2], argv[3], 100); + + std::thread do_current_context(&Service::update_current_context, &server); + std::thread do_tasks(&Service::handle_data, &server); + + do_current_context.join(); + do_tasks.join(); + std::clog << "End" << std::endl; + +}