Use io_service per thread.
authorCarl Hetherington <cth@carlh.net>
Sun, 23 Sep 2012 16:50:31 +0000 (17:50 +0100)
committerCarl Hetherington <cth@carlh.net>
Sun, 23 Sep 2012 16:50:31 +0000 (17:50 +0100)
src/lib/dcp_video_frame.cc
src/lib/dcp_video_frame.h
src/lib/server.cc
src/lib/server.h
src/lib/util.cc
src/lib/util.h
test/test.cc

index ee29d860183d4ef36c1075079872fc0ab87b36cf..5c0ec6a6a828f5ce087dbe8ab21cca5c12b6ba68 100644 (file)
@@ -297,10 +297,7 @@ DCPVideoFrame::encode_remotely (ServerDescription const * serv)
        asio::ip::tcp::resolver::query query (serv->host_name(), boost::lexical_cast<string> (Config::instance()->server_port ()));
        asio::ip::tcp::resolver::iterator endpoint_iterator = resolver.resolve (query);
 
-       shared_ptr<asio::ip::tcp::socket> socket (new asio::ip::tcp::socket (io_service));
-
-       DeadlineWrapper wrapper (io_service);
-       wrapper.set_socket (socket);
+       DeadlineWrapper wrapper;
 
        wrapper.connect (*endpoint_iterator, 30);
 
@@ -378,12 +375,12 @@ EncodedData::write (shared_ptr<const Options> opt, int frame)
  *  @param socket Socket
  */
 void
-EncodedData::send (DeadlineWrapper& wrapper)
+EncodedData::send (shared_ptr<DeadlineWrapper> wrapper)
 {
        stringstream s;
        s << _size;
-       wrapper.write ((uint8_t *) s.str().c_str(), s.str().length() + 1, 30);
-       wrapper.write (_data, _size, 30);
+       wrapper->write ((uint8_t *) s.str().c_str(), s.str().length() + 1, 30);
+       wrapper->write (_data, _size, 30);
 }
 
 #ifdef DEBUG_HASH
index d82aee36782872e1c6f6ac3d6b85f0fdc7811b91..752f0dda7d3d7cefcce3cf979abc2fa549a85560 100644 (file)
@@ -48,7 +48,7 @@ public:
 
        virtual ~EncodedData () {}
 
-       void send (DeadlineWrapper& wrapper);
+       void send (boost::shared_ptr<DeadlineWrapper> wrapper);
        void write (boost::shared_ptr<const Options>, int);
 
 #ifdef DEBUG_HASH
index 9e61c22828bf31bb463e2f3036612c75c7bc01ef..1f860d2548b936fd67778ffcfdd7cb24edd78688 100644 (file)
@@ -70,14 +70,11 @@ Server::Server (Log* log)
 }
 
 int
-Server::process (shared_ptr<asio::ip::tcp::socket> socket)
+Server::process (shared_ptr<DeadlineWrapper> wrapper)
 {
-       DeadlineWrapper wrapper (_io_service);
-       wrapper.set_socket (socket);
-       
        char buffer[128];
-       wrapper.read_indefinite ((uint8_t *) buffer, sizeof (buffer), 30);
-       wrapper.consume (strlen (buffer) + 1);
+       wrapper->read_indefinite ((uint8_t *) buffer, sizeof (buffer), 30);
+       wrapper->consume (strlen (buffer) + 1);
        
        stringstream s (buffer);
        
@@ -124,7 +121,7 @@ Server::process (shared_ptr<asio::ip::tcp::socket> socket)
        }
        
        for (int i = 0; i < image->components(); ++i) {
-               wrapper.read_definite_and_consume (image->data()[i], image->line_size()[i] * image->lines(i), 30);
+               wrapper->read_definite_and_consume (image->data()[i], image->line_size()[i] * image->lines(i), 30);
        }
        
 #ifdef DEBUG_HASH
@@ -151,7 +148,7 @@ Server::worker_thread ()
                        _worker_condition.wait (lock);
                }
 
-               shared_ptr<asio::ip::tcp::socket> socket = _queue.front ();
+               shared_ptr<DeadlineWrapper> wrapper = _queue.front ();
                _queue.pop_front ();
                
                lock.unlock ();
