{
asio::io_service io_service;
asio::ip::tcp::resolver resolver (io_service);
-
asio::ip::tcp::resolver::query query (serv->host_name(), boost::lexical_cast<string> (Config::instance()->server_port ()));
asio::ip::tcp::resolver::iterator endpoint_iterator = resolver.resolve (query);
- DeadlineWrapper wrapper;
+ Socket socket;
- wrapper.connect (*endpoint_iterator, 30);
+ socket.connect (*endpoint_iterator, 30);
#ifdef DEBUG_HASH
_input->hash ("Input for remote encoding (before sending)");
s << _input->line_size()[i] << " ";
}
- wrapper.write ((uint8_t *) s.str().c_str(), s.str().length() + 1, 30);
+ socket.write ((uint8_t *) s.str().c_str(), s.str().length() + 1, 30);
for (int i = 0; i < _input->components(); ++i) {
- wrapper.write (_input->data()[i], _input->line_size()[i] * _input->lines(i), 30);
+ socket.write (_input->data()[i], _input->line_size()[i] * _input->lines(i), 30);
}
char buffer[32];
- wrapper.read_indefinite ((uint8_t *) buffer, sizeof (buffer), 30);
- wrapper.consume (strlen (buffer) + 1);
+ socket.read_indefinite ((uint8_t *) buffer, sizeof (buffer), 30);
+ socket.consume (strlen (buffer) + 1);
shared_ptr<EncodedData> e (new RemotelyEncodedData (atoi (buffer)));
/* now read the rest */
- wrapper.read_definite_and_consume (e->data(), e->size(), 30);
+ socket.read_definite_and_consume (e->data(), e->size(), 30);
#ifdef DEBUG_HASH
e->hash ("Encoded image (after receiving)");
* @param socket Socket
*/
void
-EncodedData::send (shared_ptr<DeadlineWrapper> wrapper)
+EncodedData::send (shared_ptr<Socket> socket)
{
stringstream s;
s << _size;
- wrapper->write ((uint8_t *) s.str().c_str(), s.str().length() + 1, 30);
- wrapper->write (_data, _size, 30);
+ socket->write ((uint8_t *) s.str().c_str(), s.str().length() + 1, 30);
+ socket->write (_data, _size, 30);
}
#ifdef DEBUG_HASH
virtual ~EncodedData () {}
- void send (boost::shared_ptr<DeadlineWrapper> wrapper);
+ void send (boost::shared_ptr<Socket> socket);
void write (boost::shared_ptr<const Options>, int);
#ifdef DEBUG_HASH
}
int
-Server::process (shared_ptr<DeadlineWrapper> wrapper)
+Server::process (shared_ptr<Socket> socket)
{
char buffer[128];
- wrapper->read_indefinite ((uint8_t *) buffer, sizeof (buffer), 30);
- wrapper->consume (strlen (buffer) + 1);
+ socket->read_indefinite ((uint8_t *) buffer, sizeof (buffer), 30);
+ socket->consume (strlen (buffer) + 1);
stringstream s (buffer);
}
for (int i = 0; i < image->components(); ++i) {
- wrapper->read_definite_and_consume (image->data()[i], image->line_size()[i] * image->lines(i), 30);
+ socket->read_definite_and_consume (image->data()[i], image->line_size()[i] * image->lines(i), 30);
}
#ifdef DEBUG_HASH
DCPVideoFrame dcp_video_frame (image, out_size, padding, scaler, frame, frames_per_second, post_process, colour_lut_index, j2k_bandwidth, _log);
shared_ptr<EncodedData> encoded = dcp_video_frame.encode_locally ();
- encoded->send (wrapper);
+ encoded->send (socket);
#ifdef DEBUG_HASH
encoded->hash ("Encoded image (as made by server and as sent back)");
_worker_condition.wait (lock);
}
- shared_ptr<DeadlineWrapper> wrapper = _queue.front ();
+ shared_ptr<Socket> socket = _queue.front ();
_queue.pop_front ();
lock.unlock ();
gettimeofday (&start, 0);
try {
- frame = process (wrapper);
+ frame = process (socket);
} catch (std::exception& e) {
cerr << "Error: " << e.what() << "\n";
}
- wrapper.reset ();
+ socket.reset ();
lock.lock ();
asio::io_service io_service;
asio::ip::tcp::acceptor acceptor (io_service, asio::ip::tcp::endpoint (asio::ip::tcp::v4(), Config::instance()->server_port ()));
while (1) {
- shared_ptr<DeadlineWrapper> wrapper (new DeadlineWrapper);
- acceptor.accept (wrapper->socket ());
+ shared_ptr<Socket> socket (new Socket);
+ acceptor.accept (socket->socket ());
mutex::scoped_lock lock (_worker_mutex);
_worker_condition.wait (lock);
}
- _queue.push_back (wrapper);
+ _queue.push_back (socket);
_worker_condition.notify_all ();
}
}
#include <boost/thread/condition.hpp>
#include "log.h"
-class DeadlineWrapper;
+class Socket;
/** @class ServerDescription
* @brief Class to describe a server to which we can send encoding work.
private:
void worker_thread ();
- int process (boost::shared_ptr<DeadlineWrapper> wrapper);
+ int process (boost::shared_ptr<Socket> socket);
std::vector<boost::thread *> _worker_threads;
- std::list<boost::shared_ptr<DeadlineWrapper> > _queue;
+ std::list<boost::shared_ptr<Socket> > _queue;
boost::mutex _worker_mutex;
boost::condition _worker_condition;
Log* _log;
return "";
}
-DeadlineWrapper::DeadlineWrapper ()
+Socket::Socket ()
: _deadline (_io_service)
, _socket (_io_service)
, _buffer_data (0)
}
void
-DeadlineWrapper::check ()
+Socket::check ()
{
if (_deadline.expires_at() <= asio::deadline_timer::traits_type::now ()) {
_socket.close ();
_deadline.expires_at (posix_time::pos_infin);
}
- _deadline.async_wait (boost::bind (&DeadlineWrapper::check, this));
+ _deadline.async_wait (boost::bind (&Socket::check, this));
}
void
-DeadlineWrapper::connect (asio::ip::basic_resolver_entry<asio::ip::tcp> const & endpoint, int timeout)
+Socket::connect (asio::ip::basic_resolver_entry<asio::ip::tcp> const & endpoint, int timeout)
{
system::error_code ec = asio::error::would_block;
_socket.async_connect (endpoint, lambda::var(ec) = lambda::_1);
}
void
-DeadlineWrapper::write (uint8_t const * data, int size, int timeout)
+Socket::write (uint8_t const * data, int size, int timeout)
{
_deadline.expires_from_now (posix_time::seconds (timeout));
system::error_code ec = asio::error::would_block;
}
int
-DeadlineWrapper::read (uint8_t* data, int size, int timeout)
+Socket::read (uint8_t* data, int size, int timeout)
{
_deadline.expires_from_now (posix_time::seconds (timeout));
system::error_code ec = asio::error::would_block;
* @param size Amount of data to consume, in bytes.
*/
void
-DeadlineWrapper::consume (int size)
+Socket::consume (int size)
{
assert (_buffer_data >= size);
* @param size Number of bytes to read.
*/
void
-DeadlineWrapper::read_definite_and_consume (uint8_t* data, int size, int timeout)
+Socket::read_definite_and_consume (uint8_t* data, int size, int timeout)
{
int const from_buffer = min (_buffer_data, size);
if (from_buffer > 0) {
* @param size Maximum amount of data to read.
*/
void
-DeadlineWrapper::read_indefinite (uint8_t* data, int size, int timeout)
+Socket::read_indefinite (uint8_t* data, int size, int timeout)
{
assert (size < int (sizeof (_buffer)));
extern int dcp_audio_sample_rate (int);
extern std::string colour_lut_index_to_name (int index);
-class DeadlineWrapper
+class Socket
{
public:
- DeadlineWrapper ();
+ Socket ();
boost::asio::ip::tcp::socket& socket () {
return _socket;
void connect (boost::asio::ip::basic_resolver_entry<boost::asio::ip::tcp> const & endpoint, int timeout);
void write (uint8_t const * data, int size, int timeout);
- int read (uint8_t* data, int size, int timeout);
void read_definite_and_consume (uint8_t* data, int size, int timeout);
void read_indefinite (uint8_t* data, int size, int timeout);
private:
void check ();
+ int read (uint8_t* data, int size, int timeout);
- DeadlineWrapper (DeadlineWrapper const &);
+ Socket (Socket const &);
boost::asio::io_service _io_service;
boost::asio::deadline_timer _deadline;