Fix merge.
authorCarl Hetherington <cth@carlh.net>
Sun, 23 Sep 2012 17:46:09 +0000 (18:46 +0100)
committerCarl Hetherington <cth@carlh.net>
Sun, 23 Sep 2012 17:46:09 +0000 (18:46 +0100)
src/lib/dcp_video_frame.cc
src/lib/dcp_video_frame.h
src/lib/server.cc
src/lib/server.h
src/lib/util.cc
src/lib/util.h
test/test.cc

index b128f6fa068d629f603fd92f0c60f3cc7f4384e3..d8af3462d681b477bbd7859c5c618395e4a133e7 100644 (file)
@@ -296,8 +296,9 @@ DCPVideoFrame::encode_remotely (ServerDescription const * serv)
        asio::ip::tcp::resolver::query query (serv->host_name(), boost::lexical_cast<string> (Config::instance()->server_port ()));
        asio::ip::tcp::resolver::iterator endpoint_iterator = resolver.resolve (query);
 
-       shared_ptr<asio::ip::tcp::socket> socket (new asio::ip::tcp::socket (io_service));
-       socket->connect (*endpoint_iterator);
+       Socket socket;
+
+       socket.connect (*endpoint_iterator, 30);
 
 #ifdef DEBUG_HASH
        _input->hash ("Input for remote encoding (before sending)");
@@ -320,21 +321,19 @@ DCPVideoFrame::encode_remotely (ServerDescription const * serv)
                s << _input->line_size()[i] << " ";
        }
 
-       asio::write (*socket, asio::buffer (s.str().c_str(), s.str().length() + 1));
+       socket.write ((uint8_t *) s.str().c_str(), s.str().length() + 1, 30);
 
        for (int i = 0; i < _input->components(); ++i) {
-               asio::write (*socket, asio::buffer (_input->data()[i], _input->line_size()[i] * _input->lines(i)));
+               socket.write (_input->data()[i], _input->line_size()[i] * _input->lines(i), 30);
        }
 
-       SocketReader reader (socket);
-
        char buffer[32];
-       reader.read_indefinite ((uint8_t *) buffer, sizeof (buffer));
-       reader.consume (strlen (buffer) + 1);
+       socket.read_indefinite ((uint8_t *) buffer, sizeof (buffer), 30);
+       socket.consume (strlen (buffer) + 1);
        shared_ptr<EncodedData> e (new RemotelyEncodedData (atoi (buffer)));
 
        /* now read the rest */
-       reader.read_definite_and_consume (e->data(), e->size());
+       socket.read_definite_and_consume (e->data(), e->size(), 30);
 
 #ifdef DEBUG_HASH
        e->hash ("Encoded image (after receiving)");
@@ -375,12 +374,12 @@ EncodedData::write (shared_ptr<const Options> opt, int frame)
  *  @param socket Socket
  */
 void
-EncodedData::send (shared_ptr<asio::ip::tcp::socket> socket)
+EncodedData::send (shared_ptr<Socket> socket)
 {
        stringstream s;
        s << _size;
-       asio::write (*socket, asio::buffer (s.str().c_str(), s.str().length() + 1));
-       asio::write (*socket, asio::buffer (_data, _size));
+       socket->write ((uint8_t *) s.str().c_str(), s.str().length() + 1, 30);
+       socket->write (_data, _size, 30);
 }
 
 #ifdef DEBUG_HASH
index ee54bc0f5e5dbe8dac06e8eb00f4c3b66241eae2..da4e0c30168da272dbab043abc76655ed147fb86 100644 (file)
@@ -48,7 +48,7 @@ public:
 
        virtual ~EncodedData () {}
 
-       void send (boost::shared_ptr<boost::asio::ip::tcp::socket>);
+       void send (boost::shared_ptr<Socket> socket);
        void write (boost::shared_ptr<const Options>, int);
 
 #ifdef DEBUG_HASH