@@ -162,12 +159,12 @@ Server::worker_thread ()
                gettimeofday (&start, 0);
                
                try {
-                       frame = process (socket);
+                       frame = process (wrapper);
                } catch (std::exception& e) {
                        cerr << "Error: " << e.what() << "\n";
                }
                
-               socket.reset ();
+               wrapper.reset ();
                
                lock.lock ();
 
@@ -193,11 +190,12 @@ Server::run (int num_threads)
        for (int i = 0; i < num_threads; ++i) {
                _worker_threads.push_back (new thread (bind (&Server::worker_thread, this)));
        }
-       
-       asio::ip::tcp::acceptor acceptor (_io_service, asio::ip::tcp::endpoint (asio::ip::tcp::v4(), Config::instance()->server_port ()));
+
+       asio::io_service io_service;
+       asio::ip::tcp::acceptor acceptor (io_service, asio::ip::tcp::endpoint (asio::ip::tcp::v4(), Config::instance()->server_port ()));
        while (1) {
-               shared_ptr<asio::ip::tcp::socket> socket (new asio::ip::tcp::socket (_io_service));
-               acceptor.accept (*socket);
+               shared_ptr<DeadlineWrapper> wrapper (new DeadlineWrapper);
+               acceptor.accept (wrapper->socket ());
 
                mutex::scoped_lock lock (_worker_mutex);
                
@@ -206,7 +204,7 @@ Server::run (int num_threads)
                        _worker_condition.wait (lock);
                }
                
-               _queue.push_back (socket);
+               _queue.push_back (wrapper);
                _worker_condition.notify_all ();
        }
 }
index 7470814431e1c53822d8fd18e443006885506b4e..4cb6f25631dacad1b94af089cbe4c6f6812f87c9 100644 (file)
@@ -28,6 +28,8 @@
 #include <boost/thread/condition.hpp>
 #include "log.h"
 
+class DeadlineWrapper;
+
 /** @class ServerDescription
  *  @brief Class to describe a server to which we can send encoding work.
  */
@@ -80,11 +82,10 @@ public:
 
 private:
        void worker_thread ();
-       int process (boost::shared_ptr<boost::asio::ip::tcp::socket> socket);
+       int process (boost::shared_ptr<DeadlineWrapper> wrapper);
 
-       boost::asio::io_service _io_service;
        std::vector<boost::thread *> _worker_threads;
-       std::list<boost::shared_ptr<boost::asio::ip::tcp::socket> > _queue;
+       std::list<boost::shared_ptr<DeadlineWrapper> > _queue;
        boost::mutex _worker_mutex;
        boost::condition _worker_condition;
        Log* _log;
index 3f48d696bbe217aabef7ae7f33b3eeaabe6a3384..8713c59221ca53231eb31fccd3304fd632d06a99 100644 (file)
@@ -438,28 +438,20 @@ colour_lut_index_to_name (int index)
        return "";
 }
 
-DeadlineWrapper::DeadlineWrapper (asio::io_service& io_service)
-       : _io_service (io_service)
-       , _deadline (io_service)
+DeadlineWrapper::DeadlineWrapper ()
+       : _deadline (_io_service)
+       , _socket (_io_service)
        , _buffer_data (0)
 {
        _deadline.expires_at (posix_time::pos_infin);
        check ();
 }
 
