Split EncodeServer into that and Server.
authorCarl Hetherington <cth@carlh.net>
Sun, 6 Dec 2015 20:17:47 +0000 (20:17 +0000)
committerCarl Hetherington <cth@carlh.net>
Fri, 11 Dec 2015 11:56:23 +0000 (11:56 +0000)
src/lib/encode_server.cc
src/lib/encode_server.h
src/lib/exception_store.h
src/lib/server.cc [new file with mode: 0644]
src/lib/server.h [new file with mode: 0644]
src/lib/wscript
src/tools/dcpomatic_server.cc
src/tools/dcpomatic_server_cli.cc
test/client_server_test.cc

index 3f30a361a898a49587202cb1a2a7f8b512193a29..6560bcfec9ebf52b7dbeb5d0dc4d66ec25dcde05 100644 (file)
@@ -65,11 +65,11 @@ using boost::optional;
 using dcp::Size;
 using dcp::Data;
 
-EncodeServer::EncodeServer (shared_ptr<Log> log, bool verbose)
-       : _terminate (false)
+EncodeServer::EncodeServer (shared_ptr<Log> log, bool verbose, int num_threads)
+       : Server (Config::instance()->server_port_base())
        , _log (log)
        , _verbose (verbose)
-       , _acceptor (_io_service, boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v4(), Config::instance()->server_port_base()))
+       , _num_threads (num_threads)
 {
 
 }
@@ -77,8 +77,7 @@ EncodeServer::EncodeServer (shared_ptr<Log> log, bool verbose)
 EncodeServer::~EncodeServer ()
 {
        {
-               boost::mutex::scoped_lock lm (_worker_mutex);
-               _terminate = true;
+               boost::mutex::scoped_lock lm (_mutex);
                _empty_condition.notify_all ();
                _full_condition.notify_all ();
        }
@@ -89,8 +88,6 @@ EncodeServer::~EncodeServer ()
                delete i;
        }
 
