asio::ip::tcp::resolver::query query (serv->host_name(), boost::lexical_cast<string> (Config::instance()->server_port ()));
asio::ip::tcp::resolver::iterator endpoint_iterator = resolver.resolve (query);
- shared_ptr<asio::ip::tcp::socket> socket (new asio::ip::tcp::socket (io_service));
- socket->connect (*endpoint_iterator);
+ Socket socket;
+
+ socket.connect (*endpoint_iterator, 30);
#ifdef DEBUG_HASH
_input->hash ("Input for remote encoding (before sending)");
s << _input->line_size()[i] << " ";
}
- asio::write (*socket, asio::buffer (s.str().c_str(), s.str().length() + 1));
+ socket.write ((uint8_t *) s.str().c_str(), s.str().length() + 1, 30);
for (int i = 0; i < _input->components(); ++i) {
- asio::write (*socket, asio::buffer (_input->data()[i], _input->line_size()[i] * _input->lines(i)));
+ socket.write (_input->data()[i], _input->line_size()[i] * _input->lines(i), 30);
}
- SocketReader reader (socket);
-
char buffer[32];
- reader.read_indefinite ((uint8_t *) buffer, sizeof (buffer));
- reader.consume (strlen (buffer) + 1);
+ socket.read_indefinite ((uint8_t *) buffer, sizeof (buffer), 30);
+ socket.consume (strlen (buffer) + 1);
shared_ptr<EncodedData> e (new RemotelyEncodedData (atoi (buffer)));
/* now read the rest */
- reader.read_definite_and_consume (e->data(), e->size());
+ socket.read_definite_and_consume (e->data(), e->size(), 30);
#ifdef DEBUG_HASH
e->hash ("Encoded image (after receiving)");
* @param socket Socket
*/
void
-EncodedData::send (shared_ptr<asio::ip::tcp::socket> socket)
+EncodedData::send (shared_ptr<Socket> socket)
{
stringstream s;
s << _size;
- asio::write (*socket, asio::buffer (s.str().c_str(), s.str().length() + 1));
- asio::write (*socket, asio::buffer (_data, _size));
+ socket->write ((uint8_t *) s.str().c_str(), s.str().length() + 1, 30);
+ socket->write (_data, _size, 30);
}
#ifdef DEBUG_HASH
virtual ~EncodedData () {}
- void send (boost::shared_ptr<boost::asio::ip::tcp::socket>);
+ void send (boost::shared_ptr<Socket> socket);
void write (boost::shared_ptr<const Options>, int);
#ifdef DEBUG_HASH
}
int
-Server::process (shared_ptr<asio::ip::tcp::socket> socket)
+Server::process (shared_ptr<Socket> socket)
{
- SocketReader reader (socket);
-
char buffer[128];
- reader.read_indefinite ((uint8_t *) buffer, sizeof (buffer));
- reader.consume (strlen (buffer) + 1);
+ socket->read_indefinite ((uint8_t *) buffer, sizeof (buffer), 30);
+ socket->consume (strlen (buffer) + 1);
stringstream s (buffer);
}
for (int i = 0; i < image->components(); ++i) {
- reader.read_definite_and_consume (image->data()[i], image->line_size()[i] * image->lines(i));
+ socket->read_definite_and_consume (image->data()[i], image->line_size()[i] * image->lines(i), 30);
}
#ifdef DEBUG_HASH
_worker_condition.wait (lock);
}
- shared_ptr<asio::ip::tcp::socket> socket = _queue.front ();
+ shared_ptr<Socket> socket = _queue.front ();
_queue.pop_front ();
lock.unlock ();
for (int i = 0; i < num_threads; ++i) {
_worker_threads.push_back (new thread (bind (&Server::worker_thread, this)));
}
-
+
asio::io_service io_service;
asio::ip::tcp::acceptor acceptor (io_service, asio::ip::tcp::endpoint (asio::ip::tcp::v4(), Config::instance()->server_port ()));
while (1) {
- shared_ptr<asio::ip::tcp::socket> socket (new asio::ip::tcp::socket (io_service));
- acceptor.accept (*socket);
+ shared_ptr<Socket> socket (new Socket);
+ acceptor.accept (socket->socket ());
mutex::scoped_lock lock (_worker_mutex);
#include <boost/thread/condition.hpp>
#include "log.h"
+class Socket;
+
/** @class ServerDescription
* @brief Class to describe a server to which we can send encoding work.
*/
private:
void worker_thread ();
- int process (boost::shared_ptr<boost::asio::ip::tcp::socket> socket);
-
+ int process (boost::shared_ptr<Socket> socket);
+
std::vector<boost::thread *> _worker_threads;
- std::list<boost::shared_ptr<boost::asio::ip::tcp::socket> > _queue;
+ std::list<boost::shared_ptr<Socket> > _queue;
boost::mutex _worker_mutex;
boost::condition _worker_condition;
Log* _log;
#include <libssh/libssh.h>
#include <signal.h>
#include <boost/algorithm/string.hpp>
+#include <boost/bind.hpp>
+#include <boost/lambda/lambda.hpp>
#include <openjpeg.h>
#include <openssl/md5.h>
#include <magick/MagickCore.h>
return t.tv_sec + (double (t.tv_usec) / 1e6);
}
-/** @param socket Socket to read from */
-SocketReader::SocketReader (shared_ptr<asio::ip::tcp::socket> socket)
- : _socket (socket)
- , _buffer_data (0)
-{
-
-}
-
-/** Mark some data as being `consumed', so that it will not be returned
- * as data again.
- * @param size Amount of data to consume, in bytes.
- */
-void
-SocketReader::consume (int size)
-{
- assert (_buffer_data >= size);
-
- _buffer_data -= size;
- if (_buffer_data > 0) {
- /* Shift still-valid data to the start of the buffer */
- memmove (_buffer, _buffer + size, _buffer_data);
- }
-}
-
-/** Read a definite amount of data from our socket, and mark
- * it as consumed.
- * @param data Where to put the data.
- * @param size Number of bytes to read.
- */
-void
-SocketReader::read_definite_and_consume (uint8_t* data, int size)
-{
- int const from_buffer = min (_buffer_data, size);
- if (from_buffer > 0) {
- /* Get data from our buffer */
- memcpy (data, _buffer, from_buffer);
- consume (from_buffer);
- /* Update our output state */
- data += from_buffer;
- size -= from_buffer;
- }
-
- /* read() the rest */
- while (size > 0) {
- int const n = asio::read (*_socket, asio::buffer (data, size));
- if (n <= 0) {
- throw NetworkError ("could not read");
- }
-
- data += n;
- size -= n;
- }
-}
-
-/** Read as much data as is available, up to some limit.
- * @param data Where to put the data.
- * @param size Maximum amount of data to read.
- */
-void
-SocketReader::read_indefinite (uint8_t* data, int size)
-{
- assert (size < int (sizeof (_buffer)));
-
- /* Amount of extra data we need to read () */
- int to_read = size - _buffer_data;
- while (to_read > 0) {
- /* read as much of it as we can (into our buffer) */
- int const n = asio::read (*_socket, asio::buffer (_buffer + _buffer_data, to_read));
- if (n <= 0) {
- throw NetworkError ("could not read");
- }
-
- to_read -= n;
- _buffer_data += n;
- }
-
- assert (_buffer_data >= size);
-
- /* copy data into the output buffer */
- assert (size >= _buffer_data);
- memcpy (data, _buffer, size);
-}
#ifdef DVDOMATIC_POSIX
void
assert (false);
return "";
}
+
+Socket::Socket ()
+ : _deadline (_io_service)
+ , _socket (_io_service)
+ , _buffer_data (0)
+{
+ _deadline.expires_at (posix_time::pos_infin);
+ check ();
+}
+
+void
+Socket::check ()
+{
+ if (_deadline.expires_at() <= asio::deadline_timer::traits_type::now ()) {
+ _socket.close ();
+ _deadline.expires_at (posix_time::pos_infin);
+ }
+
+ _deadline.async_wait (boost::bind (&Socket::check, this));
+}
+
+void
+Socket::connect (asio::ip::basic_resolver_entry<asio::ip::tcp> const & endpoint, int timeout)
+{
+ system::error_code ec = asio::error::would_block;
+ _socket.async_connect (endpoint, lambda::var(ec) = lambda::_1);
+ do {
+ _io_service.run_one();
+ } while (ec == asio::error::would_block);
+
+ if (ec || !_socket.is_open ()) {
+ throw NetworkError ("connect timed out");
+ }
+}
+
+void
+Socket::write (uint8_t const * data, int size, int timeout)
+{
+ _deadline.expires_from_now (posix_time::seconds (timeout));
+ system::error_code ec = asio::error::would_block;
+
+ asio::async_write (_socket, asio::buffer (data, size), lambda::var(ec) = lambda::_1);
+ do {
+ _io_service.run_one ();
+ } while (ec == asio::error::would_block);
+
+ if (ec) {
+ throw NetworkError ("write timed out");
+ }
+}
+
+int
+Socket::read (uint8_t* data, int size, int timeout)
+{
+ _deadline.expires_from_now (posix_time::seconds (timeout));
+ system::error_code ec = asio::error::would_block;
+
+ int amount_read = 0;
+
+ _socket.async_read_some (
+ asio::buffer (data, size),
+ (lambda::var(ec) = lambda::_1, lambda::var(amount_read) = lambda::_2)
+ );
+
+ do {
+ _io_service.run_one ();
+ } while (ec == asio::error::would_block);
+
+ if (ec) {
+ amount_read = 0;
+ }
+
+ return amount_read;
+}
+
+/** Mark some data as being `consumed', so that it will not be returned
+ * as data again.
+ * @param size Amount of data to consume, in bytes.
+ */
+void
+Socket::consume (int size)
+{
+ assert (_buffer_data >= size);
+
+ _buffer_data -= size;
+ if (_buffer_data > 0) {
+ /* Shift still-valid data to the start of the buffer */
+ memmove (_buffer, _buffer + size, _buffer_data);
+ }
+}
+
+/** Read a definite amount of data from our socket, and mark
+ * it as consumed.
+ * @param data Where to put the data.
+ * @param size Number of bytes to read.
+ */
+void
+Socket::read_definite_and_consume (uint8_t* data, int size, int timeout)
+{
+ int const from_buffer = min (_buffer_data, size);
+ if (from_buffer > 0) {
+ /* Get data from our buffer */
+ memcpy (data, _buffer, from_buffer);
+ consume (from_buffer);
+ /* Update our output state */
+ data += from_buffer;
+ size -= from_buffer;
+ }
+
+ /* read() the rest */
+ while (size > 0) {
+ int const n = read (data, size, timeout);
+ if (n <= 0) {
+ throw NetworkError ("could not read");
+ }
+
+ data += n;
+ size -= n;
+ }
+}
+
+/** Read as much data as is available, up to some limit.
+ * @param data Where to put the data.
+ * @param size Maximum amount of data to read.
+ */
+void
+Socket::read_indefinite (uint8_t* data, int size, int timeout)
+{
+ assert (size < int (sizeof (_buffer)));
+
+ /* Amount of extra data we need to read () */
+ int to_read = size - _buffer_data;
+ while (to_read > 0) {
+ /* read as much of it as we can (into our buffer) */
+ int const n = read (_buffer + _buffer_data, to_read, timeout);
+ if (n <= 0) {
+ throw NetworkError ("could not read");
+ }
+
+ to_read -= n;
+ _buffer_data += n;
+ }
+
+ assert (_buffer_data >= size);
+
+ /* copy data into the output buffer */
+ assert (size >= _buffer_data);
+ memcpy (data, _buffer, size);
+}
extern void md5_data (std::string, void const *, int);
#endif
-/** @class SocketReader
- * @brief A helper class from reading from sockets.
- *
- * You can probably do this stuff directly in boost, but I'm not sure how.
- */
-class SocketReader
-{
-public:
- SocketReader (boost::shared_ptr<boost::asio::ip::tcp::socket>);
-
- void read_definite_and_consume (uint8_t *, int);
- void read_indefinite (uint8_t *, int);
- void consume (int);
-
-private:
- /** socket we are reading from */
- boost::shared_ptr<boost::asio::ip::tcp::socket> _socket;
- /** a buffer for small reads */
- uint8_t _buffer[256];
- /** amount of valid data in the buffer */
- int _buffer_data;
-};
-
/** @class Size
* @brief Representation of the size of something */
struct Size
extern int dcp_audio_sample_rate (int);
extern std::string colour_lut_index_to_name (int index);
+class Socket
+{
+public:
+ Socket ();
+
+ boost::asio::ip::tcp::socket& socket () {
+ return _socket;
+ }
+
+ void connect (boost::asio::ip::basic_resolver_entry<boost::asio::ip::tcp> const & endpoint, int timeout);
+ void write (uint8_t const * data, int size, int timeout);
+
+ void read_definite_and_consume (uint8_t* data, int size, int timeout);
+ void read_indefinite (uint8_t* data, int size, int timeout);
+ void consume (int amount);
+
+private:
+ void check ();
+ int read (uint8_t* data, int size, int timeout);
+
+ Socket (Socket const &);
+
+ boost::asio::io_service _io_service;
+ boost::asio::deadline_timer _deadline;
+ boost::asio::ip::tcp::socket _socket;
+ /** a buffer for small reads */
+ uint8_t _buffer[256];
+ /** amount of valid data in the buffer */
+ int _buffer_data;
+};
+
#endif
BOOST_CHECK_EQUAL (s.content_path(), "build/test/a/b/c/d/e/foo/bar/baz");
}
+void
+do_remote_encode (shared_ptr<DCPVideoFrame> frame, ServerDescription* description, shared_ptr<EncodedData> locally_encoded)
+{
+ shared_ptr<EncodedData> remotely_encoded;
+ BOOST_CHECK_NO_THROW (remotely_encoded = frame->encode_remotely (description));
+ BOOST_CHECK (remotely_encoded);
+
+ BOOST_CHECK_EQUAL (locally_encoded->size(), remotely_encoded->size());
+ BOOST_CHECK (memcmp (locally_encoded->data(), remotely_encoded->data(), locally_encoded->size()) == 0);
+}
+
BOOST_AUTO_TEST_CASE (client_server_test)
{
shared_ptr<SimpleImage> image (new SimpleImage (PIX_FMT_RGB24, Size (1998, 1080)));
FileLog log ("build/test/client_server_test.log");
- DCPVideoFrame frame (
- image,
- Size (1998, 1080),
- 0,
- Scaler::from_id ("bicubic"),
- 0,
- 24,
- "",
- 0,
- 200000000,
- &log
+ shared_ptr<DCPVideoFrame> frame (
+ new DCPVideoFrame (
+ image,
+ Size (1998, 1080),
+ 0,
+ Scaler::from_id ("bicubic"),
+ 0,
+ 24,
+ "",
+ 0,
+ 200000000,
+ &log
+ )
);
- shared_ptr<EncodedData> locally_encoded = frame.encode_locally ();
+ shared_ptr<EncodedData> locally_encoded = frame->encode_locally ();
Config::instance()->set_server_port (61920);
Server* server = new Server (&log);
- thread t (boost::bind (&Server::run, server, 1));
+ new thread (boost::bind (&Server::run, server, 2));
- ServerDescription description ("localhost", 1);
- shared_ptr<EncodedData> remotely_encoded = frame.encode_remotely (&description);
+ ServerDescription description ("localhost", 2);
- BOOST_CHECK_EQUAL (locally_encoded->size(), remotely_encoded->size());
- BOOST_CHECK (memcmp (locally_encoded->data(), remotely_encoded->data(), locally_encoded->size()) == 0);
+ list<thread*> threads;
+ for (int i = 0; i < 8; ++i) {
+ threads.push_back (new thread (boost::bind (do_remote_encode, frame, &description, locally_encoded)));
+ }
+
+ for (list<thread*>::iterator i = threads.begin(); i != threads.end(); ++i) {
+ (*i)->join ();
+ }
}