-void
-DeadlineWrapper::set_socket (shared_ptr<asio::ip::tcp::socket> socket)
-{
-       _socket = socket;
-}
-
 void
 DeadlineWrapper::check ()
 {
        if (_deadline.expires_at() <= asio::deadline_timer::traits_type::now ()) {
-               if (_socket) {
-                       _socket->close ();
-               }
+               _socket.close ();
                _deadline.expires_at (posix_time::pos_infin);
        }
 
@@ -469,15 +461,13 @@ DeadlineWrapper::check ()
 void
 DeadlineWrapper::connect (asio::ip::basic_resolver_entry<asio::ip::tcp> const & endpoint, int timeout)
 {
-       assert (_socket);
-       
        system::error_code ec = asio::error::would_block;
-       _socket->async_connect (endpoint, lambda::var(ec) = lambda::_1);
+       _socket.async_connect (endpoint, lambda::var(ec) = lambda::_1);
        do {
                _io_service.run_one();
        } while (ec == asio::error::would_block);
 
-       if (ec || !_socket->is_open ()) {
+       if (ec || !_socket.is_open ()) {
                throw NetworkError ("connect timed out");
        }
 }
@@ -485,12 +475,10 @@ DeadlineWrapper::connect (asio::ip::basic_resolver_entry<asio::ip::tcp> const &
 void
 DeadlineWrapper::write (uint8_t const * data, int size, int timeout)
 {
-       assert (_socket);
-
        _deadline.expires_from_now (posix_time::seconds (timeout));
        system::error_code ec = asio::error::would_block;
 
-       asio::async_write (*_socket, asio::buffer (data, size), lambda::var(ec) = lambda::_1);
+       asio::async_write (_socket, asio::buffer (data, size), lambda::var(ec) = lambda::_1);
        do {
                _io_service.run_one ();
        } while (ec == asio::error::would_block);
@@ -503,14 +491,12 @@ DeadlineWrapper::write (uint8_t const * data, int size, int timeout)
 int
 DeadlineWrapper::read (uint8_t* data, int size, int timeout)
 {
-       assert (_socket);
-
        _deadline.expires_from_now (posix_time::seconds (timeout));
        system::error_code ec = asio::error::would_block;
 
        int amount_read = 0;
 
-       _socket->async_read_some (
+       _socket.async_read_some (
                asio::buffer (data, size),
                (lambda::var(ec) = lambda::_1, lambda::var(amount_read) = lambda::_2)
                );
index 2785a5dc1fdad25a0e7e06285c78450acd5b002b..8d6e2f5415f5a202b6b30c2baa0065bfe964827c 100644 (file)
@@ -116,9 +116,11 @@ extern std::string colour_lut_index_to_name (int index);
 class DeadlineWrapper
 {
 public:
-       DeadlineWrapper (boost::asio::io_service& io_service);
+       DeadlineWrapper ();
 
-       void set_socket (boost::shared_ptr<boost::asio::ip::tcp::socket> socket);
+       boost::asio::ip::tcp::socket& socket () {
+               return _socket;
+       }
 
        void connect (boost::asio::ip::basic_resolver_entry<boost::asio::ip::tcp> const & endpoint, int timeout);
        void write (uint8_t const * data, int size, int timeout);
@@ -133,9 +135,9 @@ private:
 
        DeadlineWrapper (DeadlineWrapper const &);
 
-       boost::asio::io_service& _io_service;
+       boost::asio::io_service _io_service;
        boost::asio::deadline_timer _deadline;
-       boost::shared_ptr<boost::asio::ip::tcp::socket> _socket;
+       boost::asio::ip::tcp::socket _socket;
        /** a buffer for small reads */
        uint8_t _buffer[256];
        /** amount of valid data in the buffer */
index b77eb2b510bd404725d80d87574e16d18a134e78..638d526e05f760303a6b98945e86f098e27c961a 100644 (file)
@@ -306,13 +306,12 @@ BOOST_AUTO_TEST_CASE (client_server_test)
 
        ServerDescription description ("localhost", 2);
 
-       thread* a = new thread (boost::bind (do_remote_encode, frame, &description, locally_encoded));
-       thread* b = new thread (boost::bind (do_remote_encode, frame, &description, locally_encoded));
-       thread* c = new thread (boost::bind (do_remote_encode, frame, &description, locally_encoded));
-       thread* d = new thread (boost::bind (do_remote_encode, frame, &description, locally_encoded));
-
-       a->join ();
-       b->join ();
-       c->join ();
-       d->join ();
+       list<thread*> threads;
+       for (int i = 0; i < 8; ++i) {
+               threads.push_back (new thread (boost::bind (do_remote_encode, frame, &description, locally_encoded)));
+       }
+
+       for (list<thread*>::iterator i = threads.begin(); i != threads.end(); ++i) {
+               (*i)->join ();
+       }
 }