Work around deadlock when destroying J2KEncoder with a full writer queue (#2784).
[dcpomatic.git] / src / lib / copy_to_drive_job.cc
index fe35daba6c4b0b1b6061617945fd44ed1719e568..7d208e0ecb6f3ac8225085de659e0d0893f9608f 100644 (file)
 
 */
 
-#include "disk_writer_messages.h"
-#include "copy_to_drive_job.h"
+
 #include "compose.hpp"
+#include "copy_to_drive_job.h"
+#include "dcpomatic_log.h"
+#include "disk_writer_messages.h"
 #include "exceptions.h"
 #include <dcp/raw_convert.h>
 #include <nanomsg/nn.h>
-#include <unistd.h>
 #include <fcntl.h>
+#include <unistd.h>
+#include <iostream>
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
-#include <iostream>
 
 #include "i18n.h"
 
-using std::string;
+
 using std::cout;
 using std::min;
-using boost::shared_ptr;
+using std::shared_ptr;
+using std::string;
+using boost::optional;
 using dcp::raw_convert;
 
-CopyToDriveJob::CopyToDriveJob (boost::filesystem::path dcp, Drive drive, Nanomsg& nanomsg)
+
+CopyToDriveJob::CopyToDriveJob(std::vector<boost::filesystem::path> const& dcps, Drive drive, Nanomsg& nanomsg)
        : Job (shared_ptr<Film>())
-       , _dcp (dcp)
+       , _dcps (dcps)
        , _drive (drive)
        , _nanomsg (nanomsg)
 {
 
 }
 
+
 string
 CopyToDriveJob::name () const
 {
-       return String::compose (_("Copying %1 to %2"), _dcp.filename().string(), _drive.description());
+       if (_dcps.size() == 1) {
+               return String::compose(_("Copying %1\nto %2"), _dcps[0].filename().string(), _drive.description());
+       }
+
+       return String::compose(_("Copying DCPs to %1"), _drive.description());
 }
 
+
 string
 CopyToDriveJob::json_name () const
 {
@@ -63,30 +74,63 @@ CopyToDriveJob::json_name () const
 void
 CopyToDriveJob::run ()
 {
-       if (!_nanomsg.nonblocking_send(String::compose(DISK_WRITER_WRITE "\n%1\n%2\n", _dcp.string(), _drive.internal_name()))) {
-               throw CopyError ("Could not communicate with writer process", 0);
+       LOG_DISK("Sending write requests to disk %1 for:", _drive.device());
+       for (auto dcp: _dcps) {
+               LOG_DISK("%1", dcp.string());
+       }
+
+       string request = String::compose(DISK_WRITER_WRITE "\n%1\n", _drive.device());
+       for (auto dcp: _dcps) {
+               request += String::compose("%1\n", dcp.string());
        }
+       request += "\n";
+       if (!_nanomsg.send(request, 2000)) {
+               LOG_DISK_NC("Failed to send write request.");
+               throw CommunicationFailedError ();
+       }
+
+       enum State {
+               SETUP,
+               FORMAT,
+               COPY,
+               VERIFY
+       } state = SETUP;
 
-       bool formatting = false;
        while (true) {
-               string s = _nanomsg.blocking_get ();
-               if (s == DISK_WRITER_OK) {
+               auto response = DiskWriterBackEndResponse::read_from_nanomsg(_nanomsg, 10000);
+               if (!response) {
+                       continue;
+               }
+
+               switch (response->type()) {
+               case DiskWriterBackEndResponse::Type::OK:
                        set_state (FINISHED_OK);
                        return;
-               } else if (s == DISK_WRITER_ERROR) {
-                       string const m = _nanomsg.blocking_get ();
-                       string const n = _nanomsg.blocking_get ();
-                       throw CopyError (m, raw_convert<int>(n));
-               } else if (s == DISK_WRITER_FORMATTING) {
-                       sub ("Formatting drive");
-                       set_progress_unknown ();
-                       formatting = true;
-               } else if (s == DISK_WRITER_PROGRESS) {
-                       if (formatting) {
-                               sub ("Copying DCP");
-                               formatting = false;
+               case DiskWriterBackEndResponse::Type::PONG:
+                       break;
+               case DiskWriterBackEndResponse::Type::ERROR:
+                       throw CopyError(response->error_message(), response->ext4_error_number(), response->platform_error_number());
+               case DiskWriterBackEndResponse::Type::FORMAT_PROGRESS:
+                       if (state == SETUP) {
+                               sub (_("Formatting drive"));
+                               state = FORMAT;
+                       }
+                       set_progress(response->progress());
+                       break;
+               case DiskWriterBackEndResponse::Type::COPY_PROGRESS:
+                       if (state == FORMAT) {
+                               sub (_("Copying DCP"));
+                               state = COPY;
+                       }
+                       set_progress(response->progress());
+                       break;
+               case DiskWriterBackEndResponse::Type::VERIFY_PROGRESS:
+                       if (state == COPY) {
+                               sub (_("Verifying copied files"));
+                               state = VERIFY;
                        }
-                       set_progress (raw_convert<float>(_nanomsg.blocking_get()));
+                       set_progress(response->progress());
+                       break;
                }
        }
 }