-       _io_service.stop ();
-
        _broadcast.io_service.stop ();
        if (_broadcast.thread) {
                DCPOMATIC_ASSERT (_broadcast.thread->joinable ());
@@ -146,7 +143,7 @@ void
 EncodeServer::worker_thread ()
 {
        while (true) {
-               boost::mutex::scoped_lock lock (_worker_mutex);
+               boost::mutex::scoped_lock lock (_mutex);
                while (_queue.empty () && !_terminate) {
                        _empty_condition.wait (lock);
                }
@@ -209,21 +206,20 @@ EncodeServer::worker_thread ()
 }
 
 void
-EncodeServer::run (int num_threads)
+EncodeServer::run ()
 {
-       LOG_GENERAL ("Server starting with %1 threads", num_threads);
+       LOG_GENERAL ("Server starting with %1 threads", _num_threads);
        if (_verbose) {
-               cout << "DCP-o-matic server starting with " << num_threads << " threads.\n";
+               cout << "DCP-o-matic server starting with " << _num_threads << " threads.\n";
        }
 
-       for (int i = 0; i < num_threads; ++i) {
+       for (int i = 0; i < _num_threads; ++i) {
                _worker_threads.push_back (new thread (bind (&EncodeServer::worker_thread, this)));
        }
 
        _broadcast.thread = new thread (bind (&EncodeServer::broadcast_thread, this));
 
-       start_accept ();
-       _io_service.run ();
+       Server::run ();
 }
 
 void
@@ -283,24 +279,9 @@ EncodeServer::broadcast_received ()
 }
 
 void
-EncodeServer::start_accept ()
-{
-       if (_terminate) {
-               return;
-       }
-
-       shared_ptr<Socket> socket (new Socket);
-       _acceptor.async_accept (socket->socket (), boost::bind (&EncodeServer::handle_accept, this, socket, boost::asio::placeholders::error));
-}
-
-void
-EncodeServer::handle_accept (shared_ptr<Socket> socket, boost::system::error_code const & error)
+EncodeServer::handle (shared_ptr<Socket> socket)
 {
-       if (error) {
-               return;
-       }
-
-       boost::mutex::scoped_lock lock (_worker_mutex);
+       boost::mutex::scoped_lock lock (_mutex);
 
        /* Wait until the queue has gone down a bit */
        while (_queue.size() >= _worker_threads.size() * 2 && !_terminate) {
@@ -309,6 +290,4 @@ EncodeServer::handle_accept (shared_ptr<Socket> socket, boost::system::error_cod
 
        _queue.push_back (socket);
        _empty_condition.notify_all ();
-
-       start_accept ();
 }
index f6f1fc9b9c0e2a82071c1fb298fa965039db7cc2..d0c061eb3755d01d0887ee2a2ac906da95412728 100644 (file)
 #define DCPOMATIC_ENCODE_SERVER_H
 
 /** @file src/encode_server.h
- *  @brief Server class.
+ *  @brief EncodeServer class.
  */
 
+#include "server.h"
 #include "exception_store.h"
 #include <boost/thread.hpp>
 #include <boost/asio.hpp>
@@ -37,34 +38,28 @@ class Log;
  *  @brief A class to run a server which can accept requests to perform JPEG2000
  *  encoding work.
  */
-class EncodeServer : public ExceptionStore, public boost::noncopyable
+class EncodeServer : public Server, public ExceptionStore
 {
 public:
-       EncodeServer (boost::shared_ptr<Log> log, bool verbose);
+       EncodeServer (boost::shared_ptr<Log> log, bool verbose, int num_threads);
        ~EncodeServer ();
 
-       void run (int num_threads);
+       void run ();
 
 private:
+       void handle (boost::shared_ptr<Socket>);
        void worker_thread ();
        int process (boost::shared_ptr<Socket> socket, struct timeval &, struct timeval &);
        void broadcast_thread ();
        void broadcast_received ();
-       void start_accept ();
-       void handle_accept (boost::shared_ptr<Socket>, boost::system::error_code const &);
-
-       bool _terminate;
 
        std::vector<boost::thread *> _worker_threads;
        std::list<boost::shared_ptr<Socket> > _queue;
-       boost::mutex _worker_mutex;
        boost::condition _full_condition;
        boost::condition _empty_condition;
        boost::shared_ptr<Log> _log;
        bool _verbose;
-
-       boost::asio::io_service _io_service;
-       boost::asio::ip::tcp::acceptor _acceptor;
+       int _num_threads;
 
        struct Broadcast {
 
index d7c34c0f438aab04c76b3ebb1de14d50ed83454c..de0bed029d890df5341a5d16a12956696843ba82 100644 (file)
@@ -46,7 +46,7 @@ class ExceptionStore
 {
 public:
        void rethrow () {
-               boost::mutex::scoped_lock lm (_mutex);
+               boost::mutex::scoped_lock lm (_exception_mutex);
                if (_exception) {
                        boost::exception_ptr tmp = _exception;
                        _exception = boost::exception_ptr ();
@@ -57,13 +57,13 @@ public:
 protected:
 
        void store_current () {
-               boost::mutex::scoped_lock lm (_mutex);
+               boost::mutex::scoped_lock lm (_exception_mutex);
                _exception = boost::current_exception ();
        }
 
 private:
        boost::exception_ptr _exception;
-       mutable boost::mutex _mutex;
+       mutable boost::mutex _exception_mutex;
 };
 
 #endif
diff --git a/src/lib/server.cc b/src/lib/server.cc
new file mode 100644 (file)
index 0000000..f137d07
--- /dev/null
@@ -0,0 +1,71 @@
+/*
+    Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
+
+    This program is free software; you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation; either version 2 of the License, or
+    (at your option) any later version.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with this program; if not, write to the Free Software
+    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+
+*/
+
+#include "server.h"
+#include "dcpomatic_socket.h"
+
+#include "i18n.h"
+
+using boost::shared_ptr;
+
+Server::Server (int port)
+       : _terminate (false)
+       , _acceptor (_io_service, boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v4(), port))
+{
+
+}
+
+Server::~Server ()
+{
+       boost::mutex::scoped_lock lm (_mutex);
+       _terminate = true;
+       _io_service.stop ();
+}
+
+void
+Server::run ()
+{
+       start_accept ();
+       _io_service.run ();
+}
+
+void
+Server::start_accept ()
+{
+       {
+               boost::mutex::scoped_lock lm (_mutex);
+               if (_terminate) {
+                       return;
+               }
+       }
+
+       shared_ptr<Socket> socket (new Socket);
+       _acceptor.async_accept (socket->socket (), boost::bind (&Server::handle_accept, this, socket, boost::asio::placeholders::error));
+}
+
+void
+Server::handle_accept (shared_ptr<Socket> socket, boost::system::error_code const & error)
+{
+       if (error) {
+               return;
+       }
+
+       handle (socket);
+       start_accept ();
+}
diff --git a/src/lib/server.h b/src/lib/server.h
new file mode 100644 (file)
index 0000000..3e7c794
--- /dev/null
@@ -0,0 +1,52 @@
+/*
+    Copyright (C) 2015 Carl Hetherington <cth@carlh.net>
+
+    This program is free software; you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation; either version 2 of the License, or
+    (at your option) any later version.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with this program; if not, write to the Free Software
+    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+
+*/
+
+#ifndef DCPOMATIC_SERVER_H
+#define DCPOMATIC_SERVER_H
+
+#include <boost/thread.hpp>
+#include <boost/asio.hpp>
+#include <boost/thread/condition.hpp>
+#include <string>
+
+class Socket;
+
+class Server : public boost::noncopyable
+{
+public:
+       Server (int port);
+       virtual ~Server ();
+
+       virtual void run ();
+
+protected:
+       boost::mutex _mutex;
+       bool _terminate;
+
+private:
+       virtual void handle (boost::shared_ptr<Socket> socket) = 0;
+
+       void start_accept ();
+       void handle_accept (boost::shared_ptr<Socket>, boost::system::error_code const &);
+
+       boost::asio::io_service _io_service;
+       boost::asio::ip::tcp::acceptor _acceptor;
+};
+
+#endif
index 0f0b0c14f1fe1d0587fe8fa9190e3cb7c1db7cd5..f614456859e5680c5a5332371b94ed95487665ca 100644 (file)
@@ -109,6 +109,7 @@ sources = """
           screen_kdm.cc
           send_kdm_email_job.cc
           send_problem_report_job.cc
+          server.cc
           single_stream_audio_content.cc
           sndfile_base.cc
           sndfile_content.cc
index e339b1966c9798c444708c364dda9a8e16e96d07..5fb8cb3afd4c8b5e60d4cb76ddfe516803c805fa 100644 (file)
@@ -291,8 +291,8 @@ private:
 
        void main_thread ()
        try {
-               EncodeServer server (server_log, false);
-               server.run (Config::instance()->num_local_encoding_threads ());
+               EncodeServer server (server_log, false, Config::instance()->num_local_encoding_threads());
+               server.run ();
        } catch (...) {
                store_current ();
        }
index 8f343b344ff2fb7b45c6d40b956fde682dc1aa0c..336d578eef2c881a49c50ccff7cddbf0be1e2304 100644 (file)
@@ -110,10 +110,10 @@ main (int argc, char* argv[])
                log.reset (new NullLog);
        }
 
-       EncodeServer server (log, verbose);
+       EncodeServer server (log, verbose, num_threads);
 
        try {
-               server.run (num_threads);
+               server.run ();
        } catch (boost::system::system_error& e) {
                if (e.code() == boost::system::errc::address_in_use) {
                        cerr << argv[0] << ": address already in use.  Is another DCP-o-matic server instance already running?\n";
index 6f7a170d0f47f5f17f8b9d3d96e5f9cd4839c159..e7502af0eadd12c053b634ff5a2d69e2b76352c2 100644 (file)
@@ -112,9 +112,9 @@ BOOST_AUTO_TEST_CASE (client_server_test_rgb)
 
        Data locally_encoded = frame->encode_locally (boost::bind (&Log::dcp_log, log.get(), _1, _2));
 
-       EncodeServer* server = new EncodeServer (log, true);
+       EncodeServer* server = new EncodeServer (log, true, 2);
 
-       new thread (boost::bind (&EncodeServer::run, server, 2));
+       new thread (boost::bind (&EncodeServer::run, server));
 
        /* Let the server get itself ready */
        dcpomatic_sleep (1);
@@ -192,9 +192,9 @@ BOOST_AUTO_TEST_CASE (client_server_test_yuv)
 
        Data locally_encoded = frame->encode_locally (boost::bind (&Log::dcp_log, log.get(), _1, _2));
 
-       EncodeServer* server = new EncodeServer (log, true);
+       EncodeServer* server = new EncodeServer (log, true, 2);
 
-       new thread (boost::bind (&EncodeServer::run, server, 2));
+       new thread (boost::bind (&EncodeServer::run, server));
 
        /* Let the server get itself ready */
        dcpomatic_sleep (1);
@@ -284,9 +284,9 @@ BOOST_AUTO_TEST_CASE (client_server_test_j2k)
 
        Data j2k_locally_encoded = j2k_frame->encode_locally (boost::bind (&Log::dcp_log, log.get(), _1, _2));
 
-       EncodeServer* server = new EncodeServer (log, true);
+       EncodeServer* server = new EncodeServer (log, true, 2);
 
-       new thread (boost::bind (&EncodeServer::run, server, 2));
+       new thread (boost::bind (&EncodeServer::run, server));
 
        /* Let the server get itself ready */
        dcpomatic_sleep (1);