/*
Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
- This program is free software; you can redistribute it and/or modify
+ This file is part of DCP-o-matic.
+
+ DCP-o-matic is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
- This program is distributed in the hope that it will be useful,
+ DCP-o-matic is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
- along with this program; if not, write to the Free Software
- Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ along with DCP-o-matic. If not, see <http://www.gnu.org/licenses/>.
*/
#include "config.h"
#include "cross.h"
#include "player_video.h"
-#include "safe_stringstream.h"
-#include "raw_convert.h"
#include "compose.hpp"
#include "log.h"
#include "encoded_log_entry.h"
+#include <dcp/raw_convert.h>
#include <libcxml/cxml.h>
#include <libxml++/libxml++.h>
#include <boost/algorithm/string.hpp>
using boost::optional;
using dcp::Size;
using dcp::Data;
+using dcp::raw_convert;
-EncodeServer::EncodeServer (shared_ptr<Log> log, bool verbose)
- : _terminate (false)
+EncodeServer::EncodeServer (shared_ptr<Log> log, bool verbose, int num_threads)
+ : Server (ENCODE_FRAME_PORT)
, _log (log)
, _verbose (verbose)
- , _acceptor (_io_service, boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v4(), Config::instance()->server_port_base()))
+ , _num_threads (num_threads)
{
}
EncodeServer::~EncodeServer ()
{
{
- boost::mutex::scoped_lock lm (_worker_mutex);
+ boost::mutex::scoped_lock lm (_mutex);
_terminate = true;
_empty_condition.notify_all ();
_full_condition.notify_all ();
}
BOOST_FOREACH (boost::thread* i, _worker_threads) {
- DCPOMATIC_ASSERT (i->joinable ());
- i->join ();
+ /* Ideally this would be a DCPOMATIC_ASSERT(i->joinable()) but we
+ can't throw exceptions from a destructor.
+ */
+ if (i->joinable ()) {
+ i->join ();
+ }
delete i;
}
- _io_service.stop ();
-
_broadcast.io_service.stop ();
if (_broadcast.thread) {
- DCPOMATIC_ASSERT (_broadcast.thread->joinable ());
- _broadcast.thread->join ();
+ /* Ideally this would be a DCPOMATIC_ASSERT(_broadcast.thread->joinable()) but we
+ can't throw exceptions from a destructor.
+ */
+ if (_broadcast.thread->joinable ()) {
+ _broadcast.thread->join ();
+ }
}
}
EncodeServer::worker_thread ()
{
while (true) {
- boost::mutex::scoped_lock lock (_worker_mutex);
+ boost::mutex::scoped_lock lock (_mutex);
while (_queue.empty () && !_terminate) {
_empty_condition.wait (lock);
}
}
void
-EncodeServer::run (int num_threads)
+EncodeServer::run ()
{
- LOG_GENERAL ("Server starting with %1 threads", num_threads);
+ LOG_GENERAL ("Server starting with %1 threads", _num_threads);
if (_verbose) {
- cout << "DCP-o-matic server starting with " << num_threads << " threads.\n";
+ cout << "DCP-o-matic server starting with " << _num_threads << " threads.\n";
}
- for (int i = 0; i < num_threads; ++i) {
+ for (int i = 0; i < _num_threads; ++i) {
_worker_threads.push_back (new thread (bind (&EncodeServer::worker_thread, this)));
}
_broadcast.thread = new thread (bind (&EncodeServer::broadcast_thread, this));
- start_accept ();
- _io_service.run ();
+ Server::run ();
}
void
try
{
boost::asio::ip::address address = boost::asio::ip::address_v4::any ();
- boost::asio::ip::udp::endpoint listen_endpoint (address, Config::instance()->server_port_base() + 1);
+ boost::asio::ip::udp::endpoint listen_endpoint (address, HELLO_PORT);
_broadcast.socket = new boost::asio::ip::udp::socket (_broadcast.io_service);
_broadcast.socket->open (listen_endpoint.protocol ());
}
shared_ptr<Socket> socket (new Socket);
try {
- socket->connect (boost::asio::ip::tcp::endpoint (_broadcast.send_endpoint.address(), Config::instance()->server_port_base() + 1));
+ socket->connect (boost::asio::ip::tcp::endpoint (_broadcast.send_endpoint.address(), SERVER_PRESENCE_PORT));
socket->write (xml.length() + 1);
socket->write ((uint8_t *) xml.c_str(), xml.length() + 1);
} catch (...) {
}
void
-EncodeServer::start_accept ()
+EncodeServer::handle (shared_ptr<Socket> socket)
{
- if (_terminate) {
- return;
- }
-
- shared_ptr<Socket> socket (new Socket);
- _acceptor.async_accept (socket->socket (), boost::bind (&EncodeServer::handle_accept, this, socket, boost::asio::placeholders::error));
-}
-
-void
-EncodeServer::handle_accept (shared_ptr<Socket> socket, boost::system::error_code const & error)
-{
- if (error) {
- return;
- }
-
- boost::mutex::scoped_lock lock (_worker_mutex);
+ boost::mutex::scoped_lock lock (_mutex);
/* Wait until the queue has gone down a bit */
while (_queue.size() >= _worker_threads.size() * 2 && !_terminate) {
_queue.push_back (socket);
_empty_condition.notify_all ();
-
- start_accept ();
}