Rename Server -> EncodeServer, ServerFinder -> EncodeServerFinder, ServerDescription...
authorCarl Hetherington <cth@carlh.net>
Sun, 6 Dec 2015 19:59:47 +0000 (19:59 +0000)
committerCarl Hetherington <cth@carlh.net>
Fri, 11 Dec 2015 11:56:22 +0000 (11:56 +0000)
26 files changed:
src/lib/config.cc
src/lib/dcp_video.cc
src/lib/dcp_video.h
src/lib/encode_server.cc [new file with mode: 0644]
src/lib/encode_server.h [new file with mode: 0644]
src/lib/encode_server_description.h [new file with mode: 0644]
src/lib/encode_server_finder.cc [new file with mode: 0644]
src/lib/encode_server_finder.h [new file with mode: 0644]
src/lib/encoder.cc
src/lib/encoder.h
src/lib/server.cc [deleted file]
src/lib/server.h [deleted file]
src/lib/server_description.h [deleted file]
src/lib/server_finder.cc [deleted file]
src/lib/server_finder.h [deleted file]
src/lib/wscript
src/tools/dcpomatic.cc
src/tools/dcpomatic_cli.cc
src/tools/dcpomatic_server.cc
src/tools/dcpomatic_server_cli.cc
src/tools/server_test.cc
src/wx/server_dialog.cc
src/wx/servers_list_dialog.cc
src/wx/servers_list_dialog.h
test/client_server_test.cc
test/test.cc

index 43a3e63..b9e25e3 100644 (file)
@@ -18,7 +18,6 @@
 */
 
 #include "config.h"
-#include "server.h"
 #include "filter.h"
 #include "ratio.h"
 #include "types.h"
@@ -38,6 +37,7 @@
 #include <boost/filesystem.hpp>
 #include <boost/algorithm/string.hpp>
 #include <boost/foreach.hpp>
+#include <boost/thread.hpp>
 #include <cstdlib>
 #include <fstream>
 #include <iostream>
index 26113a1..cd7d522 100644 (file)
@@ -31,7 +31,7 @@
 #include "dcp_video.h"
 #include "config.h"
 #include "exceptions.h"
-#include "server_description.h"
+#include "encode_server_description.h"
 #include "dcpomatic_socket.h"
 #include "image.h"
 #include "log.h"
@@ -151,7 +151,7 @@ DCPVideo::encode_locally (dcp::NoteHandler note)
  *  @return Encoded data.
  */
 Data