index f4e89455844c011e9c90870de2362f5964dcd979..8ca4260490aad273794390e76629dcc4de80ffef 100644 (file)
@@ -70,13 +70,11 @@ Server::Server (Log* log)
 }
 
 int
-Server::process (shared_ptr<asio::ip::tcp::socket> socket)
+Server::process (shared_ptr<Socket> socket)
 {
-       SocketReader reader (socket);
-       
        char buffer[128];
-       reader.read_indefinite ((uint8_t *) buffer, sizeof (buffer));
-       reader.consume (strlen (buffer) + 1);
+       socket->read_indefinite ((uint8_t *) buffer, sizeof (buffer), 30);
+       socket->consume (strlen (buffer) + 1);
        
        stringstream s (buffer);
        
@@ -123,7 +121,7 @@ Server::process (shared_ptr<asio::ip::tcp::socket> socket)
        }
        
        for (int i = 0; i < image->components(); ++i) {
-               reader.read_definite_and_consume (image->data()[i], image->line_size()[i] * image->lines(i));
+               socket->read_definite_and_consume (image->data()[i], image->line_size()[i] * image->lines(i), 30);
        }
        
 #ifdef DEBUG_HASH
@@ -150,7 +148,7 @@ Server::worker_thread ()
                        _worker_condition.wait (lock);
                }
 
-               shared_ptr<asio::ip::tcp::socket> socket = _queue.front ();
+               shared_ptr<Socket> socket = _queue.front ();
                _queue.pop_front ();
                
                lock.unlock ();
@@ -192,12 +190,12 @@ Server::run (int num_threads)
        for (int i = 0; i < num_threads; ++i) {
                _worker_threads.push_back (new thread (bind (&Server::worker_thread, this)));
        }
-       
+
        asio::io_service io_service;
        asio::ip::tcp::acceptor acceptor (io_service, asio::ip::tcp::endpoint (asio::ip::tcp::v4(), Config::instance()->server_port ()));
        while (1) {
-               shared_ptr<asio::ip::tcp::socket> socket (new asio::ip::tcp::socket (io_service));
-               acceptor.accept (*socket);
+               shared_ptr<Socket> socket (new Socket);
+               acceptor.accept (socket->socket ());
 
                mutex::scoped_lock lock (_worker_mutex);
                
index fac440a76134e3f90a308a2bea7fc8c7c458e7a3..32ba8dc4b96757fe0d3938317e2fe1c0c9abff2b 100644 (file)
@@ -28,6 +28,8 @@
 #include <boost/thread/condition.hpp>
 #include "log.h"
 
+class Socket;
+
 /** @class ServerDescription
  *  @brief Class to describe a server to which we can send encoding work.
  */
@@ -80,10 +82,10 @@ public:
 
 private:
        void worker_thread ();
-       int process (boost::shared_ptr<boost::asio::ip::tcp::socket> socket);
-       
+       int process (boost::shared_ptr<Socket> socket);
+
        std::vector<boost::thread *> _worker_threads;
-       std::list<boost::shared_ptr<boost::asio::ip::tcp::socket> > _queue;
+       std::list<boost::shared_ptr<Socket> > _queue;
        boost::mutex _worker_mutex;
        boost::condition _worker_condition;
        Log* _log;
index cbf8decb94a9fb9141f74435023186e5044e5269..d12bd3e775a52a32187fa8ee5a4fc0d75d30c92a 100644 (file)
@@ -33,6 +33,8 @@
 #include <libssh/libssh.h>
 #include <signal.h>
 #include <boost/algorithm/string.hpp>
+#include <boost/bind.hpp>
+#include <boost/lambda/lambda.hpp>
 #include <openjpeg.h>
 #include <openssl/md5.h>
 #include <magick/MagickCore.h>
@@ -286,88 +288,6 @@ seconds (struct timeval t)
        return t.tv_sec + (double (t.tv_usec) / 1e6);
 }
 
