asio::ip::tcp::resolver::query query (serv->host_name(), boost::lexical_cast<string> (Config::instance()->server_port ()));
asio::ip::tcp::resolver::iterator endpoint_iterator = resolver.resolve (query);
- shared_ptr<asio::ip::tcp::socket> socket (new asio::ip::tcp::socket (io_service));
-
- DeadlineWrapper wrapper (io_service);
- wrapper.set_socket (socket);
+ DeadlineWrapper wrapper;
wrapper.connect (*endpoint_iterator, 30);
* @param socket Socket
*/
void
-EncodedData::send (DeadlineWrapper& wrapper)
+EncodedData::send (shared_ptr<DeadlineWrapper> wrapper)
{
stringstream s;
s << _size;
- wrapper.write ((uint8_t *) s.str().c_str(), s.str().length() + 1, 30);
- wrapper.write (_data, _size, 30);
+ wrapper->write ((uint8_t *) s.str().c_str(), s.str().length() + 1, 30);
+ wrapper->write (_data, _size, 30);
}
#ifdef DEBUG_HASH
virtual ~EncodedData () {}
- void send (DeadlineWrapper& wrapper);
+ void send (boost::shared_ptr<DeadlineWrapper> wrapper);
void write (boost::shared_ptr<const Options>, int);
#ifdef DEBUG_HASH
}
int
-Server::process (shared_ptr<asio::ip::tcp::socket> socket)
+Server::process (shared_ptr<DeadlineWrapper> wrapper)
{
- DeadlineWrapper wrapper (_io_service);
- wrapper.set_socket (socket);
-
char buffer[128];
- wrapper.read_indefinite ((uint8_t *) buffer, sizeof (buffer), 30);
- wrapper.consume (strlen (buffer) + 1);
+ wrapper->read_indefinite ((uint8_t *) buffer, sizeof (buffer), 30);
+ wrapper->consume (strlen (buffer) + 1);
stringstream s (buffer);
}
for (int i = 0; i < image->components(); ++i) {
- wrapper.read_definite_and_consume (image->data()[i], image->line_size()[i] * image->lines(i), 30);
+ wrapper->read_definite_and_consume (image->data()[i], image->line_size()[i] * image->lines(i), 30);
}
#ifdef DEBUG_HASH
_worker_condition.wait (lock);
}
- shared_ptr<asio::ip::tcp::socket> socket = _queue.front ();
+ shared_ptr<DeadlineWrapper> wrapper = _queue.front ();
_queue.pop_front ();
lock.unlock ();
gettimeofday (&start, 0);
try {
- frame = process (socket);
+ frame = process (wrapper);
} catch (std::exception& e) {
cerr << "Error: " << e.what() << "\n";
}
- socket.reset ();
+ wrapper.reset ();
lock.lock ();
for (int i = 0; i < num_threads; ++i) {
_worker_threads.push_back (new thread (bind (&Server::worker_thread, this)));
}
-
- asio::ip::tcp::acceptor acceptor (_io_service, asio::ip::tcp::endpoint (asio::ip::tcp::v4(), Config::instance()->server_port ()));
+
+ asio::io_service io_service;
+ asio::ip::tcp::acceptor acceptor (io_service, asio::ip::tcp::endpoint (asio::ip::tcp::v4(), Config::instance()->server_port ()));
while (1) {
- shared_ptr<asio::ip::tcp::socket> socket (new asio::ip::tcp::socket (_io_service));
- acceptor.accept (*socket);
+ shared_ptr<DeadlineWrapper> wrapper (new DeadlineWrapper);
+ acceptor.accept (wrapper->socket ());
mutex::scoped_lock lock (_worker_mutex);
_worker_condition.wait (lock);
}
- _queue.push_back (socket);
+ _queue.push_back (wrapper);
_worker_condition.notify_all ();
}
}
#include <boost/thread/condition.hpp>
#include "log.h"
+class DeadlineWrapper;
+
/** @class ServerDescription
* @brief Class to describe a server to which we can send encoding work.
*/
private:
void worker_thread ();
- int process (boost::shared_ptr<boost::asio::ip::tcp::socket> socket);
+ int process (boost::shared_ptr<DeadlineWrapper> wrapper);
- boost::asio::io_service _io_service;
std::vector<boost::thread *> _worker_threads;
- std::list<boost::shared_ptr<boost::asio::ip::tcp::socket> > _queue;
+ std::list<boost::shared_ptr<DeadlineWrapper> > _queue;
boost::mutex _worker_mutex;
boost::condition _worker_condition;
Log* _log;
return "";
}
-DeadlineWrapper::DeadlineWrapper (asio::io_service& io_service)
- : _io_service (io_service)
- , _deadline (io_service)
+DeadlineWrapper::DeadlineWrapper ()
+ : _deadline (_io_service)
+ , _socket (_io_service)
, _buffer_data (0)
{
_deadline.expires_at (posix_time::pos_infin);
check ();
}
-void
-DeadlineWrapper::set_socket (shared_ptr<asio::ip::tcp::socket> socket)
-{
- _socket = socket;
-}
-
void
DeadlineWrapper::check ()
{
if (_deadline.expires_at() <= asio::deadline_timer::traits_type::now ()) {
- if (_socket) {
- _socket->close ();
- }
+ _socket.close ();
_deadline.expires_at (posix_time::pos_infin);
}
void
DeadlineWrapper::connect (asio::ip::basic_resolver_entry<asio::ip::tcp> const & endpoint, int timeout)
{
- assert (_socket);
-
system::error_code ec = asio::error::would_block;
- _socket->async_connect (endpoint, lambda::var(ec) = lambda::_1);
+ _socket.async_connect (endpoint, lambda::var(ec) = lambda::_1);
do {
_io_service.run_one();
} while (ec == asio::error::would_block);
- if (ec || !_socket->is_open ()) {
+ if (ec || !_socket.is_open ()) {
throw NetworkError ("connect timed out");
}
}
void
DeadlineWrapper::write (uint8_t const * data, int size, int timeout)
{
- assert (_socket);
-
_deadline.expires_from_now (posix_time::seconds (timeout));
system::error_code ec = asio::error::would_block;
- asio::async_write (*_socket, asio::buffer (data, size), lambda::var(ec) = lambda::_1);
+ asio::async_write (_socket, asio::buffer (data, size), lambda::var(ec) = lambda::_1);
do {
_io_service.run_one ();
} while (ec == asio::error::would_block);
int
DeadlineWrapper::read (uint8_t* data, int size, int timeout)
{
- assert (_socket);
-
_deadline.expires_from_now (posix_time::seconds (timeout));
system::error_code ec = asio::error::would_block;
int amount_read = 0;
- _socket->async_read_some (
+ _socket.async_read_some (
asio::buffer (data, size),
(lambda::var(ec) = lambda::_1, lambda::var(amount_read) = lambda::_2)
);
class DeadlineWrapper
{
public:
- DeadlineWrapper (boost::asio::io_service& io_service);
+ DeadlineWrapper ();
- void set_socket (boost::shared_ptr<boost::asio::ip::tcp::socket> socket);
+ boost::asio::ip::tcp::socket& socket () {
+ return _socket;
+ }
void connect (boost::asio::ip::basic_resolver_entry<boost::asio::ip::tcp> const & endpoint, int timeout);
void write (uint8_t const * data, int size, int timeout);
DeadlineWrapper (DeadlineWrapper const &);
- boost::asio::io_service& _io_service;
+ boost::asio::io_service _io_service;
boost::asio::deadline_timer _deadline;
- boost::shared_ptr<boost::asio::ip::tcp::socket> _socket;
+ boost::asio::ip::tcp::socket _socket;
/** a buffer for small reads */
uint8_t _buffer[256];
/** amount of valid data in the buffer */
ServerDescription description ("localhost", 2);
- thread* a = new thread (boost::bind (do_remote_encode, frame, &description, locally_encoded));
- thread* b = new thread (boost::bind (do_remote_encode, frame, &description, locally_encoded));
- thread* c = new thread (boost::bind (do_remote_encode, frame, &description, locally_encoded));
- thread* d = new thread (boost::bind (do_remote_encode, frame, &description, locally_encoded));
-
- a->join ();
- b->join ();
- c->join ();
- d->join ();
+ list<thread*> threads;
+ for (int i = 0; i < 8; ++i) {
+ threads.push_back (new thread (boost::bind (do_remote_encode, frame, &description, locally_encoded)));
+ }
+
+ for (list<thread*>::iterator i = threads.begin(); i != threads.end(); ++i) {
+ (*i)->join ();
+ }
}