Various work on server discovery; works on localhost.
authorCarl Hetherington <cth@carlh.net>
Tue, 5 Nov 2013 22:43:34 +0000 (22:43 +0000)
committerCarl Hetherington <cth@carlh.net>
Tue, 5 Nov 2013 22:43:34 +0000 (22:43 +0000)
src/lib/config.cc
src/lib/config.h
src/lib/cross.cc
src/lib/dcp_video_frame.cc
src/lib/encoder.cc
src/lib/encoder.h
src/lib/server.cc
src/lib/util.cc
src/lib/util.h
test/test.cc

index 0c6aed4a81455530b885c2c2efeaf6ffa9622c3f..02feecce89173ea0e03d73e3376bca9ca7858b30 100644 (file)
@@ -54,7 +54,7 @@ Config* Config::_instance = 0;
 /** Construct default configuration */
 Config::Config ()
        : _num_local_encoding_threads (max (2U, boost::thread::hardware_concurrency()))
-       , _server_port (6192)
+       , _server_port_base (6192)
        , _tms_path (".")
        , _sound_processor (SoundProcessor::from_id (N_("dolby_cp750")))
        , _default_still_length (10)
@@ -95,7 +95,12 @@ Config::read ()
 
        _num_local_encoding_threads = f.number_child<int> ("NumLocalEncodingThreads");
        _default_directory = f.string_child ("DefaultDirectory");