-/** @param socket Socket to read from */
-SocketReader::SocketReader (shared_ptr<asio::ip::tcp::socket> socket)
-       : _socket (socket)
-       , _buffer_data (0)
-{
-
-}
-
-/** Mark some data as being `consumed', so that it will not be returned
- *  as data again.
- *  @param size Amount of data to consume, in bytes.
- */
-void
-SocketReader::consume (int size)
-{
-       assert (_buffer_data >= size);
-       
-       _buffer_data -= size;
-       if (_buffer_data > 0) {
-               /* Shift still-valid data to the start of the buffer */
-               memmove (_buffer, _buffer + size, _buffer_data);
-       }
-}
-
-/** Read a definite amount of data from our socket, and mark
- *  it as consumed.
- *  @param data Where to put the data.
- *  @param size Number of bytes to read.
- */
-void
-SocketReader::read_definite_and_consume (uint8_t* data, int size)
-{
-       int const from_buffer = min (_buffer_data, size);
-       if (from_buffer > 0) {
-               /* Get data from our buffer */
-               memcpy (data, _buffer, from_buffer);
-               consume (from_buffer);
-               /* Update our output state */
-               data += from_buffer;
-               size -= from_buffer;
-       }
-
-       /* read() the rest */
-       while (size > 0) {
-               int const n = asio::read (*_socket, asio::buffer (data, size));
-               if (n <= 0) {
-                       throw NetworkError ("could not read");
-               }
-
-               data += n;
-               size -= n;
-       }
-}
-
-/** Read as much data as is available, up to some limit.
- *  @param data Where to put the data.
- *  @param size Maximum amount of data to read.
- */
-void
-SocketReader::read_indefinite (uint8_t* data, int size)
-{
-       assert (size < int (sizeof (_buffer)));
-
-       /* Amount of extra data we need to read () */
-       int to_read = size - _buffer_data;
-       while (to_read > 0) {
-               /* read as much of it as we can (into our buffer) */
-               int const n = asio::read (*_socket, asio::buffer (_buffer + _buffer_data, to_read));
-               if (n <= 0) {
-                       throw NetworkError ("could not read");
-               }
-
-               to_read -= n;
-               _buffer_data += n;
-       }
-
-       assert (_buffer_data >= size);
-
-       /* copy data into the output buffer */
-       assert (size >= _buffer_data);
-       memcpy (data, _buffer, size);
-}
 
 #ifdef DVDOMATIC_POSIX
 void
@@ -517,3 +437,152 @@ colour_lut_index_to_name (int index)
        assert (false);
        return "";
 }
