X-Git-Url: https://main.carlh.net/gitweb/?p=dcpomatic.git;a=blobdiff_plain;f=src%2Flib%2Fj2k_encoder.cc;h=10496671c4f5748e6e77b88776f96cab41589f94;hp=e62e708cc608492c9a832821e0c632c1b6cec26a;hb=HEAD;hpb=9106e6ed551b13e1b7c7ee2088d54ce0ae430bcf diff --git a/src/lib/j2k_encoder.cc b/src/lib/j2k_encoder.cc index e62e708cc..32d2fefc2 100644 --- a/src/lib/j2k_encoder.cc +++ b/src/lib/j2k_encoder.cc @@ -1,5 +1,5 @@ /* - Copyright (C) 2012-2015 Carl Hetherington + Copyright (C) 2012-2021 Carl Hetherington This file is part of DCP-o-matic. @@ -18,86 +18,81 @@ */ + /** @file src/j2k_encoder.cc * @brief J2K encoder class. */ -#include "j2k_encoder.h" -#include "util.h" -#include "film.h" -#include "log.h" + +#include "compose.hpp" #include "config.h" -#include "dcp_video.h" #include "cross.h" -#include "writer.h" +#include "dcp_video.h" +#include "dcpomatic_log.h" +#include "encode_server_description.h" #include "encode_server_finder.h" -#include "player.h" +#include "film.h" +#include "j2k_encoder.h" +#include "log.h" #include "player_video.h" -#include "encode_server_description.h" -#include "compose.hpp" +#include "util.h" +#include "writer.h" #include -#include #include #include "i18n.h" -#define LOG_GENERAL(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_GENERAL); -#define LOG_GENERAL_NC(...) _film->log()->log (__VA_ARGS__, LogEntry::TYPE_GENERAL); -#define LOG_ERROR(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_ERROR); -#define LOG_TIMING(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_TIMING); -#define LOG_DEBUG_ENCODE(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_DEBUG_ENCODE); -using std::list; using std::cout; -using boost::shared_ptr; -using boost::weak_ptr; +using std::exception; +using std::list; +using std::make_shared; +using std::shared_ptr; +using std::weak_ptr; using boost::optional; using dcp::Data; +using namespace dcpomatic; + /** @param film Film that we are encoding. * @param writer Writer that we are using. */ -J2KEncoder::J2KEncoder (shared_ptr film, shared_ptr writer) +J2KEncoder::J2KEncoder(shared_ptr film, Writer& writer) : _film (film) , _history (200) , _writer (writer) { - servers_list_changed (); } + J2KEncoder::~J2KEncoder () { - try { - terminate_threads (); - } catch (...) { - /* Destructors must not throw exceptions; anything bad - happening now is too late to worry about anyway, - I think. - */ - } + _server_found_connection.disconnect(); + + /* One of our encoder threads may be waiting on Writer::write() to return, if that method + * is blocked with the writer queue full waiting for _full_condition. In that case, the + * attempt to terminate the encoder threads below (in terminate_threads()) will fail because + * the encoder thread waiting for ::write() will have interruption disabled. + * + * To work around that, make the writer into a zombie to unblock any pending write()s and + * not block on any future ones. + */ + _writer.zombify(); + + boost::mutex::scoped_lock lm (_threads_mutex); + terminate_threads (); } + void J2KEncoder::begin () { - weak_ptr wp = shared_from_this (); - _server_found_connection = EncodeServerFinder::instance()->ServersListChanged.connect ( - boost::bind (&J2KEncoder::call_servers_list_changed, wp) + _server_found_connection = EncodeServerFinder::instance()->ServersListChanged.connect( + boost::bind(&J2KEncoder::servers_list_changed, this) ); + servers_list_changed (); } -/* We don't want the servers-list-changed callback trying to do things - during destruction of J2KEncoder, and I think this is the neatest way - to achieve that. -*/ -void -J2KEncoder::call_servers_list_changed (weak_ptr encoder) -{ - shared_ptr e = encoder.lock (); - if (e) { - e->servers_list_changed (); - } -} void J2KEncoder::end () @@ -117,7 +112,13 @@ J2KEncoder::end () LOG_GENERAL_NC (N_("Terminating encoder threads")); - terminate_threads (); + { + boost::mutex::scoped_lock lm (_threads_mutex); + terminate_threads (); + } + + /* Something might have been thrown during terminate_threads */ + rethrow (); LOG_GENERAL (N_("Mopping up %1"), _queue.size()); @@ -130,13 +131,13 @@ J2KEncoder::end () So just mop up anything left in the queue here. */ - for (list >::iterator i = _queue.begin(); i != _queue.end(); ++i) { - LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ()); + for (auto const& i: _queue) { + LOG_GENERAL(N_("Encode left-over frame %1"), i.index()); try { - _writer->write ( - (*i)->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2)), - (*i)->index (), - (*i)->eyes () + _writer.write( + make_shared(i.encode_locally()), + i.index(), + i.eyes() ); frame_done (); } catch (std::exception& e) { @@ -145,15 +146,17 @@ J2KEncoder::end () } } + /** @return an estimate of the current number of frames we are encoding per second, - * or 0 if not known. + * if known. */ -float +optional J2KEncoder::current_encoding_rate () const { return _history.rate (); } + /** @return Number of video frames that have been queued for encoding */ int J2KEncoder::video_frames_enqueued () const @@ -165,6 +168,7 @@ J2KEncoder::video_frames_enqueued () const return _last_player_video_time->frames_floor (_film->video_frame_rate ()); } + /** Should be called when a frame has been encoded successfully */ void J2KEncoder::frame_done () @@ -172,6 +176,7 @@ J2KEncoder::frame_done () _history.event (); } + /** Called to request encoding of the next video frame in the DCP. This is called in order, * so each time the supplied frame is the one after the previous one. * pv represents one video frame, and could be empty if there is nothing to encode @@ -187,8 +192,8 @@ J2KEncoder::encode (shared_ptr pv, DCPTime time) size_t threads = 0; { - boost::mutex::scoped_lock threads_lock (_threads_mutex); - threads = _threads.size (); + boost::mutex::scoped_lock lm (_threads_mutex); + threads = _threads->size(); } boost::mutex::scoped_lock queue_lock (_queue_mutex); @@ -202,41 +207,39 @@ J2KEncoder::encode (shared_ptr pv, DCPTime time) LOG_TIMING ("decoder-wake queue=%1 threads=%2", _queue.size(), threads); } - _writer->rethrow (); + _writer.rethrow(); /* Re-throw any exception raised by one of our threads. If more than one has thrown an exception, only one will be rethrown, I think; but then, if that happens something has gone badly wrong. */ rethrow (); - Frame const position = time.frames_floor(_film->video_frame_rate()); + auto const position = time.frames_floor(_film->video_frame_rate()); - if (_writer->can_fake_write (position)) { + if (_writer.can_fake_write(position)) { /* We can fake-write this frame */ LOG_DEBUG_ENCODE("Frame @ %1 FAKE", to_string(time)); - _writer->fake_write (position, pv->eyes ()); + _writer.fake_write(position, pv->eyes ()); frame_done (); - } else if (pv->has_j2k ()) { + } else if (pv->has_j2k() && !_film->reencode_j2k()) { LOG_DEBUG_ENCODE("Frame @ %1 J2K", to_string(time)); /* This frame already has J2K data, so just write it */ - _writer->write (pv->j2k(), position, pv->eyes ()); - } else if (_last_player_video[pv->eyes()] && _writer->can_repeat(position) && pv->same (_last_player_video[pv->eyes()])) { + _writer.write(pv->j2k(), position, pv->eyes ()); + frame_done (); + } else if (_last_player_video[pv->eyes()] && _writer.can_repeat(position) && pv->same(_last_player_video[pv->eyes()])) { LOG_DEBUG_ENCODE("Frame @ %1 REPEAT", to_string(time)); - _writer->repeat (position, pv->eyes ()); + _writer.repeat(position, pv->eyes()); } else { LOG_DEBUG_ENCODE("Frame @ %1 ENCODE", to_string(time)); /* Queue this new frame for encoding */ LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ()); - _queue.push_back (shared_ptr ( - new DCPVideo ( - pv, - position, - _film->video_frame_rate(), - _film->j2k_bandwidth(), - _film->resolution(), - _film->log() - ) - )); + _queue.push_back (DCPVideo( + pv, + position, + _film->video_frame_rate(), + _film->j2k_bandwidth(), + _film->resolution() + )); /* The queue might not be empty any more, so notify anything which is waiting on that. @@ -248,33 +251,36 @@ J2KEncoder::encode (shared_ptr pv, DCPTime time) _last_player_video_time = time; } + +/** Caller must hold a lock on _threads_mutex */ void J2KEncoder::terminate_threads () { - boost::mutex::scoped_lock threads_lock (_threads_mutex); + boost::this_thread::disable_interruption dis; - int n = 0; - for (list::iterator i = _threads.begin(); i != _threads.end(); ++i) { - LOG_GENERAL ("Terminating thread %1 of %2", n + 1, _threads.size ()); - (*i)->interrupt (); - DCPOMATIC_ASSERT ((*i)->joinable ()); - try { - (*i)->join (); - } catch (boost::thread_interrupted& e) { - /* This is to be expected */ - } - delete *i; - LOG_GENERAL_NC ("Thread terminated"); - ++n; + if (!_threads) { + return; + } + + _threads->interrupt_all (); + try { + _threads->join_all (); + } catch (exception& e) { + LOG_ERROR ("join() threw an exception: %1", e.what()); + } catch (...) { + LOG_ERROR_NC ("join() threw an exception"); } - _threads.clear (); + _threads.reset (); } + void J2KEncoder::encoder_thread (optional server) try { + start_of_thread ("J2KEncoder"); + if (server) { LOG_TIMING ("start-encoder-thread thread=%1 server=%2", thread_id (), server->host_name ()); } else { @@ -296,7 +302,7 @@ try } LOG_TIMING ("encoder-wake thread=%1 queue=%2", thread_id(), _queue.size()); - shared_ptr vf = _queue.front (); + auto vf = _queue.front (); /* We're about to commit to either encoding this frame or putting it back onto the queue, so we must not be interrupted until one or other of these things have happened. This @@ -305,17 +311,17 @@ try { boost::this_thread::disable_interruption dis; - LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", thread_id(), vf->index(), (int) vf->eyes ()); + LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", thread_id(), vf.index(), static_cast(vf.eyes())); _queue.pop_front (); lock.unlock (); - optional encoded; + shared_ptr encoded; /* We need to encode this input */ if (server) { try { - encoded = vf->encode_remotely (server.get ()); + encoded = make_shared(vf.encode_remotely(server.get())); if (remote_backoff > 0) { LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ()); @@ -331,15 +337,15 @@ try } LOG_ERROR ( N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"), - vf->index(), server->host_name(), e.what(), remote_backoff + vf.index(), server->host_name(), e.what(), remote_backoff ); } } else { try { - LOG_TIMING ("start-local-encode thread=%1 frame=%2", thread_id(), vf->index()); - encoded = vf->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2)); - LOG_TIMING ("finish-local-encode thread=%1 frame=%2", thread_id(), vf->index()); + LOG_TIMING ("start-local-encode thread=%1 frame=%2", thread_id(), vf.index()); + encoded = make_shared(vf.encode_locally()); + LOG_TIMING ("finish-local-encode thread=%1 frame=%2", thread_id(), vf.index()); } catch (std::exception& e) { /* This is very bad, so don't cope with it, just pass it on */ LOG_ERROR (N_("Local encode failed (%1)"), e.what ()); @@ -348,11 +354,11 @@ try } if (encoded) { - _writer->write (encoded.get(), vf->index (), vf->eyes ()); + _writer.write(encoded, vf.index(), vf.eyes()); frame_done (); } else { lock.lock (); - LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), vf->index()); + LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), vf.index()); _queue.push_front (vf); lock.unlock (); } @@ -378,50 +384,38 @@ catch (...) _full_condition.notify_all (); } + void J2KEncoder::servers_list_changed () { + boost::mutex::scoped_lock lm (_threads_mutex); + terminate_threads (); + _threads = make_shared(); /* XXX: could re-use threads */ - boost::mutex::scoped_lock lm (_threads_mutex); - -#ifdef BOOST_THREAD_PLATFORM_WIN32 - OSVERSIONINFO info; - info.dwOSVersionInfoSize = sizeof (OSVERSIONINFO); - GetVersionEx (&info); - bool const windows_xp = (info.dwMajorVersion == 5 && info.dwMinorVersion == 1); - if (windows_xp) { - LOG_GENERAL_NC (N_("Setting thread affinity for Windows XP")); - } -#endif - if (!Config::instance()->only_servers_encode ()) { for (int i = 0; i < Config::instance()->master_encoding_threads (); ++i) { - boost::thread* t = new boost::thread (boost::bind (&J2KEncoder::encoder_thread, this, optional ())); #ifdef DCPOMATIC_LINUX + auto t = _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, optional())); pthread_setname_np (t->native_handle(), "encode-worker"); -#endif - _threads.push_back (t); -#ifdef BOOST_THREAD_PLATFORM_WIN32 - if (windows_xp) { - SetThreadAffinityMask (t->native_handle(), 1 << i); - } +#else + _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, optional())); #endif } } - BOOST_FOREACH (EncodeServerDescription i, EncodeServerFinder::instance()->servers()) { + for (auto i: EncodeServerFinder::instance()->servers()) { if (!i.current_link_version()) { continue; } LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ()); for (int j = 0; j < i.threads(); ++j) { - _threads.push_back (new boost::thread (boost::bind (&J2KEncoder::encoder_thread, this, i))); + _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, i)); } } - _writer->set_encoder_threads (_threads.size ()); + _writer.set_encoder_threads(_threads->size()); }