-DCPVideo::encode_remotely (ServerDescription serv, int timeout)
+DCPVideo::encode_remotely (EncodeServerDescription serv, int timeout)
 {
        boost::asio::io_service io_service;
        boost::asio::ip::tcp::resolver resolver (io_service);
index 05aa0ff..a61a757 100644 (file)
@@ -19,7 +19,7 @@
 */
 
 #include "types.h"
-#include "server_description.h"
+#include "encode_server_description.h"
 #include <libcxml/cxml.h>
 #include <dcp/data.h>
 
@@ -46,7 +46,7 @@ public:
        DCPVideo (boost::shared_ptr<const PlayerVideo>, cxml::ConstNodePtr, boost::shared_ptr<Log>);
 
        dcp::Data encode_locally (dcp::NoteHandler note);
-       dcp::Data encode_remotely (ServerDescription, int timeout = 30);
+       dcp::Data encode_remotely (EncodeServerDescription, int timeout = 30);
 
        int index () const {
                return _index;
diff --git a/src/lib/encode_server.cc b/src/lib/encode_server.cc
new file mode 100644 (file)
index 0000000..3f30a36
--- /dev/null
@@ -0,0 +1,314 @@
+/*
+    Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
+
+    This program is free software; you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation; either version 2 of the License, or
+    (at your option) any later version.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with this program; if not, write to the Free Software
+    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+
+*/
+
+/** @file src/encode_server.cc
+ *  @brief Class to describe a server to which we can send
+ *  encoding work, and a class to implement such a server.
+ */
+
+#include "encode_server.h"
+#include "util.h"
+#include "dcpomatic_socket.h"
+#include "image.h"
+#include "dcp_video.h"
+#include "config.h"
+#include "cross.h"
+#include "player_video.h"
+#include "safe_stringstream.h"
+#include "raw_convert.h"
+#include "compose.hpp"
+#include "log.h"
+#include "encoded_log_entry.h"
+#include <libcxml/cxml.h>
+#include <libxml++/libxml++.h>
+#include <boost/algorithm/string.hpp>
+#include <boost/scoped_array.hpp>
+#include <boost/foreach.hpp>
+#include <string>
+#include <vector>
+#include <iostream>
+
+#include "i18n.h"
+
+#define LOG_GENERAL(...)    _log->log (String::compose (__VA_ARGS__), LogEntry::TYPE_GENERAL);
+#define LOG_GENERAL_NC(...) _log->log (__VA_ARGS__, LogEntry::TYPE_GENERAL);
+#define LOG_ERROR(...)      _log->log (String::compose (__VA_ARGS__), LogEntry::TYPE_ERROR);
+#define LOG_ERROR_NC(...)   _log->log (__VA_ARGS__, LogEntry::TYPE_ERROR);
+
+using std::string;
+using std::vector;
+using std::list;
+using std::cout;
+using std::cerr;
+using std::fixed;
+using boost::shared_ptr;
+using boost::thread;
+using boost::bind;
+using boost::scoped_array;
+using boost::optional;
+using dcp::Size;
+using dcp::Data;
+
+EncodeServer::EncodeServer (shared_ptr<Log> log, bool verbose)
+       : _terminate (false)
+       , _log (log)
+       , _verbose (verbose)
+       , _acceptor (_io_service, boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v4(), Config::instance()->server_port_base()))
+{
+
+}
+
+EncodeServer::~EncodeServer ()
+{
+       {
+               boost::mutex::scoped_lock lm (_worker_mutex);
+               _terminate = true;
+               _empty_condition.notify_all ();
+               _full_condition.notify_all ();
+       }
+
+       BOOST_FOREACH (boost::thread* i, _worker_threads) {
+               DCPOMATIC_ASSERT (i->joinable ());
+               i->join ();
+               delete i;
+       }
+
+       _io_service.stop ();
+
+       _broadcast.io_service.stop ();
+       if (_broadcast.thread) {
+               DCPOMATIC_ASSERT (_broadcast.thread->joinable ());
+               _broadcast.thread->join ();
+       }
+}
+
+/** @param after_read Filled in with gettimeofday() after reading the input from the network.
+ *  @param after_encode Filled in with gettimeofday() after encoding the image.
+ */
+int
+EncodeServer::process (shared_ptr<Socket> socket, struct timeval& after_read, struct timeval& after_encode)
+{
+       uint32_t length = socket->read_uint32 ();
+       scoped_array<char> buffer (new char[length]);
+       socket->read (reinterpret_cast<uint8_t*> (buffer.get()), length);
+
+       string s (buffer.get());
+       shared_ptr<cxml::Document> xml (new cxml::Document ("EncodingRequest"));
+       xml->read_string (s);
+       /* This is a double-check; the server shouldn't even be on the candidate list
+          if it is the wrong version, but it doesn't hurt to make sure here.
+       */
+       if (xml->number_child<int> ("Version") != SERVER_LINK_VERSION) {
+               cerr << "Mismatched server/client versions\n";
+               LOG_ERROR_NC ("Mismatched server/client versions");
+               return -1;
+       }
+
+       shared_ptr<PlayerVideo> pvf (new PlayerVideo (xml, socket));
+
+       DCPVideo dcp_video_frame (pvf, xml, _log);
+
+       gettimeofday (&after_read, 0);
+
+       Data encoded = dcp_video_frame.encode_locally (boost::bind (&Log::dcp_log, _log.get(), _1, _2));
+
+       gettimeofday (&after_encode, 0);
+
+       try {
+               socket->write (encoded.size());
+               socket->write (encoded.data().get(), encoded.size());
+       } catch (std::exception& e) {
+               cerr << "Send failed; frame " << dcp_video_frame.index() << "\n";
+               LOG_ERROR ("Send failed; frame %1", dcp_video_frame.index());
+               throw;
+       }
+
+       return dcp_video_frame.index ();
+}
+
+void
+EncodeServer::worker_thread ()
+{
+       while (true) {
+               boost::mutex::scoped_lock lock (_worker_mutex);
+               while (_queue.empty () && !_terminate) {
+                       _empty_condition.wait (lock);
+               }
+
+               if (_terminate) {
+                       return;
+               }
+
+               shared_ptr<Socket> socket = _queue.front ();
+               _queue.pop_front ();
+
+               lock.unlock ();
+
+               int frame = -1;
+               string ip;
+
+               struct timeval start;
+               struct timeval after_read;
+               struct timeval after_encode;
+               struct timeval end;
+
+               gettimeofday (&start, 0);
+
+               try {
+                       frame = process (socket, after_read, after_encode);
+                       ip = socket->socket().remote_endpoint().address().to_string();
+               } catch (std::exception& e) {
+                       cerr << "Error: " << e.what() << "\n";
+                       LOG_ERROR ("Error: %1", e.what());
+               }
+
+               gettimeofday (&end, 0);
+
+               socket.reset ();
+
+               lock.lock ();
+
+               if (frame >= 0) {
+                       struct timeval end;
+                       gettimeofday (&end, 0);
+
+                       shared_ptr<EncodedLogEntry> e (
+                               new EncodedLogEntry (
+                                       frame, ip,
+                                       seconds(after_read) - seconds(start),
+                                       seconds(after_encode) - seconds(after_read),
+                                       seconds(end) - seconds(after_encode)
+                                       )
+                               );
+
+                       if (_verbose) {
+                               cout << e->get() << "\n";
+                       }
+
+                       _log->log (e);
+               }
+
+               _full_condition.notify_all ();
+       }
+}
+
+void
+EncodeServer::run (int num_threads)
+{
+       LOG_GENERAL ("Server starting with %1 threads", num_threads);
+       if (_verbose) {
+               cout << "DCP-o-matic server starting with " << num_threads << " threads.\n";
+       }
+
+       for (int i = 0; i < num_threads; ++i) {
+               _worker_threads.push_back (new thread (bind (&EncodeServer::worker_thread, this)));
+       }
+
+       _broadcast.thread = new thread (bind (&EncodeServer::broadcast_thread, this));
+
+       start_accept ();
+       _io_service.run ();
+}
+
+void
+EncodeServer::broadcast_thread ()
+try
+{
+       boost::asio::ip::address address = boost::asio::ip::address_v4::any ();
+       boost::asio::ip::udp::endpoint listen_endpoint (address, Config::instance()->server_port_base() + 1);
+
+       _broadcast.socket = new boost::asio::ip::udp::socket (_broadcast.io_service);
+       _broadcast.socket->open (listen_endpoint.protocol ());
+       _broadcast.socket->bind (listen_endpoint);
+
+       _broadcast.socket->async_receive_from (
+               boost::asio::buffer (_broadcast.buffer, sizeof (_broadcast.buffer)),
+               _broadcast.send_endpoint,
+               boost::bind (&EncodeServer::broadcast_received, this)
+               );
+
+       _broadcast.io_service.run ();
+}
+catch (...)
+{
+       store_current ();
+}
+
+void
+EncodeServer::broadcast_received ()
+{
+       _broadcast.buffer[sizeof(_broadcast.buffer) - 1] = '\0';
+
+       if (strcmp (_broadcast.buffer, DCPOMATIC_HELLO) == 0) {
+               /* Reply to the client saying what we can do */
+               xmlpp::Document doc;
+               xmlpp::Element* root = doc.create_root_node ("ServerAvailable");
+               root->add_child("Threads")->add_child_text (raw_convert<string> (_worker_threads.size ()));
+               root->add_child("Version")->add_child_text (raw_convert<string> (SERVER_LINK_VERSION));
+               string xml = doc.write_to_string ("UTF-8");
+
+               if (_verbose) {
+                       cout << "Offering services to master " << _broadcast.send_endpoint.address().to_string () << "\n";
+               }
+               shared_ptr<Socket> socket (new Socket);
+               try {
+                       socket->connect (boost::asio::ip::tcp::endpoint (_broadcast.send_endpoint.address(), Config::instance()->server_port_base() + 1));
+                       socket->write (xml.length() + 1);
+                       socket->write ((uint8_t *) xml.c_str(), xml.length() + 1);
+               } catch (...) {
+
+               }
+       }
+
+       _broadcast.socket->async_receive_from (
+               boost::asio::buffer (_broadcast.buffer, sizeof (_broadcast.buffer)),
+               _broadcast.send_endpoint, boost::bind (&EncodeServer::broadcast_received, this)
+               );
+}
+
+void
+EncodeServer::start_accept ()
+{
+       if (_terminate) {
+               return;
+       }
+
+       shared_ptr<Socket> socket (new Socket);
+       _acceptor.async_accept (socket->socket (), boost::bind (&EncodeServer::handle_accept, this, socket, boost::asio::placeholders::error));
+}
+
+void
+EncodeServer::handle_accept (shared_ptr<Socket> socket, boost::system::error_code const & error)
+{
+       if (error) {
+               return;
+       }
+
+       boost::mutex::scoped_lock lock (_worker_mutex);
+
+       /* Wait until the queue has gone down a bit */
+       while (_queue.size() >= _worker_threads.size() * 2 && !_terminate) {
+               _full_condition.wait (lock);
+       }
+
+       _queue.push_back (socket);
+       _empty_condition.notify_all ();
+
+       start_accept ();
+}
diff --git a/src/lib/encode_server.h b/src/lib/encode_server.h
new file mode 100644 (file)
index 0000000..f6f1fc9
--- /dev/null
@@ -0,0 +1,85 @@
+/*
+    Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
+
+    This program is free software; you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation; either version 2 of the License, or
+    (at your option) any later version.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with this program; if not, write to the Free Software
+    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+
+*/
+
+#ifndef DCPOMATIC_ENCODE_SERVER_H
+#define DCPOMATIC_ENCODE_SERVER_H
+
+/** @file src/encode_server.h
+ *  @brief Server class.
+ */
+
+#include "exception_store.h"
+#include <boost/thread.hpp>
+#include <boost/asio.hpp>
+#include <boost/thread/condition.hpp>
+#include <string>
+
+class Socket;
+class Log;
+
+/** @class EncodeServer
+ *  @brief A class to run a server which can accept requests to perform JPEG2000
+ *  encoding work.
+ */
+class EncodeServer : public ExceptionStore, public boost::noncopyable
+{
+public:
+       EncodeServer (boost::shared_ptr<Log> log, bool verbose);
+       ~EncodeServer ();
+
+       void run (int num_threads);
+
+private:
+       void worker_thread ();
+       int process (boost::shared_ptr<Socket> socket, struct timeval &, struct timeval &);
+       void broadcast_thread ();
+       void broadcast_received ();
+       void start_accept ();
+       void handle_accept (boost::shared_ptr<Socket>, boost::system::error_code const &);
+
+       bool _terminate;
+
+       std::vector<boost::thread *> _worker_threads;
+       std::list<boost::shared_ptr<Socket> > _queue;
+       boost::mutex _worker_mutex;
+       boost::condition _full_condition;
+       boost::condition _empty_condition;
+       boost::shared_ptr<Log> _log;
+       bool _verbose;
+
+       boost::asio::io_service _io_service;
+       boost::asio::ip::tcp::acceptor _acceptor;
+
+       struct Broadcast {
+
+               Broadcast ()
+                       : thread (0)
+                       , socket (0)
+               {}
+
+               boost::thread* thread;
+               boost::asio::ip::udp::socket* socket;
+               char buffer[64];
+               boost::asio::ip::udp::endpoint send_endpoint;
+               boost::asio::io_service io_service;
+
+       } _broadcast;
+};
+
+#endif
diff --git a/src/lib/encode_server_description.h b/src/lib/encode_server_description.h
new file mode 100644 (file)
index 0000000..2ff1b80
--- /dev/null
@@ -0,0 +1,69 @@
+/*
+    Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
+
+    This program is free software; you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation; either version 2 of the License, or
+    (at your option) any later version.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with this program; if not, write to the Free Software
+    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+
+*/
+
+#ifndef DCPOMATIC_ENCODE_SERVER_DESCRIPTION_H
+#define DCPOMATIC_ENCODE_SERVER_DESCRIPTION_H
+
+/** @class EncodeServerDescription
+ *  @brief Class to describe a server to which we can send encoding work.
+ */
+class EncodeServerDescription
+{
+public:
+       EncodeServerDescription ()
+               : _host_name ("")
+               , _threads (1)
+       {}
+
+       /** @param h Server host name or IP address in string form.
+        *  @param t Number of threads to use on the server.
+        */
+       EncodeServerDescription (std::string h, int t)
+               : _host_name (h)
+               , _threads (t)
+       {}
+
+       /* Default copy constructor is fine */
+
+       /** @return server's host name or IP address in string form */
+       std::string host_name () const {
+               return _host_name;
+       }
+
+       /** @return number of threads to use on the server */
+       int threads () const {
+               return _threads;
+       }
+
+       void set_host_name (std::string n) {
+               _host_name = n;
+       }
+
+       void set_threads (int t) {
+               _threads = t;
+       }
+
+private:
+       /** server's host name */
+       std::string _host_name;
+       /** number of threads to use on the server */
+       int _threads;
+};
+
+#endif
diff --git a/src/lib/encode_server_finder.cc b/src/lib/encode_server_finder.cc
new file mode 100644 (file)
index 0000000..aa76e39
--- /dev/null
@@ -0,0 +1,240 @@
+/*
+    Copyright (C) 2013-2015 Carl Hetherington <cth@carlh.net>
+
+    This program is free software; you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation; either version 2 of the License, or
+    (at your option) any later version.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with this program; if not, write to the Free Software
+    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+
+*/
+
+#include "encode_server_finder.h"
+#include "exceptions.h"
+#include "util.h"
+#include "config.h"
+#include "cross.h"
+#include "encode_server_description.h"
+#include "dcpomatic_socket.h"
+#include "raw_convert.h"
+#include <libcxml/cxml.h>
+#include <boost/lambda/lambda.hpp>
+#include <iostream>
+
+#include "i18n.h"
+
+using std::string;
+using std::list;
+using std::vector;
+using std::cout;
+using boost::shared_ptr;
+using boost::scoped_array;
+using boost::weak_ptr;
+
+EncodeServerFinder* EncodeServerFinder::_instance = 0;
+
+EncodeServerFinder::EncodeServerFinder ()
+       : _disabled (false)
+       , _search_thread (0)
+       , _listen_thread (0)
+       , _stop (false)
+{
+       Config::instance()->Changed.connect (boost::bind (&EncodeServerFinder::config_changed, this, _1));
+}
+
+void
+EncodeServerFinder::start ()
+{
+       _search_thread = new boost::thread (boost::bind (&EncodeServerFinder::search_thread, this));
+       _listen_thread = new boost::thread (boost::bind (&EncodeServerFinder::listen_thread, this));
+}
+
+EncodeServerFinder::~EncodeServerFinder ()
+{
+       _stop = true;
+
+       _search_condition.notify_all ();
+       if (_search_thread) {
+               DCPOMATIC_ASSERT (_search_thread->joinable ());
+               _search_thread->join ();
+       }
+
+       _listen_io_service.stop ();
+       if (_listen_thread) {
+               DCPOMATIC_ASSERT (_listen_thread->joinable ());
+               _listen_thread->join ();
+       }
+}
+
+void
+EncodeServerFinder::search_thread ()
+try
+{
+       boost::system::error_code error;
+       boost::asio::io_service io_service;
+       boost::asio::ip::udp::socket socket (io_service);
+       socket.open (boost::asio::ip::udp::v4(), error);
+       if (error) {
+               throw NetworkError ("failed to set up broadcast socket");
+       }
+
+        socket.set_option (boost::asio::ip::udp::socket::reuse_address (true));
+        socket.set_option (boost::asio::socket_base::broadcast (true));
+
+       string const data = DCPOMATIC_HELLO;
+
+       while (!_stop) {
+               if (Config::instance()->use_any_servers ()) {
+                       /* Broadcast to look for servers */
+                       try {
+                               boost::asio::ip::udp::endpoint end_point (boost::asio::ip::address_v4::broadcast(), Config::instance()->server_port_base() + 1);
+                               socket.send_to (boost::asio::buffer (data.c_str(), data.size() + 1), end_point);
+                       } catch (...) {
+
+                       }
+               }
+
+               /* Query our `definite' servers (if there are any) */
+               vector<string> servers = Config::instance()->servers ();
+               for (vector<string>::const_iterator i = servers.begin(); i != servers.end(); ++i) {
+                       if (server_found (*i)) {
+                               /* Don't bother asking a server that we already know about */
+                               continue;
+                       }
+                       try {
+                               boost::asio::ip::udp::resolver resolver (io_service);
+                               boost::asio::ip::udp::resolver::query query (*i, raw_convert<string> (Config::instance()->server_port_base() + 1));
+                               boost::asio::ip::udp::endpoint end_point (*resolver.resolve (query));
+                               socket.send_to (boost::asio::buffer (data.c_str(), data.size() + 1), end_point);
+                       } catch (...) {
+
+                       }
+               }
+
+               boost::mutex::scoped_lock lm (_search_condition_mutex);
+               _search_condition.timed_wait (lm, boost::get_system_time() + boost::posix_time::seconds (10));
+       }
+}
+catch (...)
+{
+       store_current ();
+}
+
+void
+EncodeServerFinder::listen_thread ()
+try {
+       using namespace boost::asio::ip;
+
+       try {
+               _listen_acceptor.reset (new tcp::acceptor (_listen_io_service, tcp::endpoint (tcp::v4(), Config::instance()->server_port_base() + 1)));
+       } catch (...) {
+               boost::throw_exception (NetworkError (_("Could not listen for remote encode servers.  Perhaps another instance of DCP-o-matic is running.")));
+       }
+
+       start_accept ();
+       _listen_io_service.run ();
+}
+catch (...)
+{
+       store_current ();
+}
+
+void
+EncodeServerFinder::start_accept ()
+{
+       shared_ptr<Socket> socket (new Socket ());
+       _listen_acceptor->async_accept (
+               socket->socket(),
+               boost::bind (&EncodeServerFinder::handle_accept, this, boost::asio::placeholders::error, socket)
+               );
+}
+
+void
+EncodeServerFinder::handle_accept (boost::system::error_code ec, shared_ptr<Socket> socket)
+{
+       if (ec) {
+               start_accept ();
+               return;
+       }
+
+       uint32_t length;
+       socket->read (reinterpret_cast<uint8_t*> (&length), sizeof (uint32_t));
+       length = ntohl (length);
+
+       scoped_array<char> buffer (new char[length]);
+       socket->read (reinterpret_cast<uint8_t*> (buffer.get()), length);
+
+       string s (buffer.get());
+       shared_ptr<cxml::Document> xml (new cxml::Document ("ServerAvailable"));
+       xml->read_string (s);
+
+       string const ip = socket->socket().remote_endpoint().address().to_string ();
+       if (!server_found (ip) && xml->optional_number_child<int>("Version").get_value_or (0) == SERVER_LINK_VERSION) {
+               EncodeServerDescription sd (ip, xml->number_child<int> ("Threads"));
+               {
+                       boost::mutex::scoped_lock lm (_servers_mutex);
+                       _servers.push_back (sd);
+               }
+               emit (boost::bind (boost::ref (ServersListChanged)));
+       }
+
+       start_accept ();
+}
+
+bool
+EncodeServerFinder::server_found (string ip) const
+{
+       boost::mutex::scoped_lock lm (_servers_mutex);
+       list<EncodeServerDescription>::const_iterator i = _servers.begin();
+       while (i != _servers.end() && i->host_name() != ip) {
+               ++i;
+       }
+
+       return i != _servers.end ();
+}
+
+EncodeServerFinder*
+EncodeServerFinder::instance ()
+{
+       if (!_instance) {
+               _instance = new EncodeServerFinder ();
+               _instance->start ();
+       }
+
+       return _instance;
+}
+
+void
+EncodeServerFinder::drop ()
+{
+       delete _instance;
+       _instance = 0;
+}
+
+list<EncodeServerDescription>
+EncodeServerFinder::servers () const
+{
+       boost::mutex::scoped_lock lm (_servers_mutex);
+       return _servers;
+}
+
+void
+EncodeServerFinder::config_changed (Config::Property what)
+{
+       if (what == Config::USE_ANY_SERVERS || what == Config::SERVERS) {
+               {
+                       boost::mutex::scoped_lock lm (_servers_mutex);
+                       _servers.clear ();
+               }
+               ServersListChanged ();
+               _search_condition.notify_all ();
+       }
+}
diff --git a/src/lib/encode_server_finder.h b/src/lib/encode_server_finder.h
new file mode 100644 (file)
index 0000000..49433ad
--- /dev/null
@@ -0,0 +1,85 @@
+/*
+    Copyright (C) 2013-2015 Carl Hetherington <cth@carlh.net>
+
+    This program is free software; you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation; either version 2 of the License, or
+    (at your option) any later version.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with this program; if not, write to the Free Software
+    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+
+*/
+
+/** @file  src/lib/encode_server_finder.h
+ *  @brief EncodeServerFinder class.
+ */
+
+#include "signaller.h"
+#include "encode_server_description.h"
+#include "config.h"
+#include "exception_store.h"
+#include <boost/signals2.hpp>
+#include <boost/thread/condition.hpp>
+
+class Socket;
+
+class EncodeServerFinder : public Signaller, public ExceptionStore
+{
+public:
+       static EncodeServerFinder* instance ();
+       static void drop ();
+
+       void disable () {
+               _disabled = true;
+       }
+
+       bool disabled () const {
+               return _disabled;
+       }
+
+       std::list<EncodeServerDescription> servers () const;
+
+       /** Emitted whenever the list of servers changes */
+       boost::signals2::signal<void ()> ServersListChanged;
+
+private:
+       EncodeServerFinder ();
+       ~EncodeServerFinder ();
+
+       void start ();
+
+       void search_thread ();
+       void listen_thread ();
+
+       bool server_found (std::string) const;
+       void start_accept ();
+       void handle_accept (boost::system::error_code ec, boost::shared_ptr<Socket> socket);
+
+       void config_changed (Config::Property what);
+
+       bool _disabled;
+
+       /** Thread to periodically issue broadcasts and requests to find encoding servers */
+       boost::thread* _search_thread;
+       /** Thread to listen to the responses from servers */
+       boost::thread* _listen_thread;
+
+       std::list<EncodeServerDescription> _servers;
+       mutable boost::mutex _servers_mutex;
+
+       boost::asio::io_service _listen_io_service;
+       boost::shared_ptr<boost::asio::ip::tcp::acceptor> _listen_acceptor;
+       bool _stop;
+
+       boost::condition _search_condition;
+       boost::mutex _search_condition_mutex;
+
+       static EncodeServerFinder* _instance;
+};
index bfb96f4..8ba794a 100644 (file)
 #include "log.h"
 #include "config.h"
 #include "dcp_video.h"
-#include "server.h"
 #include "cross.h"
 #include "writer.h"
-#include "server_finder.h"
+#include "encode_server_finder.h"
 #include "player.h"
 #include "player_video.h"
-#include "server_description.h"
+#include "encode_server_description.h"
 #include "compose.hpp"
 #include <libcxml/cxml.h>
 #include <boost/foreach.hpp>
@@ -79,8 +78,8 @@ Encoder::~Encoder ()
 void
 Encoder::begin ()
 {
-       if (!ServerFinder::instance()->disabled ()) {
-               _server_found_connection = ServerFinder::instance()->ServersListChanged.connect (boost::bind (&Encoder::servers_list_changed, this));
+       if (!EncodeServerFinder::instance()->disabled ()) {
+               _server_found_connection = EncodeServerFinder::instance()->ServersListChanged.connect (boost::bind (&Encoder::servers_list_changed, this));
        }
 }
 
@@ -277,7 +276,7 @@ Encoder::terminate_threads ()
 }
 
 void
-Encoder::encoder_thread (optional<ServerDescription> server)
+Encoder::encoder_thread (optional<EncodeServerDescription> server)
 try
 {
        if (server) {
@@ -384,11 +383,11 @@ Encoder::servers_list_changed ()
 
        if (!Config::instance()->only_servers_encode ()) {
                for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
-                       _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<ServerDescription> ())));
+                       _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<EncodeServerDescription> ())));
                }
        }
 
