using dcp::Size;
using dcp::Data;
-EncodeServer::EncodeServer (shared_ptr<Log> log, bool verbose)
- : _terminate (false)
+EncodeServer::EncodeServer (shared_ptr<Log> log, bool verbose, int num_threads)
+ : Server (Config::instance()->server_port_base())
, _log (log)
, _verbose (verbose)
- , _acceptor (_io_service, boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v4(), Config::instance()->server_port_base()))
+ , _num_threads (num_threads)
{
}
EncodeServer::~EncodeServer ()
{
{
- boost::mutex::scoped_lock lm (_worker_mutex);
- _terminate = true;
+ boost::mutex::scoped_lock lm (_mutex);
_empty_condition.notify_all ();
_full_condition.notify_all ();
}
delete i;
}
- _io_service.stop ();
-
_broadcast.io_service.stop ();
if (_broadcast.thread) {
DCPOMATIC_ASSERT (_broadcast.thread->joinable ());
EncodeServer::worker_thread ()
{
while (true) {
- boost::mutex::scoped_lock lock (_worker_mutex);
+ boost::mutex::scoped_lock lock (_mutex);
while (_queue.empty () && !_terminate) {
_empty_condition.wait (lock);
}
}
void
-EncodeServer::run (int num_threads)
+EncodeServer::run ()
{
- LOG_GENERAL ("Server starting with %1 threads", num_threads);
+ LOG_GENERAL ("Server starting with %1 threads", _num_threads);
if (_verbose) {
- cout << "DCP-o-matic server starting with " << num_threads << " threads.\n";
+ cout << "DCP-o-matic server starting with " << _num_threads << " threads.\n";
}
- for (int i = 0; i < num_threads; ++i) {
+ for (int i = 0; i < _num_threads; ++i) {
_worker_threads.push_back (new thread (bind (&EncodeServer::worker_thread, this)));
}
_broadcast.thread = new thread (bind (&EncodeServer::broadcast_thread, this));
- start_accept ();
- _io_service.run ();
+ Server::run ();
}
void
}
void
-EncodeServer::start_accept ()
-{
- if (_terminate) {
- return;
- }
-
- shared_ptr<Socket> socket (new Socket);
- _acceptor.async_accept (socket->socket (), boost::bind (&EncodeServer::handle_accept, this, socket, boost::asio::placeholders::error));
-}
-
-void
-EncodeServer::handle_accept (shared_ptr<Socket> socket, boost::system::error_code const & error)
+EncodeServer::handle (shared_ptr<Socket> socket)
{
- if (error) {
- return;
- }
-
- boost::mutex::scoped_lock lock (_worker_mutex);
+ boost::mutex::scoped_lock lock (_mutex);
/* Wait until the queue has gone down a bit */
while (_queue.size() >= _worker_threads.size() * 2 && !_terminate) {
_queue.push_back (socket);
_empty_condition.notify_all ();
-
- start_accept ();
}
#define DCPOMATIC_ENCODE_SERVER_H
/** @file src/encode_server.h
- * @brief Server class.
+ * @brief EncodeServer class.
*/
+#include "server.h"
#include "exception_store.h"
#include <boost/thread.hpp>
#include <boost/asio.hpp>
* @brief A class to run a server which can accept requests to perform JPEG2000
* encoding work.
*/
-class EncodeServer : public ExceptionStore, public boost::noncopyable
+class EncodeServer : public Server, public ExceptionStore
{
public:
- EncodeServer (boost::shared_ptr<Log> log, bool verbose);
+ EncodeServer (boost::shared_ptr<Log> log, bool verbose, int num_threads);
~EncodeServer ();
- void run (int num_threads);
+ void run ();
private:
+ void handle (boost::shared_ptr<Socket>);
void worker_thread ();
int process (boost::shared_ptr<Socket> socket, struct timeval &, struct timeval &);
void broadcast_thread ();
void broadcast_received ();
- void start_accept ();
- void handle_accept (boost::shared_ptr<Socket>, boost::system::error_code const &);
-
- bool _terminate;
std::vector<boost::thread *> _worker_threads;
std::list<boost::shared_ptr<Socket> > _queue;
- boost::mutex _worker_mutex;
boost::condition _full_condition;
boost::condition _empty_condition;
boost::shared_ptr<Log> _log;
bool _verbose;
-
- boost::asio::io_service _io_service;
- boost::asio::ip::tcp::acceptor _acceptor;
+ int _num_threads;
struct Broadcast {
{
public:
void rethrow () {
- boost::mutex::scoped_lock lm (_mutex);
+ boost::mutex::scoped_lock lm (_exception_mutex);
if (_exception) {
boost::exception_ptr tmp = _exception;
_exception = boost::exception_ptr ();
protected:
void store_current () {
- boost::mutex::scoped_lock lm (_mutex);
+ boost::mutex::scoped_lock lm (_exception_mutex);
_exception = boost::current_exception ();
}
private:
boost::exception_ptr _exception;
- mutable boost::mutex _mutex;
+ mutable boost::mutex _exception_mutex;
};
#endif
--- /dev/null
+/*
+ Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+
+*/
+
+#include "server.h"
+#include "dcpomatic_socket.h"
+
+#include "i18n.h"
+
+using boost::shared_ptr;
+
+Server::Server (int port)
+ : _terminate (false)
+ , _acceptor (_io_service, boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v4(), port))
+{
+
+}
+
+Server::~Server ()
+{
+ boost::mutex::scoped_lock lm (_mutex);
+ _terminate = true;
+ _io_service.stop ();
+}
+
+void
+Server::run ()
+{
+ start_accept ();
+ _io_service.run ();
+}
+
+void
+Server::start_accept ()
+{
+ {
+ boost::mutex::scoped_lock lm (_mutex);
+ if (_terminate) {
+ return;
+ }
+ }
+
+ shared_ptr<Socket> socket (new Socket);
+ _acceptor.async_accept (socket->socket (), boost::bind (&Server::handle_accept, this, socket, boost::asio::placeholders::error));
+}
+
+void
+Server::handle_accept (shared_ptr<Socket> socket, boost::system::error_code const & error)
+{
+ if (error) {
+ return;
+ }
+
+ handle (socket);
+ start_accept ();
+}
--- /dev/null
+/*
+ Copyright (C) 2015 Carl Hetherington <cth@carlh.net>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+
+*/
+
+#ifndef DCPOMATIC_SERVER_H
+#define DCPOMATIC_SERVER_H
+
+#include <boost/thread.hpp>
+#include <boost/asio.hpp>
+#include <boost/thread/condition.hpp>
+#include <string>
+
+class Socket;
+
+class Server : public boost::noncopyable
+{
+public:
+ Server (int port);
+ virtual ~Server ();
+
+ virtual void run ();
+
+protected:
+ boost::mutex _mutex;
+ bool _terminate;
+
+private:
+ virtual void handle (boost::shared_ptr<Socket> socket) = 0;
+
+ void start_accept ();
+ void handle_accept (boost::shared_ptr<Socket>, boost::system::error_code const &);
+
+ boost::asio::io_service _io_service;
+ boost::asio::ip::tcp::acceptor _acceptor;
+};
+
+#endif
screen_kdm.cc
send_kdm_email_job.cc
send_problem_report_job.cc
+ server.cc
single_stream_audio_content.cc
sndfile_base.cc
sndfile_content.cc
void main_thread ()
try {
- EncodeServer server (server_log, false);
- server.run (Config::instance()->num_local_encoding_threads ());
+ EncodeServer server (server_log, false, Config::instance()->num_local_encoding_threads());
+ server.run ();
} catch (...) {
store_current ();
}
log.reset (new NullLog);
}
- EncodeServer server (log, verbose);
+ EncodeServer server (log, verbose, num_threads);
try {
- server.run (num_threads);
+ server.run ();
} catch (boost::system::system_error& e) {
if (e.code() == boost::system::errc::address_in_use) {
cerr << argv[0] << ": address already in use. Is another DCP-o-matic server instance already running?\n";
Data locally_encoded = frame->encode_locally (boost::bind (&Log::dcp_log, log.get(), _1, _2));
- EncodeServer* server = new EncodeServer (log, true);
+ EncodeServer* server = new EncodeServer (log, true, 2);
- new thread (boost::bind (&EncodeServer::run, server, 2));
+ new thread (boost::bind (&EncodeServer::run, server));
/* Let the server get itself ready */
dcpomatic_sleep (1);
Data locally_encoded = frame->encode_locally (boost::bind (&Log::dcp_log, log.get(), _1, _2));
- EncodeServer* server = new EncodeServer (log, true);
+ EncodeServer* server = new EncodeServer (log, true, 2);
- new thread (boost::bind (&EncodeServer::run, server, 2));
+ new thread (boost::bind (&EncodeServer::run, server));
/* Let the server get itself ready */
dcpomatic_sleep (1);
Data j2k_locally_encoded = j2k_frame->encode_locally (boost::bind (&Log::dcp_log, log.get(), _1, _2));
- EncodeServer* server = new EncodeServer (log, true);
+ EncodeServer* server = new EncodeServer (log, true, 2);
- new thread (boost::bind (&EncodeServer::run, server, 2));
+ new thread (boost::bind (&EncodeServer::run, server));
/* Let the server get itself ready */
dcpomatic_sleep (1);