} else if (k == "reference_filter") {
_reference_filters.push_back (Filter::from_id (v));
} else if (k == "server") {
- _servers.push_back (Server::create_from_metadata (v));
+ _servers.push_back (ServerDescription::create_from_metadata (v));
} else if (k == "screen") {
_screens.push_back (Screen::create_from_metadata (v));
} else if (k == "tms_ip") {
f << "reference_filter " << (*i)->id () << "\n";
}
- for (vector<Server*>::const_iterator i = _servers.begin(); i != _servers.end(); ++i) {
+ for (vector<ServerDescription*>::const_iterator i = _servers.begin(); i != _servers.end(); ++i) {
f << "server " << (*i)->as_metadata () << "\n";
}
#include <boost/shared_ptr.hpp>
#include <sigc++/signal.h>
-class Server;
+class ServerDescription;
class Screen;
class Scaler;
class Filter;
}
/** @return J2K encoding servers to use */
- std::vector<Server*> servers () const {
+ std::vector<ServerDescription*> servers () const {
return _servers;
}
}
/** @param s New list of servers */
- void set_servers (std::vector<Server*> s) {
+ void set_servers (std::vector<ServerDescription*> s) {
_servers = s;
Changed ();
}
int _j2k_bandwidth;
/** J2K encoding servers to use */
- std::vector<Server *> _servers;
+ std::vector<ServerDescription *> _servers;
/** Screen definitions */
std::vector<boost::shared_ptr<Screen> > _screens;
* @return Encoded data.
*/
shared_ptr<EncodedData>
-DCPVideoFrame::encode_remotely (Server const * serv)
+DCPVideoFrame::encode_remotely (ServerDescription const * serv)
{
asio::io_service io_service;
asio::ip::tcp::resolver resolver (io_service);
class FilmState;
class Options;
-class Server;
+class ServerDescription;
class Scaler;
class Image;
class Log;
virtual ~DCPVideoFrame ();
boost::shared_ptr<EncodedData> encode_locally ();
- boost::shared_ptr<EncodedData> encode_remotely (Server const *);
+ boost::shared_ptr<EncodedData> encode_remotely (ServerDescription const *);
int frame () const {
return _frame;
}
void
-J2KWAVEncoder::encoder_thread (Server* server)
+J2KWAVEncoder::encoder_thread (ServerDescription* server)
{
/* Number of seconds that we currently wait between attempts
to connect to the server; not relevant for localhost
J2KWAVEncoder::process_begin ()
{
for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
- _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, (Server *) 0)));
+ _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, (ServerDescription *) 0)));
}
- vector<Server*> servers = Config::instance()->servers ();
+ vector<ServerDescription*> servers = Config::instance()->servers ();
- for (vector<Server*>::iterator i = servers.begin(); i != servers.end(); ++i) {
+ for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
for (int j = 0; j < (*i)->threads (); ++j) {
_worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, *i)));
}
#include <sndfile.h>
#include "encoder.h"
-class Server;
+class ServerDescription;
class DCPVideoFrame;
class Image;
class Log;
private:
- void encoder_thread (Server *);
+ void encoder_thread (ServerDescription *);
void close_sound_files ();
void terminate_worker_threads ();
*/
+#ifndef DVDOMATIC_LOG_H
+#define DVDOMATIC_LOG_H
+
/** @file src/log.h
* @brief A very simple logging class.
*/
/** level above which to ignore log messages */
Level _level;
};
+
+#endif
/** @file src/server.cc
* @brief Class to describe a server to which we can send
- * encoding work.
+ * encoding work, and a class to implement such a server.
*/
#include <string>
#include <sstream>
#include <boost/algorithm/string.hpp>
#include "server.h"
+#include "util.h"
+#include "scaler.h"
+#include "image.h"
+#include "dcp_video_frame.h"
+#include "config.h"
using namespace std;
using namespace boost;
-/** Create a server from a string of metadata returned from as_metadata().
+/** Create a server description from a string of metadata returned from as_metadata().
* @param v Metadata.
- * @return Server, or 0.
+ * @return ServerDescription, or 0.
*/
-Server *
-Server::create_from_metadata (string v)
+ServerDescription *
+ServerDescription::create_from_metadata (string v)
{
vector<string> b;
split (b, v, is_any_of (" "));
return 0;
}
- return new Server (b[0], atoi (b[1].c_str ()));
+ return new ServerDescription (b[0], atoi (b[1].c_str ()));
}
/** @return Description of this server as text */
string
-Server::as_metadata () const
+ServerDescription::as_metadata () const
{
stringstream s;
s << _host_name << " " << _threads;
return s.str ();
}
+
+Server::Server ()
+ : _log ("servomatic.log")
+{
+
+}
+
+int
+Server::process (shared_ptr<asio::ip::tcp::socket> socket)
+{
+ SocketReader reader (socket);
+
+ char buffer[128];
+ reader.read_indefinite ((uint8_t *) buffer, sizeof (buffer));
+ reader.consume (strlen (buffer) + 1);
+
+ stringstream s (buffer);
+
+ string command;
+ s >> command;
+ if (command != "encode") {
+ return -1;
+ }
+
+ Size in_size;
+ int pixel_format_int;
+ Size out_size;
+ int padding;
+ string scaler_id;
+ int frame;
+ float frames_per_second;
+ string post_process;
+ int colour_lut_index;
+ int j2k_bandwidth;
+
+ s >> in_size.width >> in_size.height
+ >> pixel_format_int
+ >> out_size.width >> out_size.height
+ >> padding
+ >> scaler_id
+ >> frame
+ >> frames_per_second
+ >> post_process
+ >> colour_lut_index
+ >> j2k_bandwidth;
+
+ PixelFormat pixel_format = (PixelFormat) pixel_format_int;
+ Scaler const * scaler = Scaler::from_id (scaler_id);
+ if (post_process == "none") {
+ post_process = "";
+ }
+
+ shared_ptr<SimpleImage> image (new SimpleImage (pixel_format, in_size));
+
+ for (int i = 0; i < image->components(); ++i) {
+ int line_size;
+ s >> line_size;
+ image->set_line_size (i, line_size);
+ }
+
+ for (int i = 0; i < image->components(); ++i) {
+ reader.read_definite_and_consume (image->data()[i], image->line_size()[i] * image->lines(i));
+ }
+
+#ifdef DEBUG_HASH
+ image->hash ("Image for encoding (as received by server)");
+#endif
+
+ DCPVideoFrame dcp_video_frame (image, out_size, padding, scaler, frame, frames_per_second, post_process, colour_lut_index, j2k_bandwidth, &_log);
+ shared_ptr<EncodedData> encoded = dcp_video_frame.encode_locally ();
+ encoded->send (socket);
+
+#ifdef DEBUG_HASH
+ encoded->hash ("Encoded image (as made by server and as sent back)");
+#endif
+
+ return frame;
+}
+
+void
+Server::worker_thread ()
+{
+ while (1) {
+ mutex::scoped_lock lock (_worker_mutex);
+ while (_queue.empty ()) {
+ _worker_condition.wait (lock);
+ }
+
+ shared_ptr<asio::ip::tcp::socket> socket = _queue.front ();
+ _queue.pop_front ();
+
+ lock.unlock ();
+
+ int frame = -1;
+
+ struct timeval start;
+ gettimeofday (&start, 0);
+
+ try {
+ frame = process (socket);
+ } catch (std::exception& e) {
+ cerr << "Error: " << e.what() << "\n";
+ }
+
+ socket.reset ();
+
+ lock.lock ();
+
+ if (frame >= 0) {
+ struct timeval end;
+ gettimeofday (&end, 0);
+ cout << "Encoded frame " << frame << " in " << (seconds (end) - seconds (start)) << "\n";
+ }
+
+ _worker_condition.notify_all ();
+ }
+}
+
+void
+Server::run ()
+{
+ int const num_threads = Config::instance()->num_local_encoding_threads ();
+
+ for (int i = 0; i < num_threads; ++i) {
+ _worker_threads.push_back (new thread (bind (&Server::worker_thread, this)));
+ }
+
+ asio::io_service io_service;
+ asio::ip::tcp::acceptor acceptor (io_service, asio::ip::tcp::endpoint (asio::ip::tcp::v4(), Config::instance()->server_port ()));
+ while (1) {
+ shared_ptr<asio::ip::tcp::socket> socket (new asio::ip::tcp::socket (io_service));
+ acceptor.accept (*socket);
+
+ mutex::scoped_lock lock (_worker_mutex);
+
+ /* Wait until the queue has gone down a bit */
+ while (int (_queue.size()) >= num_threads * 2) {
+ _worker_condition.wait (lock);
+ }
+
+ _queue.push_back (socket);
+ _worker_condition.notify_all ();
+ }
+}
/** @file src/server.h
* @brief Class to describe a server to which we can send
- * encoding work.
+ * encoding work, and a class to implement such a server.
*/
#include <string>
+#include <boost/thread.hpp>
+#include <boost/asio.hpp>
+#include <boost/thread/condition.hpp>
+#include "log.h"
-/** @class Server
+/** @class ServerDescription
* @brief Class to describe a server to which we can send encoding work.
*/
-class Server
+class ServerDescription
{
public:
/** @param h Server host name or IP address in string form.
* @param t Number of threads to use on the server.
*/
- Server (std::string h, int t)
+ ServerDescription (std::string h, int t)
: _host_name (h)
, _threads (t)
{}
std::string as_metadata () const;
- static Server * create_from_metadata (std::string v);
+ static ServerDescription * create_from_metadata (std::string v);
private:
/** server's host name */
/** number of threads to use on the server */
int _threads;
};
+
+class Server
+{
+public:
+ Server ();
+
+ void run ();
+
+private:
+ void worker_thread ();
+ int process (boost::shared_ptr<boost::asio::ip::tcp::socket> socket);
+
+ std::vector<boost::thread *> _worker_threads;
+ std::list<boost::shared_ptr<boost::asio::ip::tcp::socket> > _queue;
+ boost::mutex _worker_mutex;
+ boost::condition _worker_condition;
+ Log _log;
+};
+++ /dev/null
-/*
- Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
-
- This program is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation; either version 2 of the License, or
- (at your option) any later version.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with this program; if not, write to the Free Software
- Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
-
-*/
-
-#include <iostream>
-#include <stdexcept>
-#include <sstream>
-#include <cstring>
-#include <vector>
-#include <unistd.h>
-#include <errno.h>
-#include <boost/array.hpp>
-#include <boost/asio.hpp>
-#include <boost/algorithm/string.hpp>
-#include <boost/thread.hpp>
-#include <boost/thread/mutex.hpp>
-#include <boost/thread/condition.hpp>
-#include "config.h"
-#include "dcp_video_frame.h"
-#include "exceptions.h"
-#include "util.h"
-#include "config.h"
-#include "scaler.h"
-#include "image.h"
-#include "log.h"
-
-#define BACKLOG 8
-
-using namespace std;
-using namespace boost;
-
-static vector<thread *> worker_threads;
-
-static std::list<shared_ptr<asio::ip::tcp::socket> > queue;
-static mutex worker_mutex;
-static condition worker_condition;
-static Log log_ ("servomatic.log");
-
-int
-process (shared_ptr<asio::ip::tcp::socket> socket)
-{
- SocketReader reader (socket);
-
- char buffer[128];
- reader.read_indefinite ((uint8_t *) buffer, sizeof (buffer));
- reader.consume (strlen (buffer) + 1);
-
- stringstream s (buffer);
-
- string command;
- s >> command;
- if (command != "encode") {
- return -1;
- }
-
- Size in_size;
- int pixel_format_int;
- Size out_size;
- int padding;
- string scaler_id;
- int frame;
- float frames_per_second;
- string post_process;
- int colour_lut_index;
- int j2k_bandwidth;
-
- s >> in_size.width >> in_size.height
- >> pixel_format_int
- >> out_size.width >> out_size.height
- >> padding
- >> scaler_id
- >> frame
- >> frames_per_second
- >> post_process
- >> colour_lut_index
- >> j2k_bandwidth;
-
- PixelFormat pixel_format = (PixelFormat) pixel_format_int;
- Scaler const * scaler = Scaler::from_id (scaler_id);
- if (post_process == "none") {
- post_process = "";
- }
-
- shared_ptr<SimpleImage> image (new SimpleImage (pixel_format, in_size));
-
- for (int i = 0; i < image->components(); ++i) {
- int line_size;
- s >> line_size;
- image->set_line_size (i, line_size);
- }
-
- for (int i = 0; i < image->components(); ++i) {
- reader.read_definite_and_consume (image->data()[i], image->line_size()[i] * image->lines(i));
- }
-
-#ifdef DEBUG_HASH
- image->hash ("Image for encoding (as received by server)");
-#endif
-
- DCPVideoFrame dcp_video_frame (image, out_size, padding, scaler, frame, frames_per_second, post_process, colour_lut_index, j2k_bandwidth, &log_);
- shared_ptr<EncodedData> encoded = dcp_video_frame.encode_locally ();
- encoded->send (socket);
-
-#ifdef DEBUG_HASH
- encoded->hash ("Encoded image (as made by server and as sent back)");
-#endif
-
- return frame;
-}
-
-void
-worker_thread ()
-{
- while (1) {
- mutex::scoped_lock lock (worker_mutex);
- while (queue.empty ()) {
- worker_condition.wait (lock);
- }
-
- shared_ptr<asio::ip::tcp::socket> socket = queue.front ();
- queue.pop_front ();
-
- lock.unlock ();
-
- int frame = -1;
-
- struct timeval start;
- gettimeofday (&start, 0);
-
- try {
- frame = process (socket);
- } catch (std::exception& e) {
- cerr << "Error: " << e.what() << "\n";
- }
-
- socket.reset ();
-
- lock.lock ();
-
- if (frame >= 0) {
- struct timeval end;
- gettimeofday (&end, 0);
- cout << "Encoded frame " << frame << " in " << (seconds (end) - seconds (start)) << "\n";
- }
-
- worker_condition.notify_all ();
- }
-}
-
-int
-main ()
-{
- Scaler::setup_scalers ();
-
- int const num_threads = Config::instance()->num_local_encoding_threads ();
-
- for (int i = 0; i < num_threads; ++i) {
- worker_threads.push_back (new thread (worker_thread));
- }
-
- asio::io_service io_service;
- asio::ip::tcp::acceptor acceptor (io_service, asio::ip::tcp::endpoint (asio::ip::tcp::v4(), Config::instance()->server_port ()));
- while (1) {
- shared_ptr<asio::ip::tcp::socket> socket (new asio::ip::tcp::socket (io_service));
- acceptor.accept (*socket);
-
- mutex::scoped_lock lock (worker_mutex);
-
- /* Wait until the queue has gone down a bit */
- while (int (queue.size()) >= num_threads * 2) {
- worker_condition.wait (lock);
- }
-
- queue.push_back (socket);
- worker_condition.notify_all ();
- }
-
- return 0;
-}
--- /dev/null
+/*
+ Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+
+*/
+
+#include "lib/server.h"
+#include <iostream>
+#include <stdexcept>
+#include <sstream>
+#include <cstring>
+#include <vector>
+#include <unistd.h>
+#include <errno.h>
+#include <boost/array.hpp>
+#include <boost/asio.hpp>
+#include <boost/algorithm/string.hpp>
+#include <boost/thread.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/thread/condition.hpp>
+#include "config.h"
+#include "dcp_video_frame.h"
+#include "exceptions.h"
+#include "util.h"
+#include "config.h"
+#include "scaler.h"
+#include "image.h"
+#include "log.h"
+
+int
+main ()
+{
+ Scaler::setup_scalers ();
+ Server server;
+ server.run ();
+ return 0;
+}
def build(bld):
- for t in ['makedcp', 'fixlengths', 'servomatic']:
+ for t in ['makedcp', 'fixlengths', 'servomatic_cli']:
obj = bld(features = 'cxx cxxprogram')
obj.uselib = 'BOOST_THREAD'
obj.includes = ['..']
_reference_filters->SetLabel (std_to_wx (p.first + " " + p.second));
_reference_filters_button->Connect (wxID_ANY, wxEVT_COMMAND_BUTTON_CLICKED, wxCommandEventHandler (ConfigDialog::edit_reference_filters_clicked), 0, this);
- vector<Server*> servers = config->servers ();
- for (vector<Server*>::iterator i = servers.begin(); i != servers.end(); ++i) {
+ vector<ServerDescription*> servers = config->servers ();
+ for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
add_server_to_control (*i);
}
}
void
-ConfigDialog::add_server_to_control (Server* s)
+ConfigDialog::add_server_to_control (ServerDescription* s)
{
wxListItem item;
int const n = _servers->GetItemCount ();
{
ServerDialog* d = new ServerDialog (this, 0);
d->ShowModal ();
- Server* s = d->server ();
+ ServerDescription* s = d->server ();
d->Destroy ();
add_server_to_control (s);
- vector<Server*> o = Config::instance()->servers ();
+ vector<ServerDescription*> o = Config::instance()->servers ();
o.push_back (s);
Config::instance()->set_servers (o);
}
item.SetColumn (0);
_servers->GetItem (item);
- vector<Server*> servers = Config::instance()->servers ();
- vector<Server*>::iterator j = servers.begin();
+ vector<ServerDescription*> servers = Config::instance()->servers ();
+ vector<ServerDescription*>::iterator j = servers.begin();
while (j != servers.end() && (*j)->host_name() != wx_to_std (item.GetText ())) {
++j;
}
#include <wx/listctrl.h>
class Screen;
-class Server;
+class ServerDescription;
/** @class ConfigDialog
* @brief A dialogue to edit DVD-o-matic configuration.
void remove_server_clicked (wxCommandEvent &);
void server_selection_changed (wxListEvent &);
- void add_server_to_control (Server *);
+ void add_server_to_control (ServerDescription *);
wxTextCtrl* _tms_ip;
wxTextCtrl* _tms_path;
#include "server_dialog.h"
#include "wx_util.h"
-ServerDialog::ServerDialog (wxWindow* parent, Server* server)
+ServerDialog::ServerDialog (wxWindow* parent, ServerDescription* server)
: wxDialog (parent, wxID_ANY, wxString (_("Server")))
{
if (server) {
_server = server;
} else {
- _server = new Server ("localhost", 1);
+ _server = new ServerDescription ("localhost", 1);
}
wxFlexGridSizer* table = new wxFlexGridSizer (2, 4, 4);
_server->set_threads (_threads->GetValue ());
}
-Server *
+ServerDescription *
ServerDialog::server () const
{
return _server;
#include <wx/wx.h>
#include <wx/spinctrl.h>
-class Server;
+class ServerDescription;
class ServerDialog : public wxDialog
{
public:
- ServerDialog (wxWindow *, Server *);
+ ServerDialog (wxWindow *, ServerDescription *);
- Server* server () const;
+ ServerDescription* server () const;
private:
void host_changed (wxCommandEvent &);
void threads_changed (wxCommandEvent &);
- Server* _server;
+ ServerDescription* _server;
wxTextCtrl* _host;
wxSpinCtrl* _threads;
};