From e89fb9d81358b51ed0e231725f7fb6eb63f96c5b Mon Sep 17 00:00:00 2001 From: Carl Hetherington Date: Sun, 23 Sep 2012 17:50:31 +0100 Subject: [PATCH] Use io_service per thread. --- src/lib/dcp_video_frame.cc | 11 ++++------- src/lib/dcp_video_frame.h | 2 +- src/lib/server.cc | 28 +++++++++++++--------------- src/lib/server.h | 7 ++++--- src/lib/util.cc | 30 ++++++++---------------------- src/lib/util.h | 10 ++++++---- test/test.cc | 17 ++++++++--------- 7 files changed, 44 insertions(+), 61 deletions(-) diff --git a/src/lib/dcp_video_frame.cc b/src/lib/dcp_video_frame.cc index ee29d8601..5c0ec6a6a 100644 --- a/src/lib/dcp_video_frame.cc +++ b/src/lib/dcp_video_frame.cc @@ -297,10 +297,7 @@ DCPVideoFrame::encode_remotely (ServerDescription const * serv) asio::ip::tcp::resolver::query query (serv->host_name(), boost::lexical_cast (Config::instance()->server_port ())); asio::ip::tcp::resolver::iterator endpoint_iterator = resolver.resolve (query); - shared_ptr socket (new asio::ip::tcp::socket (io_service)); - - DeadlineWrapper wrapper (io_service); - wrapper.set_socket (socket); + DeadlineWrapper wrapper; wrapper.connect (*endpoint_iterator, 30); @@ -378,12 +375,12 @@ EncodedData::write (shared_ptr opt, int frame) * @param socket Socket */ void -EncodedData::send (DeadlineWrapper& wrapper) +EncodedData::send (shared_ptr wrapper) { stringstream s; s << _size; - wrapper.write ((uint8_t *) s.str().c_str(), s.str().length() + 1, 30); - wrapper.write (_data, _size, 30); + wrapper->write ((uint8_t *) s.str().c_str(), s.str().length() + 1, 30); + wrapper->write (_data, _size, 30); } #ifdef DEBUG_HASH diff --git a/src/lib/dcp_video_frame.h b/src/lib/dcp_video_frame.h index d82aee367..752f0dda7 100644 --- a/src/lib/dcp_video_frame.h +++ b/src/lib/dcp_video_frame.h @@ -48,7 +48,7 @@ public: virtual ~EncodedData () {} - void send (DeadlineWrapper& wrapper); + void send (boost::shared_ptr wrapper); void write (boost::shared_ptr, int); #ifdef DEBUG_HASH diff --git a/src/lib/server.cc b/src/lib/server.cc index 9e61c2282..1f860d254 100644 --- a/src/lib/server.cc +++ b/src/lib/server.cc @@ -70,14 +70,11 @@ Server::Server (Log* log) } int -Server::process (shared_ptr socket) +Server::process (shared_ptr wrapper) { - DeadlineWrapper wrapper (_io_service); - wrapper.set_socket (socket); - char buffer[128]; - wrapper.read_indefinite ((uint8_t *) buffer, sizeof (buffer), 30); - wrapper.consume (strlen (buffer) + 1); + wrapper->read_indefinite ((uint8_t *) buffer, sizeof (buffer), 30); + wrapper->consume (strlen (buffer) + 1); stringstream s (buffer); @@ -124,7 +121,7 @@ Server::process (shared_ptr socket) } for (int i = 0; i < image->components(); ++i) { - wrapper.read_definite_and_consume (image->data()[i], image->line_size()[i] * image->lines(i), 30); + wrapper->read_definite_and_consume (image->data()[i], image->line_size()[i] * image->lines(i), 30); } #ifdef DEBUG_HASH @@ -151,7 +148,7 @@ Server::worker_thread () _worker_condition.wait (lock); } - shared_ptr socket = _queue.front (); + shared_ptr wrapper = _queue.front (); _queue.pop_front (); lock.unlock (); @@ -162,12 +159,12 @@ Server::worker_thread () gettimeofday (&start, 0); try { - frame = process (socket); + frame = process (wrapper); } catch (std::exception& e) { cerr << "Error: " << e.what() << "\n"; } - socket.reset (); + wrapper.reset (); lock.lock (); @@ -193,11 +190,12 @@ Server::run (int num_threads) for (int i = 0; i < num_threads; ++i) { _worker_threads.push_back (new thread (bind (&Server::worker_thread, this))); } - - asio::ip::tcp::acceptor acceptor (_io_service, asio::ip::tcp::endpoint (asio::ip::tcp::v4(), Config::instance()->server_port ())); + + asio::io_service io_service; + asio::ip::tcp::acceptor acceptor (io_service, asio::ip::tcp::endpoint (asio::ip::tcp::v4(), Config::instance()->server_port ())); while (1) { - shared_ptr socket (new asio::ip::tcp::socket (_io_service)); - acceptor.accept (*socket); + shared_ptr wrapper (new DeadlineWrapper); + acceptor.accept (wrapper->socket ()); mutex::scoped_lock lock (_worker_mutex); @@ -206,7 +204,7 @@ Server::run (int num_threads) _worker_condition.wait (lock); } - _queue.push_back (socket); + _queue.push_back (wrapper); _worker_condition.notify_all (); } } diff --git a/src/lib/server.h b/src/lib/server.h index 747081443..4cb6f2563 100644 --- a/src/lib/server.h +++ b/src/lib/server.h @@ -28,6 +28,8 @@ #include #include "log.h" +class DeadlineWrapper; + /** @class ServerDescription * @brief Class to describe a server to which we can send encoding work. */ @@ -80,11 +82,10 @@ public: private: void worker_thread (); - int process (boost::shared_ptr socket); + int process (boost::shared_ptr wrapper); - boost::asio::io_service _io_service; std::vector _worker_threads; - std::list > _queue; + std::list > _queue; boost::mutex _worker_mutex; boost::condition _worker_condition; Log* _log; diff --git a/src/lib/util.cc b/src/lib/util.cc index 3f48d696b..8713c5922 100644 --- a/src/lib/util.cc +++ b/src/lib/util.cc @@ -438,28 +438,20 @@ colour_lut_index_to_name (int index) return ""; } -DeadlineWrapper::DeadlineWrapper (asio::io_service& io_service) - : _io_service (io_service) - , _deadline (io_service) +DeadlineWrapper::DeadlineWrapper () + : _deadline (_io_service) + , _socket (_io_service) , _buffer_data (0) { _deadline.expires_at (posix_time::pos_infin); check (); } -void -DeadlineWrapper::set_socket (shared_ptr socket) -{ - _socket = socket; -} - void DeadlineWrapper::check () { if (_deadline.expires_at() <= asio::deadline_timer::traits_type::now ()) { - if (_socket) { - _socket->close (); - } + _socket.close (); _deadline.expires_at (posix_time::pos_infin); } @@ -469,15 +461,13 @@ DeadlineWrapper::check () void DeadlineWrapper::connect (asio::ip::basic_resolver_entry const & endpoint, int timeout) { - assert (_socket); - system::error_code ec = asio::error::would_block; - _socket->async_connect (endpoint, lambda::var(ec) = lambda::_1); + _socket.async_connect (endpoint, lambda::var(ec) = lambda::_1); do { _io_service.run_one(); } while (ec == asio::error::would_block); - if (ec || !_socket->is_open ()) { + if (ec || !_socket.is_open ()) { throw NetworkError ("connect timed out"); } } @@ -485,12 +475,10 @@ DeadlineWrapper::connect (asio::ip::basic_resolver_entry const & void DeadlineWrapper::write (uint8_t const * data, int size, int timeout) { - assert (_socket); - _deadline.expires_from_now (posix_time::seconds (timeout)); system::error_code ec = asio::error::would_block; - asio::async_write (*_socket, asio::buffer (data, size), lambda::var(ec) = lambda::_1); + asio::async_write (_socket, asio::buffer (data, size), lambda::var(ec) = lambda::_1); do { _io_service.run_one (); } while (ec == asio::error::would_block); @@ -503,14 +491,12 @@ DeadlineWrapper::write (uint8_t const * data, int size, int timeout) int DeadlineWrapper::read (uint8_t* data, int size, int timeout) { - assert (_socket); - _deadline.expires_from_now (posix_time::seconds (timeout)); system::error_code ec = asio::error::would_block; int amount_read = 0; - _socket->async_read_some ( + _socket.async_read_some ( asio::buffer (data, size), (lambda::var(ec) = lambda::_1, lambda::var(amount_read) = lambda::_2) ); diff --git a/src/lib/util.h b/src/lib/util.h index 2785a5dc1..8d6e2f541 100644 --- a/src/lib/util.h +++ b/src/lib/util.h @@ -116,9 +116,11 @@ extern std::string colour_lut_index_to_name (int index); class DeadlineWrapper { public: - DeadlineWrapper (boost::asio::io_service& io_service); + DeadlineWrapper (); - void set_socket (boost::shared_ptr socket); + boost::asio::ip::tcp::socket& socket () { + return _socket; + } void connect (boost::asio::ip::basic_resolver_entry const & endpoint, int timeout); void write (uint8_t const * data, int size, int timeout); @@ -133,9 +135,9 @@ private: DeadlineWrapper (DeadlineWrapper const &); - boost::asio::io_service& _io_service; + boost::asio::io_service _io_service; boost::asio::deadline_timer _deadline; - boost::shared_ptr _socket; + boost::asio::ip::tcp::socket _socket; /** a buffer for small reads */ uint8_t _buffer[256]; /** amount of valid data in the buffer */ diff --git a/test/test.cc b/test/test.cc index b77eb2b51..638d526e0 100644 --- a/test/test.cc +++ b/test/test.cc @@ -306,13 +306,12 @@ BOOST_AUTO_TEST_CASE (client_server_test) ServerDescription description ("localhost", 2); - thread* a = new thread (boost::bind (do_remote_encode, frame, &description, locally_encoded)); - thread* b = new thread (boost::bind (do_remote_encode, frame, &description, locally_encoded)); - thread* c = new thread (boost::bind (do_remote_encode, frame, &description, locally_encoded)); - thread* d = new thread (boost::bind (do_remote_encode, frame, &description, locally_encoded)); - - a->join (); - b->join (); - c->join (); - d->join (); + list threads; + for (int i = 0; i < 8; ++i) { + threads.push_back (new thread (boost::bind (do_remote_encode, frame, &description, locally_encoded))); + } + + for (list::iterator i = threads.begin(); i != threads.end(); ++i) { + (*i)->join (); + } } -- 2.30.2