--- /dev/null
+gdb.exe -x gdb_script dcpomatic2_dist.exe > %HOMEPATH%/Documents/dcpomatic_debug_log.txt
File "%static_deps%/bin/libcrypto-1_1-x64.dll"
File "%static_deps%/bin/libltdl-7.dll"
File "%static_deps%/bin/libdl.dll"
+File "%static_deps%/bin/libnanomsg.dll"
File "%cdist_deps%/bin/asdcp-carl.dll"
File "%cdist_deps%/bin/kumu-carl.dll"
#include "compose.hpp"
#include "exceptions.h"
#include <dcp/raw_convert.h>
+#include <nanomsg/nn.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdio.h>
using boost::shared_ptr;
using dcp::raw_convert;
-CopyToDriveJob::CopyToDriveJob (boost::filesystem::path dcp, Drive drive, boost::process::opstream& to_writer, boost::process::ipstream& from_writer)
+CopyToDriveJob::CopyToDriveJob (boost::filesystem::path dcp, Drive drive, Nanomsg& nanomsg)
: Job (shared_ptr<Film>())
, _dcp (dcp)
, _drive (drive)
- , _to_writer (to_writer)
- , _from_writer (from_writer)
+ , _nanomsg (nanomsg)
{
}
void
CopyToDriveJob::run ()
{
- _to_writer << _dcp.string() << "\n";
- _to_writer << _drive.internal_name() << "\n";
- _to_writer.flush ();
+ _nanomsg.send(String::compose("%1\n%2\n", _dcp.string(), _drive.internal_name()));
while (true) {
- string s;
- getline (_from_writer, s);
+ string s = _nanomsg.blocking_get ();
if (s == DIST_WRITER_OK) {
set_state (FINISHED_OK);
return;
} else if (s == DIST_WRITER_ERROR) {
- string m;
- getline (_from_writer, m);
- string n;
- getline (_from_writer, n);
+ string const m = _nanomsg.blocking_get ();
+ string const n = _nanomsg.blocking_get ();
throw CopyError (m, raw_convert<int>(n));
} else if (s == DIST_WRITER_PROGRESS) {
- string p;
- getline (_from_writer, p);
+ string p = _nanomsg.blocking_get();
set_progress (raw_convert<float>(p));
}
}
/*
- Copyright (C) 2019 Carl Hetherington <cth@carlh.net>
+ Copyright (C) 2019-2020 Carl Hetherington <cth@carlh.net>
This file is part of DCP-o-matic.
#include "cross.h"
#include "job.h"
-#include <boost/process.hpp>
+#include "nanomsg.h"
class CopyToDriveJob : public Job
{
public:
- CopyToDriveJob (boost::filesystem::path dcp, Drive drive, boost::process::opstream& to_writer, boost::process::ipstream& from_writer);
+ CopyToDriveJob (boost::filesystem::path dcp, Drive drive, Nanomsg& nanomsg);
std::string name () const;
std::string json_name () const;
void copy (boost::filesystem::path from, boost::filesystem::path to, uint64_t& total_remaining, uint64_t total);
boost::filesystem::path _dcp;
Drive _drive;
- boost::process::opstream& _to_writer;
- boost::process::ipstream& _from_writer;
+ Nanomsg& _nanomsg;
};
--- /dev/null
+/*
+ Copyright (C) 2020 Carl Hetherington <cth@carlh.net>
+
+ This file is part of DCP-o-matic.
+
+ DCP-o-matic is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ DCP-o-matic is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with DCP-o-matic. If not, see <http://www.gnu.org/licenses/>.
+
+*/
+
+#include "nanomsg.h"
+#include "dcpomatic_log.h"
+#include <nanomsg/nn.h>
+#include <nanomsg/pair.h>
+#include <stdexcept>
+#include <cerrno>
+
+using std::string;
+using std::runtime_error;
+using boost::optional;
+
+#define NANOMSG_URL "ipc:///tmp/dcpomatic.ipc"
+
+Nanomsg::Nanomsg (bool server)
+{
+ _socket = nn_socket (AF_SP, NN_PAIR);
+ if (_socket < 0) {
+ throw runtime_error("Could not set up nanomsg socket");
+ }
+ if (server) {
+ if (nn_bind(_socket, NANOMSG_URL) < 0) {
+ throw runtime_error(String::compose("Could not bind nanomsg socket (%1)", errno));
+ }
+ } else {
+ if (nn_connect(_socket, NANOMSG_URL) < 0) {
+ throw runtime_error(String::compose("Could not connect nanomsg socket (%1)", errno));
+ }
+ }
+}
+
+void
+Nanomsg::send (string s)
+{
+ int const r = nn_send (_socket, s.c_str(), s.length(), 0);
+ if (r < 0) {
+ throw runtime_error(String::compose("Could not send to nanomsg socket (%1)", errno));
+ } else if (r != int(s.length())) {
+ throw runtime_error("Could not send to nanomsg socket (message too big)");
+ }
+}
+
+optional<string>
+Nanomsg::get_from_pending ()
+{
+ if (_pending.empty()) {
+ return optional<string>();
+ }
+
+ string const l = _pending.back();
+ _pending.pop_back();
+ return l;
+}
+
+void
+Nanomsg::recv_and_parse (bool blocking)
+{
+ char* buf = 0;
+ int const received = nn_recv (_socket, &buf, NN_MSG, blocking ? 0 : NN_DONTWAIT);
+ if (received < 0)
+ {
+ if (!blocking && errno == EAGAIN) {
+ return;
+ }
+
+ throw runtime_error ("Could not communicate with subprocess");
+ }
+
+ char* p = buf;
+ for (int i = 0; i < received; ++i) {
+ if (*p == '\n') {
+ _pending.push_front (_current);
+ _current = "";
+ } else {
+ _current += *p;
+ }
+ ++p;
+ }
+ nn_freemsg (buf);
+}
+
+string
+Nanomsg::blocking_get ()
+{
+ optional<string> l = get_from_pending ();
+ if (l) {
+ return *l;
+ }
+
+ recv_and_parse (true);
+
+ l = get_from_pending ();
+ if (!l) {
+ throw runtime_error ("Could not communicate with subprocess");
+ }
+
+ return *l;
+}
+
+optional<string>
+Nanomsg::nonblocking_get ()
+{
+ optional<string> l = get_from_pending ();
+ if (l) {
+ return *l;
+ }
+
+ recv_and_parse (false);
+ return get_from_pending ();
+}
--- /dev/null
+/*
+ Copyright (C) 2020 Carl Hetherington <cth@carlh.net>
+
+ This file is part of DCP-o-matic.
+
+ DCP-o-matic is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ DCP-o-matic is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with DCP-o-matic. If not, see <http://www.gnu.org/licenses/>.
+
+*/
+
+#include <string>
+#include <list>
+#include <boost/optional.hpp>
+#include <boost/noncopyable.hpp>
+
+class Nanomsg : public boost::noncopyable
+{
+public:
+ explicit Nanomsg (bool server);
+
+ void send (std::string s);
+ std::string blocking_get ();
+ boost::optional<std::string> nonblocking_get ();
+
+private:
+ boost::optional<std::string> get_from_pending ();
+ void recv_and_parse (bool blocking);
+
+ int _socket;
+ std::list<std::string> _pending;
+ std::string _current;
+};
+
log_entry.cc
mid_side_decoder.cc
monitor_checker.cc
+ nanomsg.cc
overlaps.cc
player.cc
player_text.cc
AVCODEC AVUTIL AVFORMAT AVFILTER SWSCALE
BOOST_FILESYSTEM BOOST_THREAD BOOST_DATETIME BOOST_SIGNALS2 BOOST_REGEX
SAMPLERATE POSTPROC TIFF SSH DCP CXML GLIB LZMA XML++
- CURL ZIP FONTCONFIG PANGOMM CAIROMM XMLSEC SUB ICU NETTLE PNG LWEXT4 POLKIT
+ CURL ZIP FONTCONFIG PANGOMM CAIROMM XMLSEC SUB ICU NETTLE PNG
"""
if bld.env.TARGET_OSX:
if bld.env.ENABLE_DIST:
obj.source += ' copy_to_drive_job.cc'
+ obj.uselib += ' LWEXT4 NANOMSG'
+ if bld.env.TARGET_LINUX:
+ obj.uselib += ' POLKIT'
if bld.env.TARGET_WINDOWS:
obj.uselib += ' WINSOCK2 DBGHELP SHLWAPI MSWSOCK BOOST_LOCALE SETUPAPI'
#include "lib/cross.h"
#include "lib/copy_to_drive_job.h"
#include "lib/job_manager.h"
+#include <nanomsg/nn.h>
+#include <nanomsg/pair.h>
#include <wx/wx.h>
#include <boost/process.hpp>
#ifdef __WXOSX__
using std::exception;
using std::cout;
using std::cerr;
+using std::runtime_error;
using boost::shared_ptr;
class DOMFrame : public wxFrame
public:
explicit DOMFrame (wxString const & title)
: wxFrame (0, -1, title)
+ , _nanomsg (true)
, _sizer (new wxBoxSizer(wxVERTICAL))
{
/* Use a panel as the only child of the Frame so that we avoid
_sizer->Add (grid, 1, wxALL | wxEXPAND, DCPOMATIC_DIALOG_BORDER);
overall_panel->SetSizer (_sizer);
Fit ();
- SetSize (512, GetSize().GetHeight() + 32);
+ SetSize (768, GetSize().GetHeight() + 32);
/* XXX: this is a hack, but I expect we'll need logs and I'm not sure if there's
* a better place to put them.
Bind (wxEVT_SIZE, boost::bind (&DOMFrame::sized, this, _1));
- _writer = new boost::process::child ("dcpomatic2_dist_writer", boost::process::std_in < _to_writer, boost::process::std_out > _from_writer);
+ _writer = new boost::process::child ("dcpomatic2_dist_writer");
}
private:
return;
}
- JobManager::instance()->add(shared_ptr<Job>(new CopyToDriveJob(*_dcp_path, _drives[_drive->GetSelection()], _to_writer, _from_writer)));
+ JobManager::instance()->add(shared_ptr<Job>(new CopyToDriveJob(*_dcp_path, _drives[_drive->GetSelection()], _nanomsg)));
}
void drive_refresh ()
boost::optional<boost::filesystem::path> _dcp_path;
std::vector<Drive> _drives;
boost::process::child* _writer;
- boost::process::opstream _to_writer;
- boost::process::ipstream _from_writer;
+ Nanomsg _nanomsg;
wxSizer* _sizer;
};
#include "lib/digester.h"
#include "lib/file_log.h"
#include "lib/dcpomatic_log.h"
+#include "lib/nanomsg.h"
extern "C" {
#include <lwext4/ext4_mbr.h>
#include <lwext4/ext4_fs.h>
#include <boost/filesystem.hpp>
#include <iostream>
-using std::cout;
using std::cin;
using std::min;
using std::string;
+using std::runtime_error;
+using boost::optional;
#ifdef DCPOMATIC_LINUX
static PolkitAuthority* polkit_authority = 0;
static boost::filesystem::path dcp_path;
static std::string device;
static uint64_t const block_size = 4096;
+static Nanomsg* nanomsg = 0;
static
void
}
remaining -= this_time;
total_remaining -= this_time;
- cout << DIST_WRITER_PROGRESS "\n" << (1 - float(total_remaining) / total) << "\n";
- cout.flush ();
+ nanomsg->send(String::compose(DIST_WRITER_PROGRESS "\n%1\n", (1 - float(total_remaining) / total)));
}
fclose (in);
digester.add (buffer, this_time);
remaining -= this_time;
total_remaining -= this_time;
- cout << DIST_WRITER_PROGRESS "\n" << (1 - float(total_remaining) / total) << "\n";
- cout.flush ();
+ nanomsg->send(String::compose(DIST_WRITER_PROGRESS "\n%1\n", (1 - float(total_remaining) / total)));
}
ext4_fclose (&in);
void
copy (boost::filesystem::path from, boost::filesystem::path to, uint64_t& total_remaining, uint64_t total)
{
- using namespace boost::filesystem;
+ LOG_DIST ("Copy %1 -> %2", from.string(), to.string());
- /* XXX: this is a hack. We are going to "treat" every byte twice; write it, and then verify it. Double the
- * bytes totals so that progress works itself out (assuming write is the same speed as read).
- */
- total_remaining *= 2;
- total *= 2;
+ using namespace boost::filesystem;
path const cr = to / from.filename();
}
} else {
string const write_digest = write (from, cr, total_remaining, total);
+ LOG_DIST ("Wrote %1 %2 with %3", from.string(), cr.string(), write_digest);
string const read_digest = read (from, cr, total_remaining, total);
+ LOG_DIST ("Read %1 %2 with %3", from.string(), cr.string(), write_digest);
if (write_digest != read_digest) {
throw VerifyError ("Hash of written data is incorrect", 0);
}
{
// ext4_dmask_set (DEBUG_ALL);
- cout << "U\n";
+ nanomsg->send("U\n");
/* We rely on static initialization for these */
static struct ext4_fs fs;
if (!bd) {
throw CopyError ("Failed to open drive", 0);
}
+ LOG_DIST_NC ("Opened drive");
struct ext4_mbr_parts parts;
parts.division[0] = 100;
if (r) {
throw CopyError ("Failed to write MBR", r);
}
+ LOG_DIST_NC ("Wrote MBR");
#ifdef DCPOMATIC_WINDOWS
struct ext4_mbr_bdevs bdevs;
if (!bd) {
throw CopyError ("Failed to open partition", 0);
}
+ LOG_DIST_NC ("Opened partition");
r = ext4_mkfs(&fs, bd, &info, F_SET_EXT4);
if (r != EOK) {
throw CopyError ("Failed to make filesystem", r);
}
+ LOG_DIST_NC ("Made filesystem");
r = ext4_device_register(bd, "ext4_fs");
if (r != EOK) {
throw CopyError ("Failed to register device", r);
}
+ LOG_DIST_NC ("Registered device");
r = ext4_mount("ext4_fs", "/mp/", false);
if (r != EOK) {
throw CopyError ("Failed to mount device", r);
}
+ LOG_DIST_NC ("Mounted device");
uint64_t total_bytes = 0;
count (dcp_path, total_bytes);
+ /* XXX: this is a hack. We are going to "treat" every byte twice; write it, and then verify it. Double the
+ * bytes totals so that progress works itself out (assuming write is the same speed as read).
+ */
+ total_bytes *= 2;
copy (dcp_path, "/mp", total_bytes, total_bytes);
r = ext4_umount("/mp/");
throw CopyError ("Failed to unmount device", r);
}
- cout << DIST_WRITER_OK "\n";
- cout.flush ();
+ nanomsg->send(DIST_WRITER_OK "\n");
} catch (CopyError& e) {
- cout << DIST_WRITER_ERROR "\n" << e.message() << "\n" << e.number() << "\n";
- cout.flush ();
+ nanomsg->send(String::compose(DIST_WRITER_ERROR "\n%1\n%2\n", e.message(), e.number()));
} catch (VerifyError& e) {
- cout << DIST_WRITER_ERROR "\n" << e.message() << "\n" << e.number() << "\n";
- cout.flush ();
+ nanomsg->send(String::compose(DIST_WRITER_ERROR "\n%1\n%2\n", e.message(), e.number()));
}
#ifdef DCPOMATIC_LINUX
bool
idle ()
{
-#ifdef DCPOMATIC_POSIX
- struct pollfd input[1] = { { fd: 0, events: POLLIN, revents: 0 } };
- int const r = poll (input, 1, 0);
- if (r > 0 && (input[0].revents & POLLIN)) {
-#else
- DWORD num;
- LOG_DIST ("now handle in is %1", reinterpret_cast<uint64_t>(GetStdHandle(STD_INPUT_HANDLE)));
- if (!GetNumberOfConsoleInputEvents(GetStdHandle(STD_INPUT_HANDLE), &num)) {
- LOG_DIST ("Could not check console: %1", GetLastError());
+ optional<string> s = nanomsg->nonblocking_get ();
+ if (!s) {
+ return true;
}
- LOG_DIST ("%1 console events", num);
- if (num) {
-#endif
- string s;
- getline (cin, s);
- if (s.empty()) {
- return true;
- }
- dcp_path = s;
- getline (cin, s);
- device = "/dev/" + s;
+ dcp_path = *s;
+ device = "/dev/" + nanomsg->blocking_get ();
- LOG_DIST ("Here we go writing %1 to %2", dcp_path, device);
+ LOG_DIST ("Here we go writing %1 to %2", dcp_path, device);
#ifdef DCPOMATIC_LINUX
- polkit_authority = polkit_authority_get_sync (0, 0);
- PolkitSubject* subject = polkit_unix_process_new (getppid());
- polkit_authority_check_authorization (
+ polkit_authority = polkit_authority_get_sync (0, 0);
+ PolkitSubject* subject = polkit_unix_process_new (getppid());
+ polkit_authority_check_authorization (
polkit_authority, subject, "com.dcpomatic.write-drive", 0, POLKIT_CHECK_AUTHORIZATION_FLAGS_ALLOW_USER_INTERACTION, 0, polkit_callback, 0
);
#else
- write ();
+ write ();
#endif
- }
return true;
}
dcpomatic_log->set_types (dcpomatic_log->types() | LogEntry::TYPE_DIST);
LOG_DIST_NC("dcpomatic_dist_writer started");
-#ifdef DCPOMATIC_WINDOWS
- FreeConsole ();
- AllocConsole ();
-
- HANDLE handle_out = GetStdHandle(STD_OUTPUT_HANDLE);
- int hCrt = _open_osfhandle((intptr_t) handle_out, _O_TEXT);
- FILE* hf_out = _fdopen(hCrt, "w");
- setvbuf(hf_out, NULL, _IONBF, 1);
- *stdout = *hf_out;
-
- HANDLE handle_in = GetStdHandle(STD_INPUT_HANDLE);
- LOG_DIST ("handle_in is %1", reinterpret_cast<uint64_t>(handle_in));
- hCrt = _open_osfhandle((intptr_t) handle_in, _O_TEXT);
- FILE* hf_in = _fdopen(hCrt, "r");
- setvbuf(hf_in, NULL, _IONBF, 128);
- *stdin = *hf_in;
-#endif
+ try {
+ nanomsg = new Nanomsg (false);
+ } catch (runtime_error& e) {
+ LOG_DIST_NC("Could not set up nanomsg socket");
+ exit (EXIT_FAILURE);
+ }
Glib::RefPtr<Glib::MainLoop> ml = Glib::MainLoop::create ();
Glib::signal_timeout().connect(sigc::ptr_fun(&idle), 500);
uselib += 'AVUTIL SWSCALE SWRESAMPLE POSTPROC CURL BOOST_FILESYSTEM SSH ZIP CAIROMM FONTCONFIG PANGOMM SUB '
uselib += 'SNDFILE SAMPLERATE BOOST_REGEX ICU NETTLE RTAUDIO PNG '
- if bld.env.TARGET_LINUX:
- uselib += 'POLKIT '
-
if bld.env.ENABLE_DIST:
- uselib += 'LWEXT4 '
+ if bld.env.TARGET_LINUX:
+ uselib += 'POLKIT '
+ uselib += 'LWEXT4 NANOMSG '
if bld.env.TARGET_WINDOWS:
uselib += 'WINSOCK2 DBGHELP SHLWAPI MSWSOCK BOOST_LOCALE WINSOCK2 OLE32 DSOUND WINMM KSUSER '
if conf.env.TARGET_LINUX and conf.options.enable_dist:
conf.check_cfg(package='polkit-gobject-1', args='--cflags --libs', uselib_store='POLKIT', mandatory=True)
+ # nanomsg
+ if conf.options.enable_dist:
+ conf.check_cfg(package='libnanomsg', args='--cflags --libs', uselib_store='NANOMSG', mandatory=True)
+
# FFmpeg
if conf.options.static_ffmpeg:
names = ['avformat', 'avfilter', 'avcodec', 'avutil', 'swscale', 'postproc', 'swresample']