-       _server_port = f.number_child<int> ("ServerPort");
+
+       boost::optional<int> b = f.optional_number_child<int> ("ServerPort");
+       if (!b) {
+               b = f.optional_number_child<int> ("ServerPortBase");
+       }
+       _server_port_base = b.get ();
        
        list<shared_ptr<cxml::Node> > servers = f.node_children ("Server");
        for (list<shared_ptr<cxml::Node> >::iterator i = servers.begin(); i != servers.end(); ++i) {
@@ -191,7 +196,7 @@ Config::read_old_metadata ()
                } else if (k == N_("default_directory")) {
                        _default_directory = v;
                } else if (k == N_("server_port")) {
-                       _server_port = atoi (v.c_str ());
+                       _server_port_base = atoi (v.c_str ());
                } else if (k == N_("server")) {
                        optional<ServerDescription> server = ServerDescription::create_from_metadata (v);
                        if (server) {
@@ -287,7 +292,7 @@ Config::write () const
        root->add_child("Version")->add_child_text ("1");
        root->add_child("NumLocalEncodingThreads")->add_child_text (lexical_cast<string> (_num_local_encoding_threads));
        root->add_child("DefaultDirectory")->add_child_text (_default_directory.string ());
-       root->add_child("ServerPort")->add_child_text (lexical_cast<string> (_server_port));
+       root->add_child("ServerPortBase")->add_child_text (lexical_cast<string> (_server_port_base));
        
        for (vector<ServerDescription>::const_iterator i = _servers.begin(); i != _servers.end(); ++i) {
                i->as_xml (root->add_child ("Server"));
index 7dd5abd178ec498ee22daf7337e0386bd25dff91..07b3d68910edce82281a7295132cd85e60f32364 100644 (file)
@@ -59,9 +59,9 @@ public:
 
        boost::filesystem::path default_directory_or (boost::filesystem::path a) const;
 
-       /** @return port to use for J2K encoding servers */
-       int server_port () const {
-               return _server_port;
+       /** @return base port number to use for J2K encoding servers */
+       int server_port_base () const {
+               return _server_port_base;
        }
 
        /** @return J2K encoding servers to use */
@@ -156,8 +156,8 @@ public:
        }
 
        /** @param p New server port */
-       void set_server_port (int p) {
-               _server_port = p;
+       void set_server_port_base (int p) {
+               _server_port_base = p;
        }
 
        /** @param s New list of servers */
@@ -270,8 +270,10 @@ private:
        int _num_local_encoding_threads;
        /** default directory to put new films in */
        boost::filesystem::path _default_directory;
-       /** port to use for J2K encoding servers */
-       int _server_port;
+       /** base port number to use for J2K encoding servers;
+        *  this port and the one above it will be used.
+        */
+       int _server_port_base;
 
        /** J2K encoding servers to use */
        std::vector<ServerDescription> _servers;
index 94edb688b5791e9408a0dab369873bfe4d036321..45c38da2bbc2aedea59bc8b2103f0693f0685c04 100644 (file)
@@ -51,6 +51,7 @@ using std::wstring;
 using std::make_pair;
 using boost::shared_ptr;
 
+/** @param s Number of seconds to sleep for */
 void
 dcpomatic_sleep (int s)
 {
index 464cf672ff515f9c8c0bd026d9bb1e650433b1f5..25abd6f0df57ee7f9f0304e9713b919052639c00 100644 (file)
@@ -256,7 +256,7 @@ DCPVideoFrame::encode_remotely (ServerDescription serv)
 {
        boost::asio::io_service io_service;
        boost::asio::ip::tcp::resolver resolver (io_service);
-       boost::asio::ip::tcp::resolver::query query (serv.host_name(), boost::lexical_cast<string> (Config::instance()->server_port ()));
+       boost::asio::ip::tcp::resolver::query query (serv.host_name(), boost::lexical_cast<string> (Config::instance()->server_port_base ()));
        boost::asio::ip::tcp::resolver::iterator endpoint_iterator = resolver.resolve (query);
 
        shared_ptr<Socket> socket (new Socket);
index 924a91439c4d889d73ad75b6d916575e9525f798..f1d2375b609f4c1addea946466be534b5e5080ac 100644 (file)
@@ -22,6 +22,8 @@
  */
 
 #include <iostream>
+#include <boost/lambda/lambda.hpp>
+#include <libcxml/cxml.h>
 #include "encoder.h"
 #include "util.h"
 #include "film.h"
@@ -44,6 +46,7 @@ using std::min;
 using std::make_pair;
 using boost::shared_ptr;
 using boost::optional;
+using boost::scoped_array;
 
 int const Encoder::_history_size = 25;
 
@@ -53,6 +56,8 @@ Encoder::Encoder (shared_ptr<const Film> f, shared_ptr<Job> j)
        , _job (j)
        , _video_frames_out (0)
        , _terminate (false)
+       , _broadcast_thread (0)
+       , _listen_thread (0)
 {
        _have_a_real_frame[EYES_BOTH] = false;
        _have_a_real_frame[EYES_LEFT] = false;
@@ -67,21 +72,41 @@ Encoder::~Encoder ()
        }
 }
 
+/** Add a worker thread for a remote server.  Caller must hold
+ *  a lock on _mutex, or know that one is not currently required to
+ *  safely modify _threads.
+ */
+void
+Encoder::add_worker_thread (ServerDescription d)
+{
+       _threads.push_back (
+               make_pair (d, new boost::thread (boost::bind (&Encoder::encoder_thread, this, d)))
+               );
+}
+
 void
 Encoder::process_begin ()
 {
        for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
-               _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<ServerDescription> ())));
+               _threads.push_back (
+                       make_pair (
+                               optional<ServerDescription> (),
+                               new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<ServerDescription> ()))
+                               )
+                       );
        }
 
        vector<ServerDescription> servers = Config::instance()->servers ();
 
        for (vector<ServerDescription>::iterator i = servers.begin(); i != servers.end(); ++i) {
                for (int j = 0; j < i->threads (); ++j) {
-                       _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, *i)));
+                       add_worker_thread (*i);
                }
        }
 
+       _broadcast_thread = new boost::thread (boost::bind (&Encoder::broadcast_thread, this));
+       _listen_thread = new boost::thread (boost::bind (&Encoder::listen_thread, this));
+
        _writer.reset (new Writer (_film, _job));
 }
 
@@ -228,19 +253,30 @@ Encoder::process_audio (shared_ptr<const AudioBuffers> data)
 void
 Encoder::terminate_threads ()
 {
-       boost::mutex::scoped_lock lock (_mutex);
-       _terminate = true;
-       _condition.notify_all ();
-       lock.unlock ();
+       {
+               boost::mutex::scoped_lock lock (_mutex);
+               _terminate = true;
+               _condition.notify_all ();
+       }
 
-       for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
-               if ((*i)->joinable ()) {
-                       (*i)->join ();
+       for (ThreadList::iterator i = _threads.begin(); i != _threads.end(); ++i) {
+               if (i->second->joinable ()) {
+                       i->second->join ();
                }
-               delete *i;
+               delete i->second;
        }
 
        _threads.clear ();
+                    
+       if (_broadcast_thread && _broadcast_thread->joinable ()) {
+               _broadcast_thread->join ();
+       }
+       delete _broadcast_thread;
+
+       if (_listen_thread && _listen_thread->joinable ()) {
+               _listen_thread->join ();
+       }
+       delete _listen_thread;
 }
 
 void
@@ -326,3 +362,81 @@ Encoder::encoder_thread (optional<ServerDescription> server)
                _condition.notify_all ();
        }
 }