-       BOOST_FOREACH (ServerDescription i, ServerFinder::instance()->servers ()) {
+       BOOST_FOREACH (EncodeServerDescription i, EncodeServerFinder::instance()->servers ()) {
                LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
                for (int j = 0; j < i.threads(); ++j) {
                        _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, i)));
index d5b1455..20f2951 100644 (file)
@@ -37,7 +37,7 @@
 #include <stdint.h>
 
 class Film;
-class ServerDescription;
+class EncodeServerDescription;
 class DCPVideo;
 class Writer;
 class Job;
@@ -75,7 +75,7 @@ private:
        void enqueue (boost::shared_ptr<PlayerVideo> f);
        void frame_done ();
 
-       void encoder_thread (boost::optional<ServerDescription>);
+       void encoder_thread (boost::optional<EncodeServerDescription>);
        void terminate_threads ();
        void servers_list_changed ();
 
diff --git a/src/lib/server.cc b/src/lib/server.cc
deleted file mode 100644 (file)
index 7879d0f..0000000
+++ /dev/null
@@ -1,314 +0,0 @@
-/*
-    Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
-
-    This program is free software; you can redistribute it and/or modify
-    it under the terms of the GNU General Public License as published by
-    the Free Software Foundation; either version 2 of the License, or
-    (at your option) any later version.
-
-    This program is distributed in the hope that it will be useful,
-    but WITHOUT ANY WARRANTY; without even the implied warranty of
-    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-    GNU General Public License for more details.
-
-    You should have received a copy of the GNU General Public License
-    along with this program; if not, write to the Free Software
-    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
-
-*/
-
-/** @file src/server.cc
- *  @brief Class to describe a server to which we can send
- *  encoding work, and a class to implement such a server.
- */
-
-#include "server.h"
-#include "util.h"
-#include "dcpomatic_socket.h"
-#include "image.h"
-#include "dcp_video.h"
-#include "config.h"
-#include "cross.h"
-#include "player_video.h"
-#include "safe_stringstream.h"
-#include "raw_convert.h"
-#include "compose.hpp"
-#include "log.h"
-#include "encoded_log_entry.h"
-#include <libcxml/cxml.h>
-#include <libxml++/libxml++.h>
-#include <boost/algorithm/string.hpp>
-#include <boost/scoped_array.hpp>
-#include <boost/foreach.hpp>
-#include <string>
-#include <vector>
-#include <iostream>
-
-#include "i18n.h"
-
-#define LOG_GENERAL(...)    _log->log (String::compose (__VA_ARGS__), LogEntry::TYPE_GENERAL);
-#define LOG_GENERAL_NC(...) _log->log (__VA_ARGS__, LogEntry::TYPE_GENERAL);
-#define LOG_ERROR(...)      _log->log (String::compose (__VA_ARGS__), LogEntry::TYPE_ERROR);
-#define LOG_ERROR_NC(...)   _log->log (__VA_ARGS__, LogEntry::TYPE_ERROR);
-
-using std::string;
-using std::vector;
-using std::list;
-using std::cout;
-using std::cerr;
-using std::fixed;
-using boost::shared_ptr;
-using boost::thread;
-using boost::bind;
-using boost::scoped_array;
-using boost::optional;
-using dcp::Size;
-using dcp::Data;
-
-Server::Server (shared_ptr<Log> log, bool verbose)
-       : _terminate (false)
-       , _log (log)
-       , _verbose (verbose)
-       , _acceptor (_io_service, boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v4(), Config::instance()->server_port_base()))
-{
-
-}
-
-Server::~Server ()
-{
-       {
-               boost::mutex::scoped_lock lm (_worker_mutex);
-               _terminate = true;
-               _empty_condition.notify_all ();
-               _full_condition.notify_all ();
-       }
-
-       BOOST_FOREACH (boost::thread* i, _worker_threads) {
-               DCPOMATIC_ASSERT (i->joinable ());
-               i->join ();
-               delete i;
-       }
-
-       _io_service.stop ();
-
-       _broadcast.io_service.stop ();
-       if (_broadcast.thread) {
-               DCPOMATIC_ASSERT (_broadcast.thread->joinable ());
-               _broadcast.thread->join ();
-       }
-}
-
-/** @param after_read Filled in with gettimeofday() after reading the input from the network.
- *  @param after_encode Filled in with gettimeofday() after encoding the image.
- */
-int
-Server::process (shared_ptr<Socket> socket, struct timeval& after_read, struct timeval& after_encode)
-{
-       uint32_t length = socket->read_uint32 ();
-       scoped_array<char> buffer (new char[length]);
-       socket->read (reinterpret_cast<uint8_t*> (buffer.get()), length);
-
-       string s (buffer.get());
-       shared_ptr<cxml::Document> xml (new cxml::Document ("EncodingRequest"));
-       xml->read_string (s);
-       /* This is a double-check; the server shouldn't even be on the candidate list
-          if it is the wrong version, but it doesn't hurt to make sure here.
-       */
-       if (xml->number_child<int> ("Version") != SERVER_LINK_VERSION) {
-               cerr << "Mismatched server/client versions\n";
-               LOG_ERROR_NC ("Mismatched server/client versions");
-               return -1;
-       }
-
-       shared_ptr<PlayerVideo> pvf (new PlayerVideo (xml, socket));
-
-       DCPVideo dcp_video_frame (pvf, xml, _log);
-
-       gettimeofday (&after_read, 0);
-
-       Data encoded = dcp_video_frame.encode_locally (boost::bind (&Log::dcp_log, _log.get(), _1, _2));
-
-       gettimeofday (&after_encode, 0);
-
-       try {
-               socket->write (encoded.size());
-               socket->write (encoded.data().get(), encoded.size());
-       } catch (std::exception& e) {
-               cerr << "Send failed; frame " << dcp_video_frame.index() << "\n";
-               LOG_ERROR ("Send failed; frame %1", dcp_video_frame.index());
-               throw;
-       }
-
-       return dcp_video_frame.index ();
-}
-
-void
-Server::worker_thread ()
-{
-       while (true) {
-               boost::mutex::scoped_lock lock (_worker_mutex);
-               while (_queue.empty () && !_terminate) {
-                       _empty_condition.wait (lock);
-               }
-
-               if (_terminate) {
-                       return;
-               }
-
-               shared_ptr<Socket> socket = _queue.front ();
-               _queue.pop_front ();
-
-               lock.unlock ();
-
-               int frame = -1;
-               string ip;
-
-               struct timeval start;
-               struct timeval after_read;
-               struct timeval after_encode;
-               struct timeval end;
-
-               gettimeofday (&start, 0);
-
-               try {
-                       frame = process (socket, after_read, after_encode);
-                       ip = socket->socket().remote_endpoint().address().to_string();
-               } catch (std::exception& e) {
-                       cerr << "Error: " << e.what() << "\n";
-                       LOG_ERROR ("Error: %1", e.what());
-               }
-
-               gettimeofday (&end, 0);
-
-               socket.reset ();
-
-               lock.lock ();
-
-               if (frame >= 0) {
-                       struct timeval end;
-                       gettimeofday (&end, 0);
-
-                       shared_ptr<EncodedLogEntry> e (
-                               new EncodedLogEntry (
-                                       frame, ip,
-                                       seconds(after_read) - seconds(start),
-                                       seconds(after_encode) - seconds(after_read),
-                                       seconds(end) - seconds(after_encode)
-                                       )
-                               );
-
-                       if (_verbose) {
-                               cout << e->get() << "\n";
-                       }
-
-                       _log->log (e);
-               }
-
-               _full_condition.notify_all ();
-       }
-}
-
-void
-Server::run (int num_threads)
-{
-       LOG_GENERAL ("Server starting with %1 threads", num_threads);
-       if (_verbose) {
-               cout << "DCP-o-matic server starting with " << num_threads << " threads.\n";
-       }
-
-       for (int i = 0; i < num_threads; ++i) {
-               _worker_threads.push_back (new thread (bind (&Server::worker_thread, this)));
-       }
-
-       _broadcast.thread = new thread (bind (&Server::broadcast_thread, this));
-
-       start_accept ();
-       _io_service.run ();
-}
-
-void
-Server::broadcast_thread ()
-try
-{
-       boost::asio::ip::address address = boost::asio::ip::address_v4::any ();
-       boost::asio::ip::udp::endpoint listen_endpoint (address, Config::instance()->server_port_base() + 1);
-
-       _broadcast.socket = new boost::asio::ip::udp::socket (_broadcast.io_service);
-       _broadcast.socket->open (listen_endpoint.protocol ());
-       _broadcast.socket->bind (listen_endpoint);
-
-       _broadcast.socket->async_receive_from (
-               boost::asio::buffer (_broadcast.buffer, sizeof (_broadcast.buffer)),
-               _broadcast.send_endpoint,
-               boost::bind (&Server::broadcast_received, this)
-               );
-
-       _broadcast.io_service.run ();
-}
-catch (...)
-{
-       store_current ();
-}
-
-void
-Server::broadcast_received ()
-{
-       _broadcast.buffer[sizeof(_broadcast.buffer) - 1] = '\0';
-
-       if (strcmp (_broadcast.buffer, DCPOMATIC_HELLO) == 0) {
-               /* Reply to the client saying what we can do */
-               xmlpp::Document doc;
-               xmlpp::Element* root = doc.create_root_node ("ServerAvailable");
-               root->add_child("Threads")->add_child_text (raw_convert<string> (_worker_threads.size ()));
-               root->add_child("Version")->add_child_text (raw_convert<string> (SERVER_LINK_VERSION));
-               string xml = doc.write_to_string ("UTF-8");
-
-               if (_verbose) {
-                       cout << "Offering services to master " << _broadcast.send_endpoint.address().to_string () << "\n";
-               }
-               shared_ptr<Socket> socket (new Socket);
-               try {
-                       socket->connect (boost::asio::ip::tcp::endpoint (_broadcast.send_endpoint.address(), Config::instance()->server_port_base() + 1));
-                       socket->write (xml.length() + 1);
-                       socket->write ((uint8_t *) xml.c_str(), xml.length() + 1);
-               } catch (...) {
-
-               }
-       }
-
-       _broadcast.socket->async_receive_from (
-               boost::asio::buffer (_broadcast.buffer, sizeof (_broadcast.buffer)),
-               _broadcast.send_endpoint, boost::bind (&Server::broadcast_received, this)
-               );
-}
-
-void
-Server::start_accept ()
-{
-       if (_terminate) {
-               return;
-       }
-
-       shared_ptr<Socket> socket (new Socket);
-       _acceptor.async_accept (socket->socket (), boost::bind (&Server::handle_accept, this, socket, boost::asio::placeholders::error));
-}
-
-void
-Server::handle_accept (shared_ptr<Socket> socket, boost::system::error_code const & error)
-{
-       if (error) {
-               return;
-       }
-
-       boost::mutex::scoped_lock lock (_worker_mutex);
-
-       /* Wait until the queue has gone down a bit */
-       while (_queue.size() >= _worker_threads.size() * 2 && !_terminate) {
-               _full_condition.wait (lock);
-       }
-
-       _queue.push_back (socket);
-       _empty_condition.notify_all ();
-
-       start_accept ();
-}
diff --git a/src/lib/server.h b/src/lib/server.h
deleted file mode 100644 (file)
index 97bc26f..0000000
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
-    Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
-
-    This program is free software; you can redistribute it and/or modify
-    it under the terms of the GNU General Public License as published by
-    the Free Software Foundation; either version 2 of the License, or
-    (at your option) any later version.
-
-    This program is distributed in the hope that it will be useful,
-    but WITHOUT ANY WARRANTY; without even the implied warranty of
-    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-    GNU General Public License for more details.
-
-    You should have received a copy of the GNU General Public License
-    along with this program; if not, write to the Free Software
-    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
-
-*/
-
-#ifndef DCPOMATIC_SERVER_H
-#define DCPOMATIC_SERVER_H
-
-/** @file src/server.h
- *  @brief Server class.
- */
-
-#include "exception_store.h"
-#include <boost/thread.hpp>
-#include <boost/asio.hpp>
-#include <boost/thread/condition.hpp>
-#include <string>
-
-class Socket;
-class Log;
-
-/** @class Server
- *  @brief A class to run a server which can accept requests to perform JPEG2000
- *  encoding work.
- */
-class Server : public ExceptionStore, public boost::noncopyable
-{
-public:
-       Server (boost::shared_ptr<Log> log, bool verbose);
-       ~Server ();
-
-       void run (int num_threads);
-
-private:
-       void worker_thread ();
-       int process (boost::shared_ptr<Socket> socket, struct timeval &, struct timeval &);
-       void broadcast_thread ();
-       void broadcast_received ();
-       void start_accept ();
-       void handle_accept (boost::shared_ptr<Socket>, boost::system::error_code const &);
-
-       bool _terminate;
-
-       std::vector<boost::thread *> _worker_threads;
-       std::list<boost::shared_ptr<Socket> > _queue;
-       boost::mutex _worker_mutex;
-       boost::condition _full_condition;
-       boost::condition _empty_condition;
-       boost::shared_ptr<Log> _log;
-       bool _verbose;
-
-       boost::asio::io_service _io_service;
-       boost::asio::ip::tcp::acceptor _acceptor;
-
-       struct Broadcast {
-
-               Broadcast ()
-                       : thread (0)
-                       , socket (0)
-               {}
-
-               boost::thread* thread;
-               boost::asio::ip::udp::socket* socket;
-               char buffer[64];
-               boost::asio::ip::udp::endpoint send_endpoint;
-               boost::asio::io_service io_service;
-
-       } _broadcast;
-};
-
-#endif
diff --git a/src/lib/server_description.h b/src/lib/server_description.h
deleted file mode 100644 (file)
index 35d2df3..0000000
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
-    Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
-
-    This program is free software; you can redistribute it and/or modify
-    it under the terms of the GNU General Public License as published by
-    the Free Software Foundation; either version 2 of the License, or
-    (at your option) any later version.
-
-    This program is distributed in the hope that it will be useful,
-    but WITHOUT ANY WARRANTY; without even the implied warranty of
-    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-    GNU General Public License for more details.
-
-    You should have received a copy of the GNU General Public License
-    along with this program; if not, write to the Free Software
-    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
-
-*/
-
-#ifndef DCPOMATIC_SERVER_DESCRIPTION_H
-#define DCPOMATIC_SERVER_DESCRIPTION_H
-
-/** @class ServerDescription
- *  @brief Class to describe a server to which we can send encoding work.
- */
-class ServerDescription
-{
-public:
-       ServerDescription ()
-               : _host_name ("")
-               , _threads (1)
-       {}
-
-       /** @param h Server host name or IP address in string form.
-        *  @param t Number of threads to use on the server.
-        */
-       ServerDescription (std::string h, int t)
-               : _host_name (h)
-               , _threads (t)
-       {}
-
-       /* Default copy constructor is fine */
-
-       /** @return server's host name or IP address in string form */
-       std::string host_name () const {
-               return _host_name;
-       }
-
-       /** @return number of threads to use on the server */
-       int threads () const {
-               return _threads;
-       }
-
-       void set_host_name (std::string n) {
-               _host_name = n;
-       }
-
-       void set_threads (int t) {
-               _threads = t;
-       }
-
-private:
-       /** server's host name */
-       std::string _host_name;
-       /** number of threads to use on the server */
-       int _threads;
-};
-
-#endif
diff --git a/src/lib/server_finder.cc b/src/lib/server_finder.cc
deleted file mode 100644 (file)
index 8685c83..0000000
+++ /dev/null
@@ -1,240 +0,0 @@
-/*
-    Copyright (C) 2013-2015 Carl Hetherington <cth@carlh.net>
-
-    This program is free software; you can redistribute it and/or modify
-    it under the terms of the GNU General Public License as published by
-    the Free Software Foundation; either version 2 of the License, or
-    (at your option) any later version.
-
-    This program is distributed in the hope that it will be useful,
-    but WITHOUT ANY WARRANTY; without even the implied warranty of
-    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-    GNU General Public License for more details.
-
-    You should have received a copy of the GNU General Public License
-    along with this program; if not, write to the Free Software
-    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
-
-*/
-
-#include "server_finder.h"
-#include "exceptions.h"
-#include "util.h"
-#include "config.h"
-#include "cross.h"
-#include "server_description.h"
-#include "dcpomatic_socket.h"
-#include "raw_convert.h"
-#include <libcxml/cxml.h>
-#include <boost/lambda/lambda.hpp>
-#include <iostream>
-
-#include "i18n.h"
-
-using std::string;
-using std::list;
-using std::vector;
-using std::cout;
-using boost::shared_ptr;
-using boost::scoped_array;
-using boost::weak_ptr;
-
-ServerFinder* ServerFinder::_instance = 0;
-
-ServerFinder::ServerFinder ()
-       : _disabled (false)
-       , _search_thread (0)
-       , _listen_thread (0)
-       , _stop (false)
-{
-       Config::instance()->Changed.connect (boost::bind (&ServerFinder::config_changed, this, _1));
-}
-
-void
-ServerFinder::start ()
-{
-       _search_thread = new boost::thread (boost::bind (&ServerFinder::search_thread, this));
-       _listen_thread = new boost::thread (boost::bind (&ServerFinder::listen_thread, this));
-}
-
-ServerFinder::~ServerFinder ()
-{
-       _stop = true;
-
-       _search_condition.notify_all ();
-       if (_search_thread) {
-               DCPOMATIC_ASSERT (_search_thread->joinable ());
-               _search_thread->join ();
-       }
-
-       _listen_io_service.stop ();
-       if (_listen_thread) {
-               DCPOMATIC_ASSERT (_listen_thread->joinable ());
-               _listen_thread->join ();
-       }
-}
-
-void
-ServerFinder::search_thread ()
-try
-{
-       boost::system::error_code error;
-       boost::asio::io_service io_service;
-       boost::asio::ip::udp::socket socket (io_service);
-       socket.open (boost::asio::ip::udp::v4(), error);
-       if (error) {
-               throw NetworkError ("failed to set up broadcast socket");
-       }
-
-        socket.set_option (boost::asio::ip::udp::socket::reuse_address (true));
-        socket.set_option (boost::asio::socket_base::broadcast (true));
-
-       string const data = DCPOMATIC_HELLO;
-
-       while (!_stop) {
-               if (Config::instance()->use_any_servers ()) {
-                       /* Broadcast to look for servers */
-                       try {
-                               boost::asio::ip::udp::endpoint end_point (boost::asio::ip::address_v4::broadcast(), Config::instance()->server_port_base() + 1);
-                               socket.send_to (boost::asio::buffer (data.c_str(), data.size() + 1), end_point);
-                       } catch (...) {
-
-                       }
-               }
-
-               /* Query our `definite' servers (if there are any) */
-               vector<string> servers = Config::instance()->servers ();
-               for (vector<string>::const_iterator i = servers.begin(); i != servers.end(); ++i) {
-                       if (server_found (*i)) {
-                               /* Don't bother asking a server that we already know about */
-                               continue;
-                       }
-                       try {
-                               boost::asio::ip::udp::resolver resolver (io_service);
-                               boost::asio::ip::udp::resolver::query query (*i, raw_convert<string> (Config::instance()->server_port_base() + 1));
-                               boost::asio::ip::udp::endpoint end_point (*resolver.resolve (query));
-                               socket.send_to (boost::asio::buffer (data.c_str(), data.size() + 1), end_point);
-                       } catch (...) {
-
-                       }
-               }
-
-               boost::mutex::scoped_lock lm (_search_condition_mutex);
-               _search_condition.timed_wait (lm, boost::get_system_time() + boost::posix_time::seconds (10));
-       }
-}
-catch (...)
-{
-       store_current ();
-}
-
-void
-ServerFinder::listen_thread ()
-try {
-       using namespace boost::asio::ip;
-
-       try {
-               _listen_acceptor.reset (new tcp::acceptor (_listen_io_service, tcp::endpoint (tcp::v4(), Config::instance()->server_port_base() + 1)));
-       } catch (...) {
-               boost::throw_exception (NetworkError (_("Could not listen for remote encode servers.  Perhaps another instance of DCP-o-matic is running.")));
-       }
-
-       start_accept ();
-       _listen_io_service.run ();
-}
-catch (...)
-{
-       store_current ();
-}
-
-void
-ServerFinder::start_accept ()
-{
-       shared_ptr<Socket> socket (new Socket ());
-       _listen_acceptor->async_accept (
-               socket->socket(),
-               boost::bind (&ServerFinder::handle_accept, this, boost::asio::placeholders::error, socket)
-               );
-}
-
-void
-ServerFinder::handle_accept (boost::system::error_code ec, shared_ptr<Socket> socket)
-{
-       if (ec) {
-               start_accept ();
-               return;
-       }
-
-       uint32_t length;
-       socket->read (reinterpret_cast<uint8_t*> (&length), sizeof (uint32_t));
-       length = ntohl (length);
-
-       scoped_array<char> buffer (new char[length]);
-       socket->read (reinterpret_cast<uint8_t*> (buffer.get()), length);
-
-       string s (buffer.get());
-       shared_ptr<cxml::Document> xml (new cxml::Document ("ServerAvailable"));
-       xml->read_string (s);
-
-       string const ip = socket->socket().remote_endpoint().address().to_string ();
-       if (!server_found (ip) && xml->optional_number_child<int>("Version").get_value_or (0) == SERVER_LINK_VERSION) {
-               ServerDescription sd (ip, xml->number_child<int> ("Threads"));
-               {
-                       boost::mutex::scoped_lock lm (_servers_mutex);
-                       _servers.push_back (sd);
-               }
-               emit (boost::bind (boost::ref (ServersListChanged)));
-       }
-
-       start_accept ();
-}
-
-bool
-ServerFinder::server_found (string ip) const
-{
-       boost::mutex::scoped_lock lm (_servers_mutex);
-       list<ServerDescription>::const_iterator i = _servers.begin();
-       while (i != _servers.end() && i->host_name() != ip) {
-               ++i;
-       }
-
-       return i != _servers.end ();
-}
-
-ServerFinder*
-ServerFinder::instance ()
-{
-       if (!_instance) {
-               _instance = new ServerFinder ();
-               _instance->start ();
-       }
-
-       return _instance;
-}
-
-void
-ServerFinder::drop ()
-{
-       delete _instance;
-       _instance = 0;
-}
-
-list<ServerDescription>
-ServerFinder::servers () const
-{
-       boost::mutex::scoped_lock lm (_servers_mutex);
-       return _servers;
-}
-
-void
-ServerFinder::config_changed (Config::Property what)
-{
-       if (what == Config::USE_ANY_SERVERS || what == Config::SERVERS) {
-               {
-                       boost::mutex::scoped_lock lm (_servers_mutex);
-                       _servers.clear ();
-               }
-               ServersListChanged ();
-               _search_condition.notify_all ();
-       }
-}
diff --git a/src/lib/server_finder.h b/src/lib/server_finder.h
deleted file mode 100644 (file)
index 08e138b..0000000
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
-    Copyright (C) 2013-2015 Carl Hetherington <cth@carlh.net>
-
-    This program is free software; you can redistribute it and/or modify
-    it under the terms of the GNU General Public License as published by
-    the Free Software Foundation; either version 2 of the License, or
-    (at your option) any later version.
-
-    This program is distributed in the hope that it will be useful,
-    but WITHOUT ANY WARRANTY; without even the implied warranty of
-    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-    GNU General Public License for more details.
-
-    You should have received a copy of the GNU General Public License
-    along with this program; if not, write to the Free Software
-    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
-
-*/
-
-/** @file  src/lib/server_finder.h
- *  @brief ServerFinder class.
- */
-
-#include "signaller.h"
-#include "server_description.h"
-#include "config.h"
-#include "exception_store.h"
-#include <boost/signals2.hpp>
-#include <boost/thread/condition.hpp>
-
-class Socket;
-
-class ServerFinder : public Signaller, public ExceptionStore
-{
-public:
-       static ServerFinder* instance ();
-       static void drop ();
-
-       void disable () {
-               _disabled = true;
-       }
-
-       bool disabled () const {
-               return _disabled;
-       }
-
-       std::list<ServerDescription> servers () const;
-
-       /** Emitted whenever the list of servers changes */
-       boost::signals2::signal<void ()> ServersListChanged;
-
-private:
-       ServerFinder ();
-       ~ServerFinder ();
-
-       void start ();
-
-       void search_thread ();
-       void listen_thread ();
-
-       bool server_found (std::string) const;
-       void start_accept ();
-       void handle_accept (boost::system::error_code ec, boost::shared_ptr<Socket> socket);
-
-       void config_changed (Config::Property what);
-
-       bool _disabled;
-
-       /** Thread to periodically issue broadcasts and requests to find encoding servers */
-       boost::thread* _search_thread;
-       /** Thread to listen to the responses from servers */
-       boost::thread* _listen_thread;
-
-       std::list<ServerDescription> _servers;
-       mutable boost::mutex _servers_mutex;
-
-       boost::asio::io_service _listen_io_service;
-       boost::shared_ptr<boost::asio::ip::tcp::acceptor> _listen_acceptor;
-       bool _stop;
-
-       boost::condition _search_condition;
-       boost::mutex _search_condition_mutex;
-
-       static ServerFinder* _instance;
-};
index 7b272f5..0f0b0c1 100644 (file)
@@ -55,6 +55,8 @@ sources = """
           dolby_cp750.cc
           emailer.cc
           encoder.cc
+          encode_server.cc
+          encode_server_finder.cc
           encoded_log_entry.cc
           environment_info.cc
           examine_content_job.cc
@@ -107,8 +109,6 @@ sources = """
           screen_kdm.cc
           send_kdm_email_job.cc
           send_problem_report_job.cc
