From 7cd7b360169493d864206e2cdfb4f688cf5a12cf Mon Sep 17 00:00:00 2001 From: Carl Hetherington Date: Wed, 10 Jun 2015 16:12:40 +0100 Subject: [PATCH] Resurrect neater repeat-write handling. --- src/lib/encoder.cc | 82 ++++++++++++++++++--------------------- src/lib/encoder.h | 6 ++- src/lib/util.cc | 44 --------------------- src/lib/util.h | 2 - src/lib/writer.cc | 96 ++++++++++++++++++++++++++++++++++++++++++---- src/lib/writer.h | 11 ++++++ 6 files changed, 142 insertions(+), 99 deletions(-) diff --git a/src/lib/encoder.cc b/src/lib/encoder.cc index 83baacd41..6eb295412 100644 --- a/src/lib/encoder.cc +++ b/src/lib/encoder.cc @@ -62,7 +62,7 @@ int const Encoder::_history_size = 25; Encoder::Encoder (shared_ptr f, weak_ptr j, shared_ptr writer) : _film (f) , _job (j) - , _video_frames_out (0) + , _video_frames_enqueued (0) , _terminate (false) , _writer (writer) { @@ -168,7 +168,7 @@ int Encoder::video_frames_out () const { boost::mutex::scoped_lock (_state_mutex); - return _video_frames_out; + return _video_frames_enqueued; } /** Should be called when a frame has been encoded successfully. @@ -217,20 +217,22 @@ Encoder::enqueue (shared_ptr pv) */ rethrow (); - if (_writer->can_fake_write (_video_frames_out)) { + if (_writer->can_fake_write (_video_frames_enqueued)) { /* We can fake-write this frame */ - _writer->fake_write (_video_frames_out, pv->eyes ()); + _writer->fake_write (_video_frames_enqueued, pv->eyes ()); frame_done (); + } else if (_last_player_video && pv->same (_last_player_video)) { + _writer->repeat (_video_frames_enqueued, pv->eyes ()); } else if (pv->has_j2k ()) { /* This frame already has JPEG2000 data, so just write it */ - _writer->write (pv->j2k(), _video_frames_out, pv->eyes ()); + _writer->write (pv->j2k(), _video_frames_enqueued, pv->eyes ()); } else { /* Queue this new frame for encoding */ LOG_TIMING ("adding to queue of %1", _queue.size ()); _queue.push_back (shared_ptr ( new DCPVideo ( pv, - _video_frames_out, + _video_frames_enqueued, _film->video_frame_rate(), _film->j2k_bandwidth(), _film->resolution(), @@ -246,8 +248,10 @@ Encoder::enqueue (shared_ptr pv) } if (pv->eyes() != EYES_LEFT) { - ++_video_frames_out; + ++_video_frames_enqueued; } + + _last_player_video = pv; } void @@ -279,8 +283,6 @@ try encodings. */ int remote_backoff = 0; - shared_ptr last_dcp_video; - optional last_encoded; while (true) { @@ -303,47 +305,39 @@ try optional encoded; - if (last_dcp_video && vf->same (last_dcp_video)) { - /* We already have encoded data for the same input as this one, so take a short-cut */ - encoded = last_encoded; - } else { - /* We need to encode this input */ - if (server) { - try { - encoded = vf->encode_remotely (server.get ()); - - if (remote_backoff > 0) { - LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ()); - } - - /* This job succeeded, so remove any backoff */ - remote_backoff = 0; - - } catch (std::exception& e) { - if (remote_backoff < 60) { - /* back off more */ - remote_backoff += 10; - } - LOG_ERROR ( - N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"), - vf->index(), server->host_name(), e.what(), remote_backoff - ); + /* We need to encode this input */ + if (server) { + try { + encoded = vf->encode_remotely (server.get ()); + + if (remote_backoff > 0) { + LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ()); } - } else { - try { - LOG_TIMING ("[%1] encoder thread begins local encode of %2", boost::this_thread::get_id(), vf->index()); - encoded = vf->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2)); - LOG_TIMING ("[%1] encoder thread finishes local encode of %2", boost::this_thread::get_id(), vf->index()); - } catch (std::exception& e) { - LOG_ERROR (N_("Local encode failed (%1)"), e.what ()); + /* This job succeeded, so remove any backoff */ + remote_backoff = 0; + + } catch (std::exception& e) { + if (remote_backoff < 60) { + /* back off more */ + remote_backoff += 10; } + LOG_ERROR ( + N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"), + vf->index(), server->host_name(), e.what(), remote_backoff + ); + } + + } else { + try { + LOG_TIMING ("[%1] encoder thread begins local encode of %2", boost::this_thread::get_id(), vf->index()); + encoded = vf->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2)); + LOG_TIMING ("[%1] encoder thread finishes local encode of %2", boost::this_thread::get_id(), vf->index()); + } catch (std::exception& e) { + LOG_ERROR (N_("Local encode failed (%1)"), e.what ()); } } - last_dcp_video = vf; - last_encoded = encoded; - if (encoded) { _writer->write (encoded.get(), vf->index (), vf->eyes ()); frame_done (); diff --git a/src/lib/encoder.h b/src/lib/encoder.h index 3b8c233bb..8b7c1f3ad 100644 --- a/src/lib/encoder.h +++ b/src/lib/encoder.h @@ -98,8 +98,8 @@ private: /** Number of frames that we should keep history for */ static int const _history_size; - /** Number of video frames written for the DCP so far */ - int _video_frames_out; + /** Number of video frames enqueued so far */ + int _video_frames_enqueued; bool _terminate; std::list > _queue; @@ -113,6 +113,8 @@ private: boost::shared_ptr _writer; Waker _waker; + boost::shared_ptr _last_player_video; + boost::signals2::scoped_connection _server_found_connection; }; diff --git a/src/lib/util.cc b/src/lib/util.cc index 7d2ae2216..92b8847bd 100644 --- a/src/lib/util.cc +++ b/src/lib/util.cc @@ -631,50 +631,6 @@ split_get_request (string url) return r; } -long -frame_info_position (int frame, Eyes eyes) -{ - static int const info_size = 48; - - switch (eyes) { - case EYES_BOTH: - return frame * info_size; - case EYES_LEFT: - return frame * info_size * 2; - case EYES_RIGHT: - return frame * info_size * 2 + info_size; - default: - DCPOMATIC_ASSERT (false); - } - - DCPOMATIC_ASSERT (false); -} - -dcp::FrameInfo -read_frame_info (FILE* file, int frame, Eyes eyes) -{ - dcp::FrameInfo info; - dcpomatic_fseek (file, frame_info_position (frame, eyes), SEEK_SET); - fread (&info.offset, sizeof (info.offset), 1, file); - fread (&info.size, sizeof (info.size), 1, file); - - char hash_buffer[33]; - fread (hash_buffer, 1, 32, file); - hash_buffer[32] = '\0'; - info.hash = hash_buffer; - - return info; -} - -void -write_frame_info (FILE* file, int frame, Eyes eyes, dcp::FrameInfo info) -{ - dcpomatic_fseek (file, frame_info_position (frame, eyes), SEEK_SET); - fwrite (&info.offset, sizeof (info.offset), 1, file); - fwrite (&info.size, sizeof (info.size), 1, file); - fwrite (info.hash.c_str(), 1, info.hash.size(), file); -} - string video_asset_filename (shared_ptr asset) { diff --git a/src/lib/util.h b/src/lib/util.h index f908d4fed..da6f660a7 100644 --- a/src/lib/util.h +++ b/src/lib/util.h @@ -99,8 +99,6 @@ public: extern FFmpegSubtitlePeriod subtitle_period (AVSubtitle const &); extern void set_backtrace_file (boost::filesystem::path); -extern dcp::FrameInfo read_frame_info (FILE* file, int frame, Eyes eyes); -extern void write_frame_info (FILE* file, int frame, Eyes eyes, dcp::FrameInfo info); extern std::map split_get_request (std::string url); extern std::string video_asset_filename (boost::shared_ptr asset); extern std::string audio_asset_filename (boost::shared_ptr asset); diff --git a/src/lib/writer.cc b/src/lib/writer.cc index 527fb3701..5c711ef92 100644 --- a/src/lib/writer.cc +++ b/src/lib/writer.cc @@ -86,6 +86,7 @@ Writer::Writer (shared_ptr f, weak_ptr j) , _maximum_frames_in_memory (0) , _full_written (0) , _fake_written (0) + , _repeat_written (0) , _pushed_to_disk (0) { /* Remove any old DCP */ @@ -190,6 +191,33 @@ Writer::write (Data encoded, int frame, Eyes eyes) _empty_condition.notify_all (); } +void +Writer::repeat (int frame, Eyes eyes) +{ + boost::mutex::scoped_lock lock (_mutex); + + while (_queued_full_in_memory > _maximum_frames_in_memory) { + /* The queue is too big; wait until that is sorted out */ + _full_condition.wait (lock); + } + + QueueItem qi; + qi.type = QueueItem::REPEAT; + qi.frame = frame; + if (_film->three_d() && eyes == EYES_BOTH) { + qi.eyes = EYES_LEFT; + _queue.push_back (qi); + qi.eyes = EYES_RIGHT; + _queue.push_back (qi); + } else { + qi.eyes = eyes; + _queue.push_back (qi); + } + + /* Now there's something to do: wake anything wait()ing on _empty_condition */ + _empty_condition.notify_all (); +} + void Writer::fake_write (int frame, Eyes eyes) { @@ -264,6 +292,20 @@ Writer::have_sequenced_image_at_queue_head () return false; } +void +Writer::write_frame_info (int frame, Eyes eyes, dcp::FrameInfo info) const +{ + FILE* file = fopen_boost (_film->info_file(), "ab"); + if (!file) { + throw OpenFileError (_film->info_file ()); + } + dcpomatic_fseek (file, frame_info_position (frame, eyes), SEEK_SET); + fwrite (&info.offset, sizeof (info.offset), 1, file); + fwrite (&info.size, sizeof (info.size), 1, file); + fwrite (info.hash.c_str(), 1, info.hash.size(), file); + fclose (file); +} + void Writer::thread () try @@ -332,12 +374,7 @@ try } dcp::FrameInfo fin = _picture_asset_writer->write (qi.encoded->data().get (), qi.encoded->size()); - FILE* file = fopen_boost (_film->info_file(), "ab"); - if (!file) { - throw OpenFileError (_film->info_file ()); - } - write_frame_info (file, qi.frame, qi.eyes, fin); - fclose (file); + write_frame_info (qi.frame, qi.eyes, fin); _last_written[qi.eyes] = qi.encoded; ++_full_written; break; @@ -348,6 +385,15 @@ try _last_written[qi.eyes].reset (); ++_fake_written; break; + case QueueItem::REPEAT: + LOG_GENERAL (N_("Writer REPEAT-writes %1"), qi.frame); + dcp::FrameInfo fin = _picture_asset_writer->write ( + _last_written[qi.eyes]->data().get(), + _last_written[qi.eyes]->size() + ); + write_frame_info (qi.frame, qi.eyes, fin); + ++_repeat_written; + break; } lock.lock (); @@ -578,7 +624,7 @@ Writer::finish () dcp.write_xml (_film->interop () ? dcp::INTEROP : dcp::SMPTE, meta, signer); LOG_GENERAL ( - N_("Wrote %1 FULL, %2 FAKE, %3 pushed to disk"), _full_written, _fake_written, _pushed_to_disk + N_("Wrote %1 FULL, %2 FAKE, %3 REPEAT, %4 pushed to disk"), _full_written, _fake_written, _repeat_written, _pushed_to_disk ); } @@ -728,3 +774,39 @@ Writer::set_encoder_threads (int threads) { _maximum_frames_in_memory = rint (threads * 1.1); } + +long +Writer::frame_info_position (int frame, Eyes eyes) const +{ + static int const info_size = 48; + + switch (eyes) { + case EYES_BOTH: + return frame * info_size; + case EYES_LEFT: + return frame * info_size * 2; + case EYES_RIGHT: + return frame * info_size * 2 + info_size; + default: + DCPOMATIC_ASSERT (false); + } + + + DCPOMATIC_ASSERT (false); +} + +dcp::FrameInfo +Writer::read_frame_info (FILE* file, int frame, Eyes eyes) const +{ + dcp::FrameInfo info; + dcpomatic_fseek (file, frame_info_position (frame, eyes), SEEK_SET); + fread (&info.offset, sizeof (info.offset), 1, file); + fread (&info.size, sizeof (info.size), 1, file); + + char hash_buffer[33]; + fread (hash_buffer, 1, 32, file); + hash_buffer[32] = '\0'; + info.hash = hash_buffer; + + return info; +} diff --git a/src/lib/writer.h b/src/lib/writer.h index 15fa0d33f..90b36962c 100644 --- a/src/lib/writer.h +++ b/src/lib/writer.h @@ -25,6 +25,7 @@ #include "types.h" #include "player_subtitles.h" #include "data.h" +#include #include #include #include @@ -52,6 +53,10 @@ namespace dcp { struct QueueItem { public: + QueueItem () + : size (0) + {} + enum Type { /** a normal frame with some JPEG200 data */ FULL, @@ -60,6 +65,7 @@ public: state but we use the data that is already on disk. */ FAKE, + REPEAT, } type; /** encoded data for FULL */ @@ -95,6 +101,7 @@ public: void write (Data, int, Eyes); void fake_write (int, Eyes); + void repeat (int, Eyes); void write (boost::shared_ptr); void write (PlayerSubtitles subs); void write (std::list > fonts); @@ -109,6 +116,9 @@ private: void check_existing_picture_asset (); bool check_existing_picture_asset_frame (FILE *, int, Eyes); bool have_sequenced_image_at_queue_head (); + void write_frame_info (int frame, Eyes eyes, dcp::FrameInfo info) const; + long frame_info_position (int frame, Eyes eyes) const; + dcp::FrameInfo read_frame_info (FILE* file, int frame, Eyes eyes) const; /** our Film */ boost::shared_ptr _film; @@ -144,6 +154,7 @@ private: int _full_written; /** number of FAKE written frames */ int _fake_written; + int _repeat_written; /** number of frames pushed to disk and then recovered due to the limit of frames to be held in memory. */ -- 2.30.2