+
+void
+Encoder::broadcast_thread ()
+{
+       boost::system::error_code error;
+       boost::asio::io_service io_service;
+       boost::asio::ip::udp::socket socket (io_service);
+       socket.open (boost::asio::ip::udp::v4(), error);
+       if (error) {
+               throw NetworkError ("failed to set up broadcast socket");
+       }
+
+        socket.set_option (boost::asio::ip::udp::socket::reuse_address (true));
+        socket.set_option (boost::asio::socket_base::broadcast (true));
+       
+        boost::asio::ip::udp::endpoint end_point (boost::asio::ip::address_v4::broadcast(), Config::instance()->server_port_base() + 1);            
+
+       while (1) {
+               boost::mutex::scoped_lock lm (_mutex);
+               if (_terminate) {
+                       socket.close (error);
+                       return;
+               }
+               
+               string data = DCPOMATIC_HELLO;
+               socket.send_to (boost::asio::buffer (data.c_str(), data.size() + 1), end_point);
+
+               lm.unlock ();
+               dcpomatic_sleep (10);
+       }
+}
+
+void
+Encoder::listen_thread ()
+{
+       while (1) {
+               {
+                       /* See if we need to stop */
+                       boost::mutex::scoped_lock lm (_mutex);
+                       if (_terminate) {
+                               return;
+                       }
+               }
+
+               shared_ptr<Socket> sock (new Socket (10));
+
+               try {
+                       sock->accept (Config::instance()->server_port_base() + 1);
+               } catch (std::exception& e) {
+                       continue;
+               }
+
+               uint32_t length = sock->read_uint32 ();
+               scoped_array<char> buffer (new char[length]);
+               sock->read (reinterpret_cast<uint8_t*> (buffer.get()), length);
+               
+               stringstream s (buffer.get());
+               shared_ptr<cxml::Document> xml (new cxml::Document ("ServerAvailable"));
+               xml->read_stream (s);
+
+               {
+                       /* See if we already know about this server */
+                       string const ip = sock->socket().remote_endpoint().address().to_string ();
+                       boost::mutex::scoped_lock lm (_mutex);
+                       ThreadList::iterator i = _threads.begin();
+                       while (i != _threads.end() && (!i->first || i->first->host_name() != ip)) {
+                               ++i;
+                       }
+
+                       if (i == _threads.end ()) {
+                               cout << "Adding a thread for " << ip << "\n";
+                               add_worker_thread (ServerDescription (ip, xml->number_child<int> ("Threads")));
+                       } else {
+                               cout << "Already know about " << ip << "\n";
+                       }
+               }
+       }
+}
index ab3f40762e4f512ae94012778ae8422171ac6261..e799c8469da652b838ee71ad1f3f14eecb09c623 100644 (file)
@@ -36,6 +36,7 @@ extern "C" {
 #include <libswresample/swresample.h>
 }
 #include "util.h"
+#include "config.h"
 
 class Image;
 class AudioBuffers;
@@ -83,6 +84,9 @@ private:
        
        void encoder_thread (boost::optional<ServerDescription>);
        void terminate_threads ();
+       void broadcast_thread ();
+       void listen_thread ();
+       void add_worker_thread (ServerDescription);
 
        /** Film that we are encoding */
        boost::shared_ptr<const Film> _film;
@@ -103,11 +107,16 @@ private:
        bool _have_a_real_frame[EYES_COUNT];
        bool _terminate;
        std::list<boost::shared_ptr<DCPVideoFrame> > _queue;
-       std::list<boost::thread *> _threads;
+       typedef std::list<std::pair<boost::optional<ServerDescription>, boost::thread *> > ThreadList;
+       ThreadList _threads;
        mutable boost::mutex _mutex;
        boost::condition _condition;
 
        boost::shared_ptr<Writer> _writer;
+
+       /** A thread to periodically issue broadcasts to find encoding servers */
+       boost::thread* _broadcast_thread;
+       boost::thread* _listen_thread;
 };
 
 #endif
