Rename SafeStringStream -> locked_stringstream. Bump deps for removal of stringstream.
[dcpomatic.git] / src / lib / server.cc
index 40d1c4c0ca9d769a036ab21240d26686382d6ab0..09e0a4bd0524cf91c6a4db010859713bda2d081f 100644 (file)
 /*
-    Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
+    Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
 
-    This program is free software; you can redistribute it and/or modify
+    This file is part of DCP-o-matic.
+
+    DCP-o-matic is free software; you can redistribute it and/or modify
     it under the terms of the GNU General Public License as published by
     the Free Software Foundation; either version 2 of the License, or
     (at your option) any later version.
 
-    This program is distributed in the hope that it will be useful,
+    DCP-o-matic is distributed in the hope that it will be useful,
     but WITHOUT ANY WARRANTY; without even the implied warranty of
     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     GNU General Public License for more details.
 
     You should have received a copy of the GNU General Public License
-    along with this program; if not, write to the Free Software
-    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+    along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
 
 */
 
-/** @file src/server.cc
- *  @brief Class to describe a server to which we can send
- *  encoding work, and a class to implement such a server.
- */
-
-#include <string>
-#include <vector>
-#include <sstream>
-#include <iostream>
-#include <boost/algorithm/string.hpp>
-#include <boost/lexical_cast.hpp>
-#include <boost/scoped_array.hpp>
-#include <libcxml/cxml.h>
 #include "server.h"
-#include "util.h"
-#include "scaler.h"
-#include "image.h"
-#include "dcp_video_frame.h"
-#include "config.h"
+#include "dcpomatic_socket.h"
 
 #include "i18n.h"
 
-using std::string;
-using std::stringstream;
-using std::multimap;
-using std::vector;
 using boost::shared_ptr;
-using boost::algorithm::is_any_of;
-using boost::algorithm::split;
-using boost::thread;
-using boost::bind;
-using boost::scoped_array;
-using libdcp::Size;
-
-ServerDescription::ServerDescription (shared_ptr<const cxml::Node> node)
-{
-       _host_name = node->string_child ("HostName");
-       _threads = node->number_child<int> ("Threads");
-}
-
-void
-ServerDescription::as_xml (xmlpp::Node* root) const
-{
-       root->add_child("HostName")->add_child_text (_host_name);
-       root->add_child("Threads")->add_child_text (boost::lexical_cast<string> (_threads));
-}
 
-/** Create a server description from a string of metadata returned from as_metadata().
- *  @param v Metadata.
- *  @return ServerDescription, or 0.
- */
-ServerDescription *
-ServerDescription::create_from_metadata (string v)
+Server::Server (int port)
+       : _terminate (false)
+       , _acceptor (_io_service, boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v4(), port))
 {
-       vector<string> b;
-       split (b, v, is_any_of (N_(" ")));
-
-       if (b.size() != 2) {
-               return 0;
-       }
 
-       return new ServerDescription (b[0], atoi (b[1].c_str ()));
 }
 
-Server::Server (shared_ptr<Log> log)
-       : _log (log)
+Server::~Server ()
 {
-
+       boost::mutex::scoped_lock lm (_mutex);
+       _terminate = true;
+       _io_service.stop ();
 }
 
-int
-Server::process (shared_ptr<Socket> socket)
+void
+Server::run ()
 {
-       uint32_t length = socket->read_uint32 ();
-       scoped_array<char> buffer (new char[length]);
-       socket->read (reinterpret_cast<uint8_t*> (buffer.get()), length);
-       
-       stringstream s (buffer.get());
-       multimap<string, string> kv = read_key_value (s);
-
-       if (get_required_string (kv, "encode") != "please") {
-               return -1;
-       }
-
-       libdcp::Size size (get_required_int (kv, "width"), get_required_int (kv, "height"));
-       int frame = get_required_int (kv, "frame");
-       int frames_per_second = get_required_int (kv, "frames_per_second");
-       int colour_lut_index = get_required_int (kv, "colour_lut");
-       int j2k_bandwidth = get_required_int (kv, "j2k_bandwidth");
-
-       /* This checks that colour_lut_index is within range */
-       colour_lut_index_to_name (colour_lut_index);
-
-       shared_ptr<Image> image (new SimpleImage (PIX_FMT_RGB24, size, true));
-
-       image->read_from_socket (socket);
-
-       DCPVideoFrame dcp_video_frame (
-               image, frame, frames_per_second, colour_lut_index, j2k_bandwidth, _log
-               );
-       
-       shared_ptr<EncodedData> encoded = dcp_video_frame.encode_locally ();
-       try {
-               encoded->send (socket);
-       } catch (std::exception& e) {
-               _log->log (String::compose (
-                                  N_("Send failed; frame %1, data size %2, pixel format %3, image size %4x%5, %6 components"),
-                                  frame, encoded->size(), image->pixel_format(), image->size().width, image->size().height, image->components()
-                                  )
-                       );
-               throw;
-       }
-
-       return frame;
+       start_accept ();
+       _io_service.run ();
 }
 
 void
-Server::worker_thread ()
+Server::start_accept ()
 {
-       while (1) {
-               boost::mutex::scoped_lock lock (_worker_mutex);
-               while (_queue.empty ()) {
-                       _worker_condition.wait (lock);
+       {
+               boost::mutex::scoped_lock lm (_mutex);
+               if (_terminate) {
+                       return;
                }
-
-               shared_ptr<Socket> socket = _queue.front ();
-               _queue.pop_front ();
-               
-               lock.unlock ();
-
-               int frame = -1;
-
-               struct timeval start;
-               gettimeofday (&start, 0);
-               
-               try {
-                       frame = process (socket);
-               } catch (std::exception& e) {
-                       _log->log (String::compose (N_("Error: %1"), e.what()));
-               }
-               
-               socket.reset ();
-               
-               lock.lock ();
-
-               if (frame >= 0) {
-                       struct timeval end;
-                       gettimeofday (&end, 0);
-                       _log->log (String::compose (N_("Encoded frame %1 in %2"), frame, seconds (end) - seconds (start)));
-               }
-               
-               _worker_condition.notify_all ();
        }
+
+       shared_ptr<Socket> socket (new Socket);
+       _acceptor.async_accept (socket->socket (), boost::bind (&Server::handle_accept, this, socket, boost::asio::placeholders::error));
 }
 
 void
-Server::run (int num_threads)
+Server::handle_accept (shared_ptr<Socket> socket, boost::system::error_code const & error)
 {
-       _log->log (String::compose (N_("Server starting with %1 threads"), num_threads));
-       
-       for (int i = 0; i < num_threads; ++i) {
-               _worker_threads.push_back (new thread (bind (&Server::worker_thread, this)));
+       if (error) {
+               return;
        }
 
-       boost::asio::io_service io_service;
-       boost::asio::ip::tcp::acceptor acceptor (io_service, boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v4(), Config::instance()->server_port ()));
-       while (1) {
-               shared_ptr<Socket> socket (new Socket);
-               acceptor.accept (socket->socket ());
-
-               boost::mutex::scoped_lock lock (_worker_mutex);
-               
-               /* Wait until the queue has gone down a bit */
-               while (int (_queue.size()) >= num_threads * 2) {
-                       _worker_condition.wait (lock);
-               }
-               
-               _queue.push_back (socket);
-               _worker_condition.notify_all ();
-       }
+       handle (socket);
+       start_accept ();
 }