-          server.cc
-          server_finder.cc
           single_stream_audio_content.cc
           sndfile_base.cc
           sndfile_content.cc
index 9a350f8..0cd0bac 100644 (file)
@@ -49,7 +49,7 @@
 #include "lib/cinema.h"
 #include "lib/screen_kdm.h"
 #include "lib/send_kdm_email_job.h"
-#include "lib/server_finder.h"
+#include "lib/encode_server_finder.h"
 #include "lib/update_checker.h"
 #include "lib/cross.h"
 #include "lib/content_factory.h"
@@ -1044,7 +1044,7 @@ private:
        void check ()
        {
                try {
-                       ServerFinder::instance()->rethrow ();
+                       EncodeServerFinder::instance()->rethrow ();
                } catch (exception& e) {
                        error_dialog (0, std_to_wx (e.what ()));
                }
index 14b08d2..80e10da 100644 (file)
@@ -27,7 +27,7 @@
 #include "lib/config.h"
 #include "lib/log.h"
 #include "lib/signal_manager.h"
-#include "lib/server_finder.h"
+#include "lib/encode_server_finder.h"
 #include "lib/json_server.h"
 #include "lib/ratio.h"
 #include "lib/video_content.h"
@@ -119,7 +119,7 @@ show_servers ()
 {
        while (true) {
                int N = 0;
-               list<ServerDescription> servers = ServerFinder::instance()->servers ();
+               list<EncodeServerDescription> servers = EncodeServerFinder::instance()->servers ();
 
                if (Config::instance()->use_any_servers ()) {
                        if (servers.empty ()) {
@@ -128,7 +128,7 @@ show_servers ()
                        } else {
                                cout << std::left << setw(24) << "Host" << " Threads\n";
                                ++N;
-                               BOOST_FOREACH (ServerDescription const & i, servers) {
+                               BOOST_FOREACH (EncodeServerDescription const & i, servers) {
                                        cout << std::left << setw(24) << i.host_name() << " " << i.threads() << "\n";
                                        ++N;
                                }
@@ -144,7 +144,7 @@ show_servers ()
                                BOOST_FOREACH (string const & i, Config::instance()->servers()) {
                                        cout << std::left << setw(24) << i << " ";
                                        optional<int> threads;
-                                       BOOST_FOREACH (ServerDescription const & j, servers) {
+                                       BOOST_FOREACH (EncodeServerDescription const & j, servers) {
                                                if (i == j.host_name()) {
                                                        threads = j.threads();
                                                }
@@ -250,7 +250,7 @@ main (int argc, char* argv[])
        signal_manager = new SignalManager ();
 
        if (no_remote) {
-               ServerFinder::instance()->disable ();
+               EncodeServerFinder::instance()->disable ();
        }
 
        if (json_port) {
@@ -353,7 +353,7 @@ main (int argc, char* argv[])
        */
        JobManager::drop ();
 
-       ServerFinder::drop ();
+       EncodeServerFinder::drop ();
 
        return error ? EXIT_FAILURE : EXIT_SUCCESS;
 }
index cc783ed..e339b19 100644 (file)
@@ -21,7 +21,7 @@
 #include "wx/wx_signal_manager.h"
 #include "lib/util.h"
 #include "lib/encoded_log_entry.h"
-#include "lib/server.h"
+#include "lib/encode_server.h"
 #include "lib/config.h"
 #include "lib/log.h"
 #include "lib/raw_convert.h"
@@ -291,7 +291,7 @@ private:
 
        void main_thread ()
        try {
-               Server server (server_log, false);
+               EncodeServer server (server_log, false);
                server.run (Config::instance()->num_local_encoding_threads ());
        } catch (...) {
                store_current ();
index 81c553e..8f343b3 100644 (file)
@@ -26,7 +26,7 @@
 #include "lib/file_log.h"
 #include "lib/null_log.h"
 #include "lib/version.h"
-#include "lib/server.h"
+#include "lib/encode_server.h"
 #include <boost/array.hpp>
 #include <boost/asio.hpp>
 #include <boost/algorithm/string.hpp>
@@ -110,7 +110,7 @@ main (int argc, char* argv[])
                log.reset (new NullLog);
        }
 
-       Server server (log, verbose);
+       EncodeServer server (log, verbose);
 
        try {
                server.run (num_threads);
index fdfceb5..3307d8f 100644 (file)
@@ -21,7 +21,7 @@
 #include "lib/film.h"
 #include "lib/filter.h"
 #include "lib/util.h"
-#include "lib/server.h"
+#include "lib/encode_server.h"
 #include "lib/dcp_video.h"
 #include "lib/decoder.h"
 #include "lib/exceptions.h"
@@ -29,7 +29,7 @@
 #include "lib/video_decoder.h"
 #include "lib/player.h"
 #include "lib/player_video.h"
-#include "lib/server_description.h"
+#include "lib/encode_server_description.h"
 #include <getopt.h>
 #include <iostream>
 #include <iomanip>
@@ -43,7 +43,7 @@ using boost::shared_ptr;
 using dcp::Data;
 
 static shared_ptr<Film> film;
-static ServerDescription* server;
+static EncodeServerDescription* server;
 static shared_ptr<FileLog> log_ (new FileLog ("servomatictest.log"));
 static int frame_count = 0;
 
@@ -139,7 +139,7 @@ main (int argc, char* argv[])
        dcpomatic_setup ();
 
        try {
-               server = new ServerDescription (server_host, 1);
+               server = new EncodeServerDescription (server_host, 1);
                film.reset (new Film (film_dir));
                film->read_metadata ();
 
index 8b32ad3..18ed09e 100644 (file)
@@ -17,7 +17,7 @@
 
 */
 
-#include "lib/server.h"
+#include "lib/encode_server.h"
 #include "server_dialog.h"
 #include "wx_util.h"
 
index 0c46b1e..a05c42d 100644 (file)
@@ -19,8 +19,8 @@
 
 #include "servers_list_dialog.h"
 #include "wx_util.h"
-#include "lib/server_finder.h"
-#include "lib/server_description.h"
+#include "lib/encode_server_finder.h"
+#include "lib/encode_server_description.h"
 #include <boost/lexical_cast.hpp>
 #include <boost/foreach.hpp>
 
@@ -63,7 +63,9 @@ ServersListDialog::ServersListDialog (wxWindow* parent)
        s->Layout ();
        s->SetSizeHints (this);
 
-       _server_finder_connection = ServerFinder::instance()->ServersListChanged.connect (boost::bind (&ServersListDialog::servers_list_changed, this));
+       _server_finder_connection = EncodeServerFinder::instance()->ServersListChanged.connect (
+               boost::bind (&ServersListDialog::servers_list_changed, this)
+               );
        servers_list_changed ();
 }
 
@@ -73,7 +75,7 @@ ServersListDialog::servers_list_changed ()
        _list->DeleteAllItems ();
 
        int n = 0;
-       BOOST_FOREACH (ServerDescription i, ServerFinder::instance()->servers ()) {
+       BOOST_FOREACH (EncodeServerDescription i, EncodeServerFinder::instance()->servers ()) {
                wxListItem list_item;
                list_item.SetId (n);
                _list->InsertItem (list_item);
index 6adc906..08d9c73 100644 (file)
@@ -17,7 +17,7 @@
 
 */
 
-#include "lib/server.h"
+#include "lib/encode_server.h"
 #include <wx/wx.h>
 #include <wx/listctrl.h>
 #include <boost/signals2.hpp>
index 8a3b0fe..6f7a170 100644 (file)
  *  @brief Test the server class.
  *
  *  Create a test image and then encode it using the standard mechanism
- *  and also using a Server object running on localhost.  Compare the resulting
+ *  and also using a EncodeServer object running on localhost.  Compare the resulting
  *  encoded data to check that they are the same.
  */
 
-#include "lib/server.h"
+#include "lib/encode_server.h"
 #include "lib/image.h"
 #include "lib/cross.h"
 #include "lib/dcp_video.h"
 #include "lib/player_video.h"
 #include "lib/raw_image_proxy.h"
 #include "lib/j2k_image_proxy.h"
-#include "lib/server_description.h"
+#include "lib/encode_server_description.h"
 #include "lib/file_log.h"
 #include <boost/test/unit_test.hpp>
 #include <boost/thread.hpp>
@@ -44,7 +44,7 @@ using boost::optional;
 using dcp::Data;
 
 void
-do_remote_encode (shared_ptr<DCPVideo> frame, ServerDescription description, Data locally_encoded)
+do_remote_encode (shared_ptr<DCPVideo> frame, EncodeServerDescription description, Data locally_encoded)
 {
        Data remotely_encoded;
        BOOST_CHECK_NO_THROW (remotely_encoded = frame->encode_remotely (description, 60));
@@ -112,14 +112,14 @@ BOOST_AUTO_TEST_CASE (client_server_test_rgb)
 
        Data locally_encoded = frame->encode_locally (boost::bind (&Log::dcp_log, log.get(), _1, _2));
 
-       Server* server = new Server (log, true);
+       EncodeServer* server = new EncodeServer (log, true);
 
-       new thread (boost::bind (&Server::run, server, 2));
+       new thread (boost::bind (&EncodeServer::run, server, 2));
 
        /* Let the server get itself ready */
        dcpomatic_sleep (1);
 
-       ServerDescription description ("localhost", 2);
+       EncodeServerDescription description ("localhost", 2);
 
        list<thread*> threads;
        for (int i = 0; i < 8; ++i) {
@@ -192,14 +192,14 @@ BOOST_AUTO_TEST_CASE (client_server_test_yuv)
 
        Data locally_encoded = frame->encode_locally (boost::bind (&Log::dcp_log, log.get(), _1, _2));
 
-       Server* server = new Server (log, true);
+       EncodeServer* server = new EncodeServer (log, true);
 
-       new thread (boost::bind (&Server::run, server, 2));
+       new thread (boost::bind (&EncodeServer::run, server, 2));
 
        /* Let the server get itself ready */
        dcpomatic_sleep (1);
 
-       ServerDescription description ("localhost", 2);
+       EncodeServerDescription description ("localhost", 2);
 
        list<thread*> threads;
        for (int i = 0; i < 8; ++i) {
@@ -284,14 +284,14 @@ BOOST_AUTO_TEST_CASE (client_server_test_j2k)
 
        Data j2k_locally_encoded = j2k_frame->encode_locally (boost::bind (&Log::dcp_log, log.get(), _1, _2));
 
-       Server* server = new Server (log, true);
+       EncodeServer* server = new EncodeServer (log, true);
 
-       new thread (boost::bind (&Server::run, server, 2));
+       new thread (boost::bind (&EncodeServer::run, server, 2));
 
        /* Let the server get itself ready */
        dcpomatic_sleep (1);
 
-       ServerDescription description ("localhost", 2);
+       EncodeServerDescription description ("localhost", 2);
 
        list<thread*> threads;
        for (int i = 0; i < 8; ++i) {
index 366bc93..e41cc65 100644 (file)
@@ -28,7 +28,7 @@
 #include "lib/job_manager.h"
 #include "lib/job.h"
 #include "lib/cross.h"
-#include "lib/server_finder.h"
+#include "lib/encode_server_finder.h"
 #include "lib/image.h"
 #include "lib/ratio.h"
 #include "lib/log_entry.h"
@@ -80,7 +80,7 @@ struct TestConfig
                Config::instance()->set_default_j2k_bandwidth (100000000);
                Config::instance()->set_log_types (LogEntry::TYPE_GENERAL | LogEntry::TYPE_WARNING | LogEntry::TYPE_ERROR);
 
-               ServerFinder::instance()->disable ();
+               EncodeServerFinder::instance()->disable ();
 
                signal_manager = new TestSignalManager ();
        }