index 19f27ab6a663d48442e93ec7d7c4f3d21a6da0c2..5010a2051df0e92e7e6b23efac3af963fb4f83f1 100644 (file)
@@ -53,6 +53,7 @@ using boost::thread;
 using boost::bind;
 using boost::scoped_array;
 using boost::optional;
+using boost::lexical_cast;
 using libdcp::Size;
 
 ServerDescription::ServerDescription (shared_ptr<const cxml::Node> node)
@@ -76,7 +77,7 @@ optional<ServerDescription>
 ServerDescription::create_from_metadata (string v)
 {
        vector<string> b;
-       split (b, v, is_any_of (N_(" ")));
+       split (b, v, is_any_of (" "));
 
        if (b.size() != 2) {
                return optional<ServerDescription> ();
@@ -152,7 +153,7 @@ Server::worker_thread ()
                try {
                        frame = process (socket);
                } catch (std::exception& e) {
-                       _log->log (String::compose (N_("Error: %1"), e.what()));
+                       _log->log (String::compose ("Error: %1", e.what()));
                }
                
                socket.reset ();
@@ -162,7 +163,8 @@ Server::worker_thread ()
                if (frame >= 0) {
                        struct timeval end;
                        gettimeofday (&end, 0);
-                       _log->log (String::compose (N_("Encoded frame %1 in %2"), frame, seconds (end) - seconds (start)));
+                       cout << String::compose ("Encoded frame %1 in %2", frame, seconds (end) - seconds (start)) << "\n";
+                       _log->log (String::compose ("Encoded frame %1 in %2", frame, seconds (end) - seconds (start)));
                }
                
                _worker_condition.notify_all ();
@@ -181,7 +183,12 @@ Server::run (int num_threads)
        _broadcast.thread = new thread (bind (&Server::broadcast_thread, this));
        
        boost::asio::io_service io_service;
-       boost::asio::ip::tcp::acceptor acceptor (io_service, boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v4(), Config::instance()->server_port ()));
+
+       boost::asio::ip::tcp::acceptor acceptor (
+               io_service,
+               boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v4(), Config::instance()->server_port_base ())
+               );
+       
        while (1) {
                shared_ptr<Socket> socket (new Socket);
                acceptor.accept (socket->socket ());
@@ -204,7 +211,7 @@ Server::broadcast_thread ()
        boost::asio::io_service io_service;
 
        boost::asio::ip::address address = boost::asio::ip::address_v4::any ();
-       boost::asio::ip::udp::endpoint listen_endpoint (address, Config::instance()->server_port ());
+       boost::asio::ip::udp::endpoint listen_endpoint (address, Config::instance()->server_port_base() + 1);
 
        _broadcast.socket = new boost::asio::ip::udp::socket (io_service);
        _broadcast.socket->open (listen_endpoint.protocol ());
@@ -224,8 +231,24 @@ Server::broadcast_received ()
 {
        _broadcast.buffer[sizeof(_broadcast.buffer) - 1] = '\0';
 
-       cout << _broadcast.buffer << "\n";
-       
+       if (strcmp (_broadcast.buffer, DCPOMATIC_HELLO) == 0) {
+               /* Reply to the client saying what we can do */
+               xmlpp::Document doc;
+               xmlpp::Element* root = doc.create_root_node ("ServerAvailable");
+               root->add_child("Threads")->add_child_text (lexical_cast<string> (_worker_threads.size ()));
+               stringstream xml;
+               doc.write_to_stream (xml, "UTF-8");
+
+               shared_ptr<Socket> socket (new Socket);
+               try {
+                       socket->connect (boost::asio::ip::tcp::endpoint (_broadcast.send_endpoint.address(), Config::instance()->server_port_base() + 1));
+                       socket->write (xml.str().length() + 1);
+                       socket->write ((uint8_t *) xml.str().c_str(), xml.str().length() + 1);
+               } catch (...) {
+
+               }
+       }
+               
        _broadcast.socket->async_receive_from (
                boost::asio::buffer (_broadcast.buffer, sizeof (_broadcast.buffer)),
                _broadcast.send_endpoint, boost::bind (&Server::broadcast_received, this)
index 96b834fcc88fe70d3f1be8429b25920a8322318f..e2ce94dd9ea8f75eaf84a4841afe17298a99c211 100644 (file)
@@ -90,6 +90,7 @@ using std::istream;
 using std::numeric_limits;
 using std::pair;
 using std::ofstream;
+using std::cout;
 using boost::shared_ptr;
 using boost::thread;
 using boost::lexical_cast;
@@ -518,17 +519,27 @@ dcp_audio_frame_rate (int fs)
 Socket::Socket (int timeout)
        : _deadline (_io_service)
        , _socket (_io_service)
+       , _acceptor (0)
        , _timeout (timeout)
 {
        _deadline.expires_at (boost::posix_time::pos_infin);
        check ();
 }
 
+Socket::~Socket ()
+{
+       delete _acceptor;
+}
+
 void
 Socket::check ()
 {
        if (_deadline.expires_at() <= boost::asio::deadline_timer::traits_type::now ()) {
-               _socket.close ();
+               if (_acceptor) {
+                       _acceptor->cancel ();
+               } else {
+                       _socket.close ();
+               }
                _deadline.expires_at (boost::posix_time::pos_infin);
        }
 
@@ -539,7 +550,7 @@ Socket::check ()
  *  @param endpoint End-point to connect to.
  */
 void
-Socket::connect (boost::asio::ip::basic_resolver_entry<boost::asio::ip::tcp> const & endpoint)
+Socket::connect (boost::asio::ip::tcp::endpoint endpoint)
 {
        _deadline.expires_from_now (boost::posix_time::seconds (_timeout));
        boost::system::error_code ec = boost::asio::error::would_block;
@@ -548,11 +559,35 @@ Socket::connect (boost::asio::ip::basic_resolver_entry<boost::asio::ip::tcp> con
                _io_service.run_one();
        } while (ec == boost::asio::error::would_block);
 
-       if (ec || !_socket.is_open ()) {
+       if (ec) {
+               throw NetworkError (ec.message ());
+       }
+
+       if (!_socket.is_open ()) {
                throw NetworkError (_("connect timed out"));
        }
 }
 
+void
+Socket::accept (int port)
+{
+       _acceptor = new boost::asio::ip::tcp::acceptor (_io_service, boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v4(), port));
+       
+       _deadline.expires_from_now (boost::posix_time::seconds (_timeout));
+       boost::system::error_code ec = boost::asio::error::would_block;
+       _acceptor->async_accept (_socket, boost::lambda::var(ec) = boost::lambda::_1);
+       do {
+               _io_service.run_one ();
+       } while (ec == boost::asio::error::would_block );
+
+       delete _acceptor;
+       _acceptor = 0;
+       
+       if (ec) {
+               throw NetworkError (ec.message ());
+       }
+}
+
 /** Blocking write.
  *  @param data Buffer to write.
  *  @param size Number of bytes to write.
index 351c4c4d93da445aded791b960ecc640a49b6657..5e568cc27934cb84828cea9a0a25743520dea1b9 100644 (file)
@@ -133,13 +133,15 @@ class Socket
 {
 public:
        Socket (int timeout = 30);
+       ~Socket ();
 
        /** @return Our underlying socket */
        boost::asio::ip::tcp::socket& socket () {
                return _socket;
        }
 
-       void connect (boost::asio::ip::basic_resolver_entry<boost::asio::ip::tcp> const & endpoint);
+       void connect (boost::asio::ip::tcp::endpoint);
+       void accept (int);
 
        void write (uint32_t n);
        void write (uint8_t const * data, int size);
@@ -155,6 +157,7 @@ private:
        boost::asio::io_service _io_service;
        boost::asio::deadline_timer _deadline;
        boost::asio::ip::tcp::socket _socket;
+       boost::asio::ip::tcp::acceptor* _acceptor;
        int _timeout;
 };
 
index 1309439d21594e4a66d3f11bf024a9d89798cf6e..473626fac8a313a80d6235b19a54bbdc9bdab876 100644 (file)
@@ -58,7 +58,7 @@ struct TestConfig
 
                Config::instance()->set_num_local_encoding_threads (1);
                Config::instance()->set_servers (vector<ServerDescription> ());
-               Config::instance()->set_server_port (61920);
+               Config::instance()->set_server_port_base (61920);
                Config::instance()->set_default_dci_metadata (DCIMetadata ());
                Config::instance()->set_default_container (static_cast<Ratio*> (0));
                Config::instance()->set_default_dcp_content_type (static_cast<DCPContentType*> (0));