Improve progress reporting of digest calculations (might help with #2643).
authorCarl Hetherington <cth@carlh.net>
Sun, 12 Nov 2023 21:09:48 +0000 (22:09 +0100)
committerCarl Hetherington <cth@carlh.net>
Mon, 20 Nov 2023 06:29:15 +0000 (07:29 +0100)
cscript
src/lib/job.cc
src/lib/job.h
src/lib/reel_writer.cc
src/lib/reel_writer.h
src/lib/writer.cc
src/lib/writer.h
test/writer_test.cc

diff --git a/cscript b/cscript
index 795507e18fe9a1f7056dc80f8ae70fd1443f8f0a..9d5a72eafd99fc3a5af16bf769988a69856fe439 100644 (file)
--- a/cscript
+++ b/cscript
@@ -508,7 +508,7 @@ def dependencies(target, options):
         # Use distro-provided FFmpeg on Arch
         deps = []
 
-    deps.append(('libdcp', 'e3fa86ef35f212b14b593dd36dbff66e845d37e4'))
+    deps.append(('libdcp', 'c46f6125c482f2a3361cd33d1e1163927f038e9d'))
     deps.append(('libsub', 'v1.6.44'))
     deps.append(('leqm-nrt', '30dcaea1373ac62fba050e02ce5b0c1085797a23'))
     deps.append(('rtaudio', 'f619b76'))
index 28bdde7fc2dd9d34d52643356162d388e7ae35a5..2a4c891edf832fce2c6a091061452fa332604c36 100644 (file)
@@ -59,6 +59,7 @@ Job::Job (shared_ptr<const Film> film)
        , _state (NEW)
        , _sub_start_time (0)
        , _progress (0)
+       , _rate_limit_progress(true)
 {
 
 }
@@ -432,7 +433,7 @@ Job::set_progress (float p, bool force)
 {
        check_for_interruption_or_pause ();
 
-       if (!force) {
+       if (!force && _rate_limit_progress) {
                /* Check for excessively frequent progress reporting */
                boost::mutex::scoped_lock lm (_progress_mutex);
                struct timeval now;
@@ -735,3 +736,11 @@ Job::set_message (string m)
        boost::mutex::scoped_lock lm (_state_mutex);
        _message = m;
 }
+
+
+void
+Job::set_rate_limit_progress(bool rate_limit)
+{
+       _rate_limit_progress = rate_limit;
+}
+
index dc5f7bc34074041b85536978aa8554dc336a9db4..7814e0c2d75452f84912854df972a29162a9e548 100644 (file)
@@ -99,6 +99,8 @@ public:
 
        void when_finished(boost::signals2::connection& connection, std::function<void(Result)> finished);
 
+       void set_rate_limit_progress(bool rate_limit);
+
        boost::signals2::signal<void()> Progress;
        /** Emitted from the UI thread when the job is finished */
        boost::signals2::signal<void (Result)> Finished;
@@ -159,6 +161,11 @@ private:
        boost::optional<float> _progress;
        boost::optional<struct timeval> _last_progress_update;
 
+       /** true to limit emissions of the progress signal so that they don't
+        *  come too often.
+        */
+       boost::atomic<bool> _rate_limit_progress;
+
        /** condition to signal changes to pause/resume so that we know when to wake;
            this could be a general _state_change if it made more sense.
        */
index ca4a2dbb192f3838fda5679372c0572bdabf0e7e..1b33cae852eedcbb1ed5330d9e20bf52bc77bd03 100644 (file)
@@ -773,21 +773,39 @@ ReelWriter::create_reel (
        return reel;
 }
 
+
+/** @param set_progress Method to call with progress; first parameter is the number of bytes
+ *  done, second parameter is the number of bytes in total.
+ */
 void
-ReelWriter::calculate_digests (std::function<void (float)> set_progress)
+ReelWriter::calculate_digests(std::function<void (int64_t, int64_t)> set_progress)
 try
 {
+       vector<shared_ptr<const dcp::Asset>> assets;
+
        if (_picture_asset) {
-               _picture_asset->hash (set_progress);
+               assets.push_back(_picture_asset);
        }
-
        if (_sound_asset) {
-               _sound_asset->hash (set_progress);
+               assets.push_back(_sound_asset);
        }
-
        if (_atmos_asset) {
-               _atmos_asset->hash (set_progress);
+               assets.push_back(_atmos_asset);
        }
+
+       int64_t total_size = 0;
+       for (auto asset: assets) {
+               total_size += asset->file() ? boost::filesystem::file_size(*asset->file()) : 0;
+       }
+
+       int64_t total_done = 0;
+       for (auto asset: assets) {
+               asset->hash([&total_done, total_size, set_progress](int64_t done, int64_t) {
+                       set_progress(total_done + done, total_size);
+               });
+               total_done += asset->file() ? boost::filesystem::file_size(*asset->file()) : 0;
+       }
+
 } catch (boost::thread_interrupted) {
        /* set_progress contains an interruption_point, so any of these methods
         * may throw thread_interrupted, at which point we just give up.
index fff298cb7be6d0512b36902ba53aed138067bc99..c9052c832fd8570223bc8e4b6b6166bb14a2b25a 100644 (file)
@@ -82,7 +82,7 @@ public:
                bool ensure_subtitles,
                std::set<DCPTextTrack> ensure_closed_captions
                );
-       void calculate_digests (std::function<void (float)> set_progress);
+       void calculate_digests(std::function<void (int64_t, int64_t)> set_progress);
 
        Frame start () const;
 
index 9ab3d4e1edaf3be63d2f335b16677b96f09685b4..8863816e868b9b92a9e2b19be5fc65e6dfa7412c 100644 (file)
@@ -531,19 +531,32 @@ Writer::calculate_digests ()
                pool.create_thread (boost::bind (&boost::asio::io_service::run, &service));
        }
 
-       std::function<void (float)> set_progress;
+       std::function<void (int, int64_t, int64_t)> set_progress;
        if (job) {
-               set_progress = boost::bind (&Writer::set_digest_progress, this, job.get(), _1);
+               set_progress = boost::bind(&Writer::set_digest_progress, this, job.get(), _1, _2, _3);
        } else {
-               set_progress = [](float) {
+               set_progress = [](int, int64_t, int64_t) {
                        boost::this_thread::interruption_point();
                };
        }
 
+       int index = 0;
+
        for (auto& i: _reels) {
-               service.post (boost::bind (&ReelWriter::calculate_digests, &i, set_progress));
+               service.post(
+                       boost::bind(
+                               &ReelWriter::calculate_digests,
+                               &i,
+                               std::function<void (int64_t, int64_t)>(boost::bind(set_progress, index, _1, _2))
+                               ));
+               ++index;
        }
-       service.post (boost::bind (&Writer::calculate_referenced_digests, this, set_progress));
+       service.post(
+               boost::bind(
+                       &Writer::calculate_referenced_digests,
+                       this,
+                       std::function<void (int64_t, int64_t)>(boost::bind(set_progress, index, _1, _2))
+                       ));
 
        work.reset ();
 
@@ -934,18 +947,29 @@ Writer::video_reel (int frame) const
 }
 
 
+/** Update job progress with information about the progress of a single digest calculation
+ *  thread.
+ *  @param id Unique identifier for the thread whose progress has changed.
+ *  @param done Number of bytes that this thread has processed.
+ *  @param size Total number of bytes that this thread must process.
+ */
 void
-Writer::set_digest_progress (Job* job, float progress)
+Writer::set_digest_progress(Job* job, int id, int64_t done, int64_t size)
 {
        boost::mutex::scoped_lock lm (_digest_progresses_mutex);
 
-       _digest_progresses[boost::this_thread::get_id()] = progress;
-       float min_progress = FLT_MAX;
+       /* Update the progress for this thread */
+       _digest_progresses[id] = std::make_pair(done, size);
+
+       /* Get the total progress across all threads and use it to set job progress */
+       int64_t total_done = 0;
+       int64_t total_size = 0;
        for (auto const& i: _digest_progresses) {
-               min_progress = min (min_progress, i.second);
+               total_done += i.second.first;
+               total_size += i.second.second;
        }
 
-       job->set_progress (min_progress);
+       job->set_progress(float(total_done) / total_size);
 
        Waker waker;
        waker.nudge ();
@@ -956,13 +980,27 @@ Writer::set_digest_progress (Job* job, float progress)
 
 /** Calculate hashes for any referenced MXF assets which do not already have one */
 void
-Writer::calculate_referenced_digests (std::function<void (float)> set_progress)
+Writer::calculate_referenced_digests(std::function<void (int64_t, int64_t)> set_progress)
 try
 {
+       int64_t total_size = 0;
+       for (auto const& i: _reel_assets) {
+               auto file = dynamic_pointer_cast<dcp::ReelFileAsset>(i.asset);
+               if (file && !file->hash()) {
+                       auto filename = file->asset_ref().asset()->file();
+                       DCPOMATIC_ASSERT(filename);
+                       total_size += boost::filesystem::file_size(*filename);
+               }
+       }
+
+       int64_t total_done = 0;
        for (auto const& i: _reel_assets) {
                auto file = dynamic_pointer_cast<dcp::ReelFileAsset>(i.asset);
                if (file && !file->hash()) {
-                       file->asset_ref().asset()->hash (set_progress);
+                       file->asset_ref().asset()->hash([&total_done, total_size, set_progress](int64_t done, int64_t) {
+                               set_progress(total_done + done, total_size);
+                       });
+                       total_done += boost::filesystem::file_size(*file->asset_ref().asset()->file());
                        file->set_hash (file->asset_ref().asset()->hash());
                }
        }
index 1fbf7bbd56b0d72e3cea39de85487ae5e2605870..efb6a17d8d54448b740e2b3fc238010c6abff01f 100644 (file)
@@ -134,9 +134,9 @@ private:
        void terminate_thread (bool);
        bool have_sequenced_image_at_queue_head ();
        size_t video_reel (int frame) const;
-       void set_digest_progress (Job* job, float progress);
+       void set_digest_progress(Job* job, int id, int64_t done, int64_t size);
        void write_cover_sheet (boost::filesystem::path output_dcp);
-       void calculate_referenced_digests (std::function<void (float)> set_progress);
+       void calculate_referenced_digests(std::function<void (int64_t, int64_t)> set_progress);
        void write_hanging_text (ReelWriter& reel);
        void calculate_digests ();
 
@@ -204,7 +204,7 @@ private:
        bool _text_only;
 
        boost::mutex _digest_progresses_mutex;
-       std::map<boost::thread::id, float> _digest_progresses;
+       std::map<int, std::pair<int64_t, int64_t>> _digest_progresses;
 
        std::list<ReferencedReelAsset> _reel_assets;
 
index 76e9ddb28799aa46575e78d93bb4388ed50217f5..86b60818f18f31fbf3361b8e215877bc104dd869 100644 (file)
@@ -23,6 +23,7 @@
 #include "lib/content.h"
 #include "lib/content_factory.h"
 #include "lib/cross.h"
+#include "lib/dcp_encoder.h"
 #include "lib/film.h"
 #include "lib/job.h"
 #include "lib/video_content.h"
@@ -36,6 +37,7 @@
 
 using std::make_shared;
 using std::shared_ptr;
+using std::string;
 using std::vector;
 
 
@@ -101,3 +103,53 @@ BOOST_AUTO_TEST_CASE (interrupt_writer)
        dcpomatic_sleep_seconds (1);
        cl.run ();
 }
+
+
+BOOST_AUTO_TEST_CASE(writer_progress_test)
+{
+       class TestJob : public Job
+       {
+       public:
+               explicit TestJob(shared_ptr<const Film> film)
+                       : Job(film)
+               {}
+
+               ~TestJob()
+               {
+                       stop_thread();
+               }
+
+               std::string name() const override {
+                       return "test";
+               }
+               std::string json_name() const override {
+                       return "test";
+               }
+               void run() override {};
+       };
+
+       auto picture1 = content_factory("test/data/flat_red.png")[0];
+       auto picture2 = content_factory("test/data/flat_red.png")[0];
+
+       auto film = new_test_film2("writer_progress_test", { picture1, picture2 });
+       film->set_reel_type(ReelType::BY_VIDEO_CONTENT);
+       picture1->video->set_length(240);
+       picture2->video->set_length(240);
+       picture2->set_position(film, dcpomatic::DCPTime::from_seconds(10));
+
+       auto job = std::make_shared<TestJob>(film);
+       job->set_rate_limit_progress(false);
+
+       float last_progress = 0;
+       string last_sub_name;
+       boost::signals2::scoped_connection connection = job->Progress.connect([job, &last_progress, &last_sub_name]() {
+               auto const progress = job->progress().get_value_or(0);
+               BOOST_REQUIRE(job->sub_name() != last_sub_name || progress >= last_progress);
+               last_progress = progress;
+               last_sub_name = job->sub_name();
+       });
+
+       DCPEncoder encoder(film, job);
+       encoder.go();
+}
+