From 6fa353595ce8f784b7d5004a6c38c78bddae94c7 Mon Sep 17 00:00:00 2001 From: Carl Hetherington Date: Sun, 6 Dec 2015 20:17:47 +0000 Subject: [PATCH] Split EncodeServer into that and Server. --- src/lib/encode_server.cc | 45 ++++++-------------- src/lib/encode_server.h | 19 +++------ src/lib/exception_store.h | 6 +-- src/lib/server.cc | 71 +++++++++++++++++++++++++++++++ src/lib/server.h | 52 ++++++++++++++++++++++ src/lib/wscript | 1 + src/tools/dcpomatic_server.cc | 4 +- src/tools/dcpomatic_server_cli.cc | 4 +- test/client_server_test.cc | 12 +++--- 9 files changed, 156 insertions(+), 58 deletions(-) create mode 100644 src/lib/server.cc create mode 100644 src/lib/server.h diff --git a/src/lib/encode_server.cc b/src/lib/encode_server.cc index 3f30a361a..6560bcfec 100644 --- a/src/lib/encode_server.cc +++ b/src/lib/encode_server.cc @@ -65,11 +65,11 @@ using boost::optional; using dcp::Size; using dcp::Data; -EncodeServer::EncodeServer (shared_ptr log, bool verbose) - : _terminate (false) +EncodeServer::EncodeServer (shared_ptr log, bool verbose, int num_threads) + : Server (Config::instance()->server_port_base()) , _log (log) , _verbose (verbose) - , _acceptor (_io_service, boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v4(), Config::instance()->server_port_base())) + , _num_threads (num_threads) { } @@ -77,8 +77,7 @@ EncodeServer::EncodeServer (shared_ptr log, bool verbose) EncodeServer::~EncodeServer () { { - boost::mutex::scoped_lock lm (_worker_mutex); - _terminate = true; + boost::mutex::scoped_lock lm (_mutex); _empty_condition.notify_all (); _full_condition.notify_all (); } @@ -89,8 +88,6 @@ EncodeServer::~EncodeServer () delete i; } - _io_service.stop (); - _broadcast.io_service.stop (); if (_broadcast.thread) { DCPOMATIC_ASSERT (_broadcast.thread->joinable ()); @@ -146,7 +143,7 @@ void EncodeServer::worker_thread () { while (true) { - boost::mutex::scoped_lock lock (_worker_mutex); + boost::mutex::scoped_lock lock (_mutex); while (_queue.empty () && !_terminate) { _empty_condition.wait (lock); } @@ -209,21 +206,20 @@ EncodeServer::worker_thread () } void -EncodeServer::run (int num_threads) +EncodeServer::run () { - LOG_GENERAL ("Server starting with %1 threads", num_threads); + LOG_GENERAL ("Server starting with %1 threads", _num_threads); if (_verbose) { - cout << "DCP-o-matic server starting with " << num_threads << " threads.\n"; + cout << "DCP-o-matic server starting with " << _num_threads << " threads.\n"; } - for (int i = 0; i < num_threads; ++i) { + for (int i = 0; i < _num_threads; ++i) { _worker_threads.push_back (new thread (bind (&EncodeServer::worker_thread, this))); } _broadcast.thread = new thread (bind (&EncodeServer::broadcast_thread, this)); - start_accept (); - _io_service.run (); + Server::run (); } void @@ -283,24 +279,9 @@ EncodeServer::broadcast_received () } void -EncodeServer::start_accept () -{ - if (_terminate) { - return; - } - - shared_ptr socket (new Socket); - _acceptor.async_accept (socket->socket (), boost::bind (&EncodeServer::handle_accept, this, socket, boost::asio::placeholders::error)); -} - -void -EncodeServer::handle_accept (shared_ptr socket, boost::system::error_code const & error) +EncodeServer::handle (shared_ptr socket) { - if (error) { - return; - } - - boost::mutex::scoped_lock lock (_worker_mutex); + boost::mutex::scoped_lock lock (_mutex); /* Wait until the queue has gone down a bit */ while (_queue.size() >= _worker_threads.size() * 2 && !_terminate) { @@ -309,6 +290,4 @@ EncodeServer::handle_accept (shared_ptr socket, boost::system::error_cod _queue.push_back (socket); _empty_condition.notify_all (); - - start_accept (); } diff --git a/src/lib/encode_server.h b/src/lib/encode_server.h index f6f1fc9b9..d0c061eb3 100644 --- a/src/lib/encode_server.h +++ b/src/lib/encode_server.h @@ -21,9 +21,10 @@ #define DCPOMATIC_ENCODE_SERVER_H /** @file src/encode_server.h - * @brief Server class. + * @brief EncodeServer class. */ +#include "server.h" #include "exception_store.h" #include #include @@ -37,34 +38,28 @@ class Log; * @brief A class to run a server which can accept requests to perform JPEG2000 * encoding work. */ -class EncodeServer : public ExceptionStore, public boost::noncopyable +class EncodeServer : public Server, public ExceptionStore { public: - EncodeServer (boost::shared_ptr log, bool verbose); + EncodeServer (boost::shared_ptr log, bool verbose, int num_threads); ~EncodeServer (); - void run (int num_threads); + void run (); private: + void handle (boost::shared_ptr); void worker_thread (); int process (boost::shared_ptr socket, struct timeval &, struct timeval &); void broadcast_thread (); void broadcast_received (); - void start_accept (); - void handle_accept (boost::shared_ptr, boost::system::error_code const &); - - bool _terminate; std::vector _worker_threads; std::list > _queue; - boost::mutex _worker_mutex; boost::condition _full_condition; boost::condition _empty_condition; boost::shared_ptr _log; bool _verbose; - - boost::asio::io_service _io_service; - boost::asio::ip::tcp::acceptor _acceptor; + int _num_threads; struct Broadcast { diff --git a/src/lib/exception_store.h b/src/lib/exception_store.h index d7c34c0f4..de0bed029 100644 --- a/src/lib/exception_store.h +++ b/src/lib/exception_store.h @@ -46,7 +46,7 @@ class ExceptionStore { public: void rethrow () { - boost::mutex::scoped_lock lm (_mutex); + boost::mutex::scoped_lock lm (_exception_mutex); if (_exception) { boost::exception_ptr tmp = _exception; _exception = boost::exception_ptr (); @@ -57,13 +57,13 @@ public: protected: void store_current () { - boost::mutex::scoped_lock lm (_mutex); + boost::mutex::scoped_lock lm (_exception_mutex); _exception = boost::current_exception (); } private: boost::exception_ptr _exception; - mutable boost::mutex _mutex; + mutable boost::mutex _exception_mutex; }; #endif diff --git a/src/lib/server.cc b/src/lib/server.cc new file mode 100644 index 000000000..f137d07f8 --- /dev/null +++ b/src/lib/server.cc @@ -0,0 +1,71 @@ +/* + Copyright (C) 2012-2015 Carl Hetherington + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + +*/ + +#include "server.h" +#include "dcpomatic_socket.h" + +#include "i18n.h" + +using boost::shared_ptr; + +Server::Server (int port) + : _terminate (false) + , _acceptor (_io_service, boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v4(), port)) +{ + +} + +Server::~Server () +{ + boost::mutex::scoped_lock lm (_mutex); + _terminate = true; + _io_service.stop (); +} + +void +Server::run () +{ + start_accept (); + _io_service.run (); +} + +void +Server::start_accept () +{ + { + boost::mutex::scoped_lock lm (_mutex); + if (_terminate) { + return; + } + } + + shared_ptr socket (new Socket); + _acceptor.async_accept (socket->socket (), boost::bind (&Server::handle_accept, this, socket, boost::asio::placeholders::error)); +} + +void +Server::handle_accept (shared_ptr socket, boost::system::error_code const & error) +{ + if (error) { + return; + } + + handle (socket); + start_accept (); +} diff --git a/src/lib/server.h b/src/lib/server.h new file mode 100644 index 000000000..3e7c7945a --- /dev/null +++ b/src/lib/server.h @@ -0,0 +1,52 @@ +/* + Copyright (C) 2015 Carl Hetherington + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + +*/ + +#ifndef DCPOMATIC_SERVER_H +#define DCPOMATIC_SERVER_H + +#include +#include +#include +#include + +class Socket; + +class Server : public boost::noncopyable +{ +public: + Server (int port); + virtual ~Server (); + + virtual void run (); + +protected: + boost::mutex _mutex; + bool _terminate; + +private: + virtual void handle (boost::shared_ptr socket) = 0; + + void start_accept (); + void handle_accept (boost::shared_ptr, boost::system::error_code const &); + + boost::asio::io_service _io_service; + boost::asio::ip::tcp::acceptor _acceptor; +}; + +#endif diff --git a/src/lib/wscript b/src/lib/wscript index 0f0b0c14f..f61445685 100644 --- a/src/lib/wscript +++ b/src/lib/wscript @@ -109,6 +109,7 @@ sources = """ screen_kdm.cc send_kdm_email_job.cc send_problem_report_job.cc + server.cc single_stream_audio_content.cc sndfile_base.cc sndfile_content.cc diff --git a/src/tools/dcpomatic_server.cc b/src/tools/dcpomatic_server.cc index e339b1966..5fb8cb3af 100644 --- a/src/tools/dcpomatic_server.cc +++ b/src/tools/dcpomatic_server.cc @@ -291,8 +291,8 @@ private: void main_thread () try { - EncodeServer server (server_log, false); - server.run (Config::instance()->num_local_encoding_threads ()); + EncodeServer server (server_log, false, Config::instance()->num_local_encoding_threads()); + server.run (); } catch (...) { store_current (); } diff --git a/src/tools/dcpomatic_server_cli.cc b/src/tools/dcpomatic_server_cli.cc index 8f343b344..336d578ee 100644 --- a/src/tools/dcpomatic_server_cli.cc +++ b/src/tools/dcpomatic_server_cli.cc @@ -110,10 +110,10 @@ main (int argc, char* argv[]) log.reset (new NullLog); } - EncodeServer server (log, verbose); + EncodeServer server (log, verbose, num_threads); try { - server.run (num_threads); + server.run (); } catch (boost::system::system_error& e) { if (e.code() == boost::system::errc::address_in_use) { cerr << argv[0] << ": address already in use. Is another DCP-o-matic server instance already running?\n"; diff --git a/test/client_server_test.cc b/test/client_server_test.cc index 6f7a170d0..e7502af0e 100644 --- a/test/client_server_test.cc +++ b/test/client_server_test.cc @@ -112,9 +112,9 @@ BOOST_AUTO_TEST_CASE (client_server_test_rgb) Data locally_encoded = frame->encode_locally (boost::bind (&Log::dcp_log, log.get(), _1, _2)); - EncodeServer* server = new EncodeServer (log, true); + EncodeServer* server = new EncodeServer (log, true, 2); - new thread (boost::bind (&EncodeServer::run, server, 2)); + new thread (boost::bind (&EncodeServer::run, server)); /* Let the server get itself ready */ dcpomatic_sleep (1); @@ -192,9 +192,9 @@ BOOST_AUTO_TEST_CASE (client_server_test_yuv) Data locally_encoded = frame->encode_locally (boost::bind (&Log::dcp_log, log.get(), _1, _2)); - EncodeServer* server = new EncodeServer (log, true); + EncodeServer* server = new EncodeServer (log, true, 2); - new thread (boost::bind (&EncodeServer::run, server, 2)); + new thread (boost::bind (&EncodeServer::run, server)); /* Let the server get itself ready */ dcpomatic_sleep (1); @@ -284,9 +284,9 @@ BOOST_AUTO_TEST_CASE (client_server_test_j2k) Data j2k_locally_encoded = j2k_frame->encode_locally (boost::bind (&Log::dcp_log, log.get(), _1, _2)); - EncodeServer* server = new EncodeServer (log, true); + EncodeServer* server = new EncodeServer (log, true, 2); - new thread (boost::bind (&EncodeServer::run, server, 2)); + new thread (boost::bind (&EncodeServer::run, server)); /* Let the server get itself ready */ dcpomatic_sleep (1); -- 2.30.2