/** Construct default configuration */
Config::Config ()
: _num_local_encoding_threads (max (2U, boost::thread::hardware_concurrency()))
- , _server_port (6192)
+ , _server_port_base (6192)
, _tms_path (".")
, _sound_processor (SoundProcessor::from_id (N_("dolby_cp750")))
, _default_still_length (10)
_num_local_encoding_threads = f.number_child<int> ("NumLocalEncodingThreads");
_default_directory = f.string_child ("DefaultDirectory");
- _server_port = f.number_child<int> ("ServerPort");
+
+ boost::optional<int> b = f.optional_number_child<int> ("ServerPort");
+ if (!b) {
+ b = f.optional_number_child<int> ("ServerPortBase");
+ }
+ _server_port_base = b.get ();
list<shared_ptr<cxml::Node> > servers = f.node_children ("Server");
for (list<shared_ptr<cxml::Node> >::iterator i = servers.begin(); i != servers.end(); ++i) {
} else if (k == N_("default_directory")) {
_default_directory = v;
} else if (k == N_("server_port")) {
- _server_port = atoi (v.c_str ());
+ _server_port_base = atoi (v.c_str ());
} else if (k == N_("server")) {
optional<ServerDescription> server = ServerDescription::create_from_metadata (v);
if (server) {
root->add_child("Version")->add_child_text ("1");
root->add_child("NumLocalEncodingThreads")->add_child_text (lexical_cast<string> (_num_local_encoding_threads));
root->add_child("DefaultDirectory")->add_child_text (_default_directory.string ());
- root->add_child("ServerPort")->add_child_text (lexical_cast<string> (_server_port));
+ root->add_child("ServerPortBase")->add_child_text (lexical_cast<string> (_server_port_base));
for (vector<ServerDescription>::const_iterator i = _servers.begin(); i != _servers.end(); ++i) {
i->as_xml (root->add_child ("Server"));
boost::filesystem::path default_directory_or (boost::filesystem::path a) const;
- /** @return port to use for J2K encoding servers */
- int server_port () const {
- return _server_port;
+ /** @return base port number to use for J2K encoding servers */
+ int server_port_base () const {
+ return _server_port_base;
}
/** @return J2K encoding servers to use */
}
/** @param p New server port */
- void set_server_port (int p) {
- _server_port = p;
+ void set_server_port_base (int p) {
+ _server_port_base = p;
}
/** @param s New list of servers */
int _num_local_encoding_threads;
/** default directory to put new films in */
boost::filesystem::path _default_directory;
- /** port to use for J2K encoding servers */
- int _server_port;
+ /** base port number to use for J2K encoding servers;
+ * this port and the one above it will be used.
+ */
+ int _server_port_base;
/** J2K encoding servers to use */
std::vector<ServerDescription> _servers;
using std::make_pair;
using boost::shared_ptr;
+/** @param s Number of seconds to sleep for */
void
dcpomatic_sleep (int s)
{
{
boost::asio::io_service io_service;
boost::asio::ip::tcp::resolver resolver (io_service);
- boost::asio::ip::tcp::resolver::query query (serv.host_name(), boost::lexical_cast<string> (Config::instance()->server_port ()));
+ boost::asio::ip::tcp::resolver::query query (serv.host_name(), boost::lexical_cast<string> (Config::instance()->server_port_base ()));
boost::asio::ip::tcp::resolver::iterator endpoint_iterator = resolver.resolve (query);
shared_ptr<Socket> socket (new Socket);
*/
#include <iostream>
+#include <boost/lambda/lambda.hpp>
+#include <libcxml/cxml.h>
#include "encoder.h"
#include "util.h"
#include "film.h"
using std::make_pair;
using boost::shared_ptr;
using boost::optional;
+using boost::scoped_array;
int const Encoder::_history_size = 25;
, _job (j)
, _video_frames_out (0)
, _terminate (false)
+ , _broadcast_thread (0)
+ , _listen_thread (0)
{
_have_a_real_frame[EYES_BOTH] = false;
_have_a_real_frame[EYES_LEFT] = false;
}
}
+/** Add a worker thread for a remote server. Caller must hold
+ * a lock on _mutex, or know that one is not currently required to
+ * safely modify _threads.
+ */
+void
+Encoder::add_worker_thread (ServerDescription d)
+{
+ _threads.push_back (
+ make_pair (d, new boost::thread (boost::bind (&Encoder::encoder_thread, this, d)))
+ );
+}
+
void
Encoder::process_begin ()
{
for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
- _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<ServerDescription> ())));
+ _threads.push_back (
+ make_pair (
+ optional<ServerDescription> (),
+ new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<ServerDescription> ()))
+ )
+ );
}
vector<ServerDescription> servers = Config::instance()->servers ();
for (vector<ServerDescription>::iterator i = servers.begin(); i != servers.end(); ++i) {
for (int j = 0; j < i->threads (); ++j) {
- _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, *i)));
+ add_worker_thread (*i);
}
}
+ _broadcast_thread = new boost::thread (boost::bind (&Encoder::broadcast_thread, this));
+ _listen_thread = new boost::thread (boost::bind (&Encoder::listen_thread, this));
+
_writer.reset (new Writer (_film, _job));
}
void
Encoder::terminate_threads ()
{
- boost::mutex::scoped_lock lock (_mutex);
- _terminate = true;
- _condition.notify_all ();
- lock.unlock ();
+ {
+ boost::mutex::scoped_lock lock (_mutex);
+ _terminate = true;
+ _condition.notify_all ();
+ }
- for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
- if ((*i)->joinable ()) {
- (*i)->join ();
+ for (ThreadList::iterator i = _threads.begin(); i != _threads.end(); ++i) {
+ if (i->second->joinable ()) {
+ i->second->join ();
}
- delete *i;
+ delete i->second;
}
_threads.clear ();
+
+ if (_broadcast_thread && _broadcast_thread->joinable ()) {
+ _broadcast_thread->join ();
+ }
+ delete _broadcast_thread;
+
+ if (_listen_thread && _listen_thread->joinable ()) {
+ _listen_thread->join ();
+ }
+ delete _listen_thread;
}
void
_condition.notify_all ();
}
}
+
+void
+Encoder::broadcast_thread ()
+{
+ boost::system::error_code error;
+ boost::asio::io_service io_service;
+ boost::asio::ip::udp::socket socket (io_service);
+ socket.open (boost::asio::ip::udp::v4(), error);
+ if (error) {
+ throw NetworkError ("failed to set up broadcast socket");
+ }
+
+ socket.set_option (boost::asio::ip::udp::socket::reuse_address (true));
+ socket.set_option (boost::asio::socket_base::broadcast (true));
+
+ boost::asio::ip::udp::endpoint end_point (boost::asio::ip::address_v4::broadcast(), Config::instance()->server_port_base() + 1);
+
+ while (1) {
+ boost::mutex::scoped_lock lm (_mutex);
+ if (_terminate) {
+ socket.close (error);
+ return;
+ }
+
+ string data = DCPOMATIC_HELLO;
+ socket.send_to (boost::asio::buffer (data.c_str(), data.size() + 1), end_point);
+
+ lm.unlock ();
+ dcpomatic_sleep (10);
+ }
+}
+
+void
+Encoder::listen_thread ()
+{
+ while (1) {
+ {
+ /* See if we need to stop */
+ boost::mutex::scoped_lock lm (_mutex);
+ if (_terminate) {
+ return;
+ }
+ }
+
+ shared_ptr<Socket> sock (new Socket (10));
+
+ try {
+ sock->accept (Config::instance()->server_port_base() + 1);
+ } catch (std::exception& e) {
+ continue;
+ }
+
+ uint32_t length = sock->read_uint32 ();
+ scoped_array<char> buffer (new char[length]);
+ sock->read (reinterpret_cast<uint8_t*> (buffer.get()), length);
+
+ stringstream s (buffer.get());
+ shared_ptr<cxml::Document> xml (new cxml::Document ("ServerAvailable"));
+ xml->read_stream (s);
+
+ {
+ /* See if we already know about this server */
+ string const ip = sock->socket().remote_endpoint().address().to_string ();
+ boost::mutex::scoped_lock lm (_mutex);
+ ThreadList::iterator i = _threads.begin();
+ while (i != _threads.end() && (!i->first || i->first->host_name() != ip)) {
+ ++i;
+ }
+
+ if (i == _threads.end ()) {
+ cout << "Adding a thread for " << ip << "\n";
+ add_worker_thread (ServerDescription (ip, xml->number_child<int> ("Threads")));
+ } else {
+ cout << "Already know about " << ip << "\n";
+ }
+ }
+ }
+}
#include <libswresample/swresample.h>
}
#include "util.h"
+#include "config.h"
class Image;
class AudioBuffers;
void encoder_thread (boost::optional<ServerDescription>);
void terminate_threads ();
+ void broadcast_thread ();
+ void listen_thread ();
+ void add_worker_thread (ServerDescription);
/** Film that we are encoding */
boost::shared_ptr<const Film> _film;
bool _have_a_real_frame[EYES_COUNT];
bool _terminate;
std::list<boost::shared_ptr<DCPVideoFrame> > _queue;
- std::list<boost::thread *> _threads;
+ typedef std::list<std::pair<boost::optional<ServerDescription>, boost::thread *> > ThreadList;
+ ThreadList _threads;
mutable boost::mutex _mutex;
boost::condition _condition;
boost::shared_ptr<Writer> _writer;
+
+ /** A thread to periodically issue broadcasts to find encoding servers */
+ boost::thread* _broadcast_thread;
+ boost::thread* _listen_thread;
};
#endif
using boost::bind;
using boost::scoped_array;
using boost::optional;
+using boost::lexical_cast;
using libdcp::Size;
ServerDescription::ServerDescription (shared_ptr<const cxml::Node> node)
ServerDescription::create_from_metadata (string v)
{
vector<string> b;
- split (b, v, is_any_of (N_(" ")));
+ split (b, v, is_any_of (" "));
if (b.size() != 2) {
return optional<ServerDescription> ();
try {
frame = process (socket);
} catch (std::exception& e) {
- _log->log (String::compose (N_("Error: %1"), e.what()));
+ _log->log (String::compose ("Error: %1", e.what()));
}
socket.reset ();
if (frame >= 0) {
struct timeval end;
gettimeofday (&end, 0);
- _log->log (String::compose (N_("Encoded frame %1 in %2"), frame, seconds (end) - seconds (start)));
+ cout << String::compose ("Encoded frame %1 in %2", frame, seconds (end) - seconds (start)) << "\n";
+ _log->log (String::compose ("Encoded frame %1 in %2", frame, seconds (end) - seconds (start)));
}
_worker_condition.notify_all ();
_broadcast.thread = new thread (bind (&Server::broadcast_thread, this));
boost::asio::io_service io_service;
- boost::asio::ip::tcp::acceptor acceptor (io_service, boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v4(), Config::instance()->server_port ()));
+
+ boost::asio::ip::tcp::acceptor acceptor (
+ io_service,
+ boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v4(), Config::instance()->server_port_base ())
+ );
+
while (1) {
shared_ptr<Socket> socket (new Socket);
acceptor.accept (socket->socket ());
boost::asio::io_service io_service;
boost::asio::ip::address address = boost::asio::ip::address_v4::any ();
- boost::asio::ip::udp::endpoint listen_endpoint (address, Config::instance()->server_port ());
+ boost::asio::ip::udp::endpoint listen_endpoint (address, Config::instance()->server_port_base() + 1);
_broadcast.socket = new boost::asio::ip::udp::socket (io_service);
_broadcast.socket->open (listen_endpoint.protocol ());
{
_broadcast.buffer[sizeof(_broadcast.buffer) - 1] = '\0';
- cout << _broadcast.buffer << "\n";
-
+ if (strcmp (_broadcast.buffer, DCPOMATIC_HELLO) == 0) {
+ /* Reply to the client saying what we can do */
+ xmlpp::Document doc;
+ xmlpp::Element* root = doc.create_root_node ("ServerAvailable");
+ root->add_child("Threads")->add_child_text (lexical_cast<string> (_worker_threads.size ()));
+ stringstream xml;
+ doc.write_to_stream (xml, "UTF-8");
+
+ shared_ptr<Socket> socket (new Socket);
+ try {
+ socket->connect (boost::asio::ip::tcp::endpoint (_broadcast.send_endpoint.address(), Config::instance()->server_port_base() + 1));
+ socket->write (xml.str().length() + 1);
+ socket->write ((uint8_t *) xml.str().c_str(), xml.str().length() + 1);
+ } catch (...) {
+
+ }
+ }
+
_broadcast.socket->async_receive_from (
boost::asio::buffer (_broadcast.buffer, sizeof (_broadcast.buffer)),
_broadcast.send_endpoint, boost::bind (&Server::broadcast_received, this)
using std::numeric_limits;
using std::pair;
using std::ofstream;
+using std::cout;
using boost::shared_ptr;
using boost::thread;
using boost::lexical_cast;
Socket::Socket (int timeout)
: _deadline (_io_service)
, _socket (_io_service)
+ , _acceptor (0)
, _timeout (timeout)
{
_deadline.expires_at (boost::posix_time::pos_infin);
check ();
}
+Socket::~Socket ()
+{
+ delete _acceptor;
+}
+
void
Socket::check ()
{
if (_deadline.expires_at() <= boost::asio::deadline_timer::traits_type::now ()) {
- _socket.close ();
+ if (_acceptor) {
+ _acceptor->cancel ();
+ } else {
+ _socket.close ();
+ }
_deadline.expires_at (boost::posix_time::pos_infin);
}
* @param endpoint End-point to connect to.
*/
void
-Socket::connect (boost::asio::ip::basic_resolver_entry<boost::asio::ip::tcp> const & endpoint)
+Socket::connect (boost::asio::ip::tcp::endpoint endpoint)
{
_deadline.expires_from_now (boost::posix_time::seconds (_timeout));
boost::system::error_code ec = boost::asio::error::would_block;
_io_service.run_one();
} while (ec == boost::asio::error::would_block);
- if (ec || !_socket.is_open ()) {
+ if (ec) {
+ throw NetworkError (ec.message ());
+ }
+
+ if (!_socket.is_open ()) {
throw NetworkError (_("connect timed out"));
}
}
+void
+Socket::accept (int port)
+{
+ _acceptor = new boost::asio::ip::tcp::acceptor (_io_service, boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v4(), port));
+
+ _deadline.expires_from_now (boost::posix_time::seconds (_timeout));
+ boost::system::error_code ec = boost::asio::error::would_block;
+ _acceptor->async_accept (_socket, boost::lambda::var(ec) = boost::lambda::_1);
+ do {
+ _io_service.run_one ();
+ } while (ec == boost::asio::error::would_block );
+
+ delete _acceptor;
+ _acceptor = 0;
+
+ if (ec) {
+ throw NetworkError (ec.message ());
+ }
+}
+
/** Blocking write.
* @param data Buffer to write.
* @param size Number of bytes to write.
{
public:
Socket (int timeout = 30);
+ ~Socket ();
/** @return Our underlying socket */
boost::asio::ip::tcp::socket& socket () {
return _socket;
}
- void connect (boost::asio::ip::basic_resolver_entry<boost::asio::ip::tcp> const & endpoint);
+ void connect (boost::asio::ip::tcp::endpoint);
+ void accept (int);
void write (uint32_t n);
void write (uint8_t const * data, int size);
boost::asio::io_service _io_service;
boost::asio::deadline_timer _deadline;
boost::asio::ip::tcp::socket _socket;
+ boost::asio::ip::tcp::acceptor* _acceptor;
int _timeout;
};
Config::instance()->set_num_local_encoding_threads (1);
Config::instance()->set_servers (vector<ServerDescription> ());
- Config::instance()->set_server_port (61920);
+ Config::instance()->set_server_port_base (61920);
Config::instance()->set_default_dci_metadata (DCIMetadata ());
Config::instance()->set_default_container (static_cast<Ratio*> (0));
Config::instance()->set_default_dcp_content_type (static_cast<DCPContentType*> (0));