+
+Socket::Socket ()
+       : _deadline (_io_service)
+       , _socket (_io_service)
+       , _buffer_data (0)
+{
+       _deadline.expires_at (posix_time::pos_infin);
+       check ();
+}
+
+void
+Socket::check ()
+{
+       if (_deadline.expires_at() <= asio::deadline_timer::traits_type::now ()) {
+               _socket.close ();
+               _deadline.expires_at (posix_time::pos_infin);
+       }
+
+       _deadline.async_wait (boost::bind (&Socket::check, this));
+}
+
+void
+Socket::connect (asio::ip::basic_resolver_entry<asio::ip::tcp> const & endpoint, int timeout)
+{
+       system::error_code ec = asio::error::would_block;
+       _socket.async_connect (endpoint, lambda::var(ec) = lambda::_1);
+       do {
+               _io_service.run_one();
+       } while (ec == asio::error::would_block);
+
+       if (ec || !_socket.is_open ()) {
+               throw NetworkError ("connect timed out");
+       }
+}
+
+void
+Socket::write (uint8_t const * data, int size, int timeout)
+{
+       _deadline.expires_from_now (posix_time::seconds (timeout));
+       system::error_code ec = asio::error::would_block;
+
+       asio::async_write (_socket, asio::buffer (data, size), lambda::var(ec) = lambda::_1);
+       do {
+               _io_service.run_one ();
+       } while (ec == asio::error::would_block);
+
+       if (ec) {
+               throw NetworkError ("write timed out");
+       }
+}
+
+int
+Socket::read (uint8_t* data, int size, int timeout)
+{
+       _deadline.expires_from_now (posix_time::seconds (timeout));
+       system::error_code ec = asio::error::would_block;
+
+       int amount_read = 0;
+
+       _socket.async_read_some (
+               asio::buffer (data, size),
+               (lambda::var(ec) = lambda::_1, lambda::var(amount_read) = lambda::_2)
+               );
+
+       do {
+               _io_service.run_one ();
+       } while (ec == asio::error::would_block);
+       
+       if (ec) {
+               amount_read = 0;
+       }
+
+       return amount_read;
+}
+
+/** Mark some data as being `consumed', so that it will not be returned
+ *  as data again.
+ *  @param size Amount of data to consume, in bytes.
+ */
+void
+Socket::consume (int size)
+{
+       assert (_buffer_data >= size);
+       
+       _buffer_data -= size;
+       if (_buffer_data > 0) {
+               /* Shift still-valid data to the start of the buffer */
+               memmove (_buffer, _buffer + size, _buffer_data);
+       }
+}
+
+/** Read a definite amount of data from our socket, and mark
+ *  it as consumed.
+ *  @param data Where to put the data.
+ *  @param size Number of bytes to read.
+ */
+void
+Socket::read_definite_and_consume (uint8_t* data, int size, int timeout)
+{
+       int const from_buffer = min (_buffer_data, size);
+       if (from_buffer > 0) {
+               /* Get data from our buffer */
+               memcpy (data, _buffer, from_buffer);
+               consume (from_buffer);
+               /* Update our output state */
+               data += from_buffer;
+               size -= from_buffer;
+       }
+
+       /* read() the rest */
+       while (size > 0) {
+               int const n = read (data, size, timeout);
+               if (n <= 0) {
+                       throw NetworkError ("could not read");
+               }
+
+               data += n;
+               size -= n;
+       }
+}
+
+/** Read as much data as is available, up to some limit.
+ *  @param data Where to put the data.
+ *  @param size Maximum amount of data to read.
+ */
+void
+Socket::read_indefinite (uint8_t* data, int size, int timeout)
+{
+       assert (size < int (sizeof (_buffer)));
+
+       /* Amount of extra data we need to read () */
+       int to_read = size - _buffer_data;
+       while (to_read > 0) {
+               /* read as much of it as we can (into our buffer) */
+               int const n = read (_buffer + _buffer_data, to_read, timeout);
+               if (n <= 0) {
+                       throw NetworkError ("could not read");
+               }
+
+               to_read -= n;
+               _buffer_data += n;
+       }
+
+       assert (_buffer_data >= size);
+
+       /* copy data into the output buffer */
+       assert (size >= _buffer_data);
+       memcpy (data, _buffer, size);
+}
index 568fe05d035ea56589be3e98639323da74df903b..d7f2330038cd653f0c8336fb1b11d9e209a67b9e 100644 (file)
@@ -56,29 +56,6 @@ enum ContentType {
 extern void md5_data (std::string, void const *, int);
 #endif
 
-/** @class SocketReader
- *  @brief A helper class from reading from sockets.
- *
- *  You can probably do this stuff directly in boost, but I'm not sure how.
- */
-class SocketReader
-{
-public:
-       SocketReader (boost::shared_ptr<boost::asio::ip::tcp::socket>);
-
-       void read_definite_and_consume (uint8_t *, int);
-       void read_indefinite (uint8_t *, int);
-       void consume (int);
-
-private:
-       /** socket we are reading from */
-       boost::shared_ptr<boost::asio::ip::tcp::socket> _socket;
-       /** a buffer for small reads */
-       uint8_t _buffer[256];
-       /** amount of valid data in the buffer */
-       int _buffer_data;
-};
-
 /** @class Size
  *  @brief Representation of the size of something */
 struct Size
@@ -136,4 +113,35 @@ extern std::string crop_string (Position, Size);
 extern int dcp_audio_sample_rate (int);
 extern std::string colour_lut_index_to_name (int index);
 
+class Socket
+{
+public:
+       Socket ();
+
+       boost::asio::ip::tcp::socket& socket () {
+               return _socket;
+       }
+
+       void connect (boost::asio::ip::basic_resolver_entry<boost::asio::ip::tcp> const & endpoint, int timeout);
+       void write (uint8_t const * data, int size, int timeout);
+       
+       void read_definite_and_consume (uint8_t* data, int size, int timeout);
+       void read_indefinite (uint8_t* data, int size, int timeout);
+       void consume (int amount);
+       
+private:
+       void check ();
+       int read (uint8_t* data, int size, int timeout);
+
+       Socket (Socket const &);
+
+       boost::asio::io_service _io_service;
+       boost::asio::deadline_timer _deadline;
+       boost::asio::ip::tcp::socket _socket;
+       /** a buffer for small reads */
+       uint8_t _buffer[256];
+       /** amount of valid data in the buffer */
+       int _buffer_data;
+};
+
 #endif
index 45d681be086d5115123437861c1200e9de4f88ba..638d526e05f760303a6b98945e86f098e27c961a 100644 (file)
@@ -254,6 +254,17 @@ BOOST_AUTO_TEST_CASE (paths_test)
        BOOST_CHECK_EQUAL (s.content_path(), "build/test/a/b/c/d/e/foo/bar/baz");
 }
 
+void
+do_remote_encode (shared_ptr<DCPVideoFrame> frame, ServerDescription* description, shared_ptr<EncodedData> locally_encoded)
+{
+       shared_ptr<EncodedData> remotely_encoded;
+       BOOST_CHECK_NO_THROW (remotely_encoded = frame->encode_remotely (description));
+       BOOST_CHECK (remotely_encoded);
+       
+       BOOST_CHECK_EQUAL (locally_encoded->size(), remotely_encoded->size());
+       BOOST_CHECK (memcmp (locally_encoded->data(), remotely_encoded->data(), locally_encoded->size()) == 0);
+}
+
 BOOST_AUTO_TEST_CASE (client_server_test)
 {
        shared_ptr<SimpleImage> image (new SimpleImage (PIX_FMT_RGB24, Size (1998, 1080)));
@@ -271,29 +282,36 @@ BOOST_AUTO_TEST_CASE (client_server_test)
 
        FileLog log ("build/test/client_server_test.log");
 
-       DCPVideoFrame frame (
-               image,
-               Size (1998, 1080),
-               0,
-               Scaler::from_id ("bicubic"),
-               0,
-               24,
-               "",
-               0,
-               200000000,
-               &log
+       shared_ptr<DCPVideoFrame> frame (
+               new DCPVideoFrame (
+                       image,
+                       Size (1998, 1080),
+                       0,
+                       Scaler::from_id ("bicubic"),
+                       0,
+                       24,
+                       "",
+                       0,
+                       200000000,
+                       &log
+                       )
                );
 
-       shared_ptr<EncodedData> locally_encoded = frame.encode_locally ();
+       shared_ptr<EncodedData> locally_encoded = frame->encode_locally ();
        
        Config::instance()->set_server_port (61920);
        Server* server = new Server (&log);
 
-       thread t (boost::bind (&Server::run, server, 1));
+       new thread (boost::bind (&Server::run, server, 2));
 
-       ServerDescription description ("localhost", 1);
-       shared_ptr<EncodedData> remotely_encoded = frame.encode_remotely (&description);
+       ServerDescription description ("localhost", 2);
 
-       BOOST_CHECK_EQUAL (locally_encoded->size(), remotely_encoded->size());
-       BOOST_CHECK (memcmp (locally_encoded->data(), remotely_encoded->data(), locally_encoded->size()) == 0);
+       list<thread*> threads;
+       for (int i = 0; i < 8; ++i) {
+               threads.push_back (new thread (boost::bind (do_remote_encode, frame, &description, locally_encoded)));
+       }
+
+       for (list<thread*>::iterator i = threads.begin(); i != threads.end(); ++i) {
+               (*i)->join ();
+       }
 }