pool.create_thread (boost::bind (&boost::asio::io_service::run, &service));
}
- std::function<void (float)> set_progress;
+ std::function<void (int, int64_t, int64_t)> set_progress;
if (job) {
- set_progress = boost::bind (&Writer::set_digest_progress, this, job.get(), _1);
+ set_progress = boost::bind(&Writer::set_digest_progress, this, job.get(), _1, _2, _3);
} else {
- set_progress = [](float) {
+ set_progress = [](int, int64_t, int64_t) {
boost::this_thread::interruption_point();
};
}
+ int index = 0;
+
for (auto& i: _reels) {
- service.post (boost::bind (&ReelWriter::calculate_digests, &i, set_progress));
+ service.post(
+ boost::bind(
+ &ReelWriter::calculate_digests,
+ &i,
+ std::function<void (int64_t, int64_t)>(boost::bind(set_progress, index, _1, _2))
+ ));
+ ++index;
}
- service.post (boost::bind (&Writer::calculate_referenced_digests, this, set_progress));
+ service.post(
+ boost::bind(
+ &Writer::calculate_referenced_digests,
+ this,
+ std::function<void (int64_t, int64_t)>(boost::bind(set_progress, index, _1, _2))
+ ));
work.reset ();
}
+/** Update job progress with information about the progress of a single digest calculation
+ * thread.
+ * @param id Unique identifier for the thread whose progress has changed.
+ * @param done Number of bytes that this thread has processed.
+ * @param size Total number of bytes that this thread must process.
+ */
void
-Writer::set_digest_progress (Job* job, float progress)
+Writer::set_digest_progress(Job* job, int id, int64_t done, int64_t size)
{
boost::mutex::scoped_lock lm (_digest_progresses_mutex);
- _digest_progresses[boost::this_thread::get_id()] = progress;
- float min_progress = FLT_MAX;
+ /* Update the progress for this thread */
+ _digest_progresses[id] = std::make_pair(done, size);
+
+ /* Get the total progress across all threads and use it to set job progress */
+ int64_t total_done = 0;
+ int64_t total_size = 0;
for (auto const& i: _digest_progresses) {
- min_progress = min (min_progress, i.second);
+ total_done += i.second.first;
+ total_size += i.second.second;
}
- job->set_progress (min_progress);
+ job->set_progress(float(total_done) / total_size);
Waker waker;
waker.nudge ();
/** Calculate hashes for any referenced MXF assets which do not already have one */
void
-Writer::calculate_referenced_digests (std::function<void (float)> set_progress)
+Writer::calculate_referenced_digests(std::function<void (int64_t, int64_t)> set_progress)
try
{
+ int64_t total_size = 0;
+ for (auto const& i: _reel_assets) {
+ auto file = dynamic_pointer_cast<dcp::ReelFileAsset>(i.asset);
+ if (file && !file->hash()) {
+ auto filename = file->asset_ref().asset()->file();
+ DCPOMATIC_ASSERT(filename);
+ total_size += boost::filesystem::file_size(*filename);
+ }
+ }
+
+ int64_t total_done = 0;
for (auto const& i: _reel_assets) {
auto file = dynamic_pointer_cast<dcp::ReelFileAsset>(i.asset);
if (file && !file->hash()) {
- file->asset_ref().asset()->hash (set_progress);
+ file->asset_ref().asset()->hash([&total_done, total_size, set_progress](int64_t done, int64_t) {
+ set_progress(total_done + done, total_size);
+ });
+ total_done += boost::filesystem::file_size(*file->asset_ref().asset()->file());
file->set_hash (file->asset_ref().asset()->hash());
}
}