X-Git-Url: https://main.carlh.net/gitweb/?a=blobdiff_plain;f=src%2Flib%2Fj2k_encoder.cc;h=7c6e2347733f361c7054f3bd862b7bf1e2d19357;hb=e52d9526f0a49acb72e8b4aa980399b119171ba5;hp=5c3fd477ef16dfeefb63d90bdb13bdbdd156bd0a;hpb=5d92e7bf242200c3b3b8a079671b572569d2b198;p=dcpomatic.git diff --git a/src/lib/j2k_encoder.cc b/src/lib/j2k_encoder.cc index 5c3fd477e..7c6e23477 100644 --- a/src/lib/j2k_encoder.cc +++ b/src/lib/j2k_encoder.cc @@ -37,7 +37,6 @@ #include "encode_server_description.h" #include "compose.hpp" #include -#include #include #include "i18n.h" @@ -45,8 +44,9 @@ using std::list; using std::cout; using std::exception; -using boost::shared_ptr; -using boost::weak_ptr; +using std::shared_ptr; +using std::weak_ptr; +using std::make_shared; using boost::optional; using dcp::Data; using namespace dcpomatic; @@ -64,14 +64,8 @@ J2KEncoder::J2KEncoder (shared_ptr film, shared_ptr writer) J2KEncoder::~J2KEncoder () { - try { - terminate_threads (); - } catch (...) { - /* Destructors must not throw exceptions; anything bad - happening now is too late to worry about anyway, - I think. - */ - } + boost::mutex::scoped_lock lm (_threads_mutex); + terminate_threads (); } void @@ -90,7 +84,7 @@ J2KEncoder::begin () void J2KEncoder::call_servers_list_changed (weak_ptr encoder) { - shared_ptr e = encoder.lock (); + auto e = encoder.lock (); if (e) { e->servers_list_changed (); } @@ -114,7 +108,13 @@ J2KEncoder::end () LOG_GENERAL_NC (N_("Terminating encoder threads")); - terminate_threads (); + { + boost::mutex::scoped_lock lm (_threads_mutex); + terminate_threads (); + } + + /* Something might have been thrown during terminate_threads */ + rethrow (); LOG_GENERAL (N_("Mopping up %1"), _queue.size()); @@ -127,13 +127,13 @@ J2KEncoder::end () So just mop up anything left in the queue here. */ - for (list >::iterator i = _queue.begin(); i != _queue.end(); ++i) { - LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ()); + for (auto i: _queue) { + LOG_GENERAL(N_("Encode left-over frame %1"), i->index()); try { _writer->write ( - (*i)->encode_locally(), - (*i)->index(), - (*i)->eyes() + make_shared(i->encode_locally()), + i->index(), + i->eyes() ); frame_done (); } catch (std::exception& e) { @@ -184,8 +184,8 @@ J2KEncoder::encode (shared_ptr pv, DCPTime time) size_t threads = 0; { - boost::mutex::scoped_lock threads_lock (_threads_mutex); - threads = _threads.size (); + boost::mutex::scoped_lock lm (_threads_mutex); + threads = _threads->size(); } boost::mutex::scoped_lock queue_lock (_queue_mutex); @@ -206,7 +206,7 @@ J2KEncoder::encode (shared_ptr pv, DCPTime time) */ rethrow (); - Frame const position = time.frames_floor(_film->video_frame_rate()); + auto const position = time.frames_floor(_film->video_frame_rate()); if (_writer->can_fake_write (position)) { /* We can fake-write this frame */ @@ -217,6 +217,7 @@ J2KEncoder::encode (shared_ptr pv, DCPTime time) LOG_DEBUG_ENCODE("Frame @ %1 J2K", to_string(time)); /* This frame already has J2K data, so just write it */ _writer->write (pv->j2k(), position, pv->eyes ()); + frame_done (); } else if (_last_player_video[pv->eyes()] && _writer->can_repeat(position) && pv->same (_last_player_video[pv->eyes()])) { LOG_DEBUG_ENCODE("Frame @ %1 REPEAT", to_string(time)); _writer->repeat (position, pv->eyes ()); @@ -224,15 +225,13 @@ J2KEncoder::encode (shared_ptr pv, DCPTime time) LOG_DEBUG_ENCODE("Frame @ %1 ENCODE", to_string(time)); /* Queue this new frame for encoding */ LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ()); - _queue.push_back (shared_ptr ( - new DCPVideo ( - pv, - position, - _film->video_frame_rate(), - _film->j2k_bandwidth(), - _film->resolution() - ) - )); + _queue.push_back (make_shared( + pv, + position, + _film->video_frame_rate(), + _film->j2k_bandwidth(), + _film->resolution() + )); /* The queue might not be empty any more, so notify anything which is waiting on that. @@ -244,34 +243,27 @@ J2KEncoder::encode (shared_ptr pv, DCPTime time) _last_player_video_time = time; } + +/** Caller must hold a lock on _threads_mutex */ void J2KEncoder::terminate_threads () { - boost::mutex::scoped_lock threads_lock (_threads_mutex); - - int n = 0; - for (list::iterator i = _threads.begin(); i != _threads.end(); ++i) { - /* Be careful not to throw in here otherwise _threads will not be clear()ed */ - LOG_GENERAL ("Terminating thread %1 of %2", n + 1, _threads.size ()); - (*i)->interrupt (); - if (!(*i)->joinable()) { - LOG_ERROR_NC ("About to join() a non-joinable thread"); - } - try { - (*i)->join (); - } catch (boost::thread_interrupted& e) { - /* This is to be expected (I think?) */ - } catch (exception& e) { - LOG_ERROR ("join() threw an exception: %1", e.what()); - } catch (...) { - LOG_ERROR_NC ("join() threw an exception"); - } - delete *i; - LOG_GENERAL_NC ("Thread terminated"); - ++n; + boost::this_thread::disable_interruption dis; + + if (!_threads) { + return; + } + + _threads->interrupt_all (); + try { + _threads->join_all (); + } catch (exception& e) { + LOG_ERROR ("join() threw an exception: %1", e.what()); + } catch (...) { + LOG_ERROR_NC ("join() threw an exception"); } - _threads.clear (); + _threads.reset (); } void @@ -299,7 +291,7 @@ try } LOG_TIMING ("encoder-wake thread=%1 queue=%2", thread_id(), _queue.size()); - shared_ptr vf = _queue.front (); + auto vf = _queue.front (); /* We're about to commit to either encoding this frame or putting it back onto the queue, so we must not be interrupted until one or other of these things have happened. This @@ -313,12 +305,12 @@ try lock.unlock (); - optional encoded; + shared_ptr encoded; /* We need to encode this input */ if (server) { try { - encoded = vf->encode_remotely (server.get ()); + encoded = make_shared(vf->encode_remotely(server.get())); if (remote_backoff > 0) { LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ()); @@ -341,7 +333,7 @@ try } else { try { LOG_TIMING ("start-local-encode thread=%1 frame=%2", thread_id(), vf->index()); - encoded = vf->encode_locally (); + encoded = make_shared(vf->encode_locally()); LOG_TIMING ("finish-local-encode thread=%1 frame=%2", thread_id(), vf->index()); } catch (std::exception& e) { /* This is very bad, so don't cope with it, just pass it on */ @@ -351,7 +343,7 @@ try } if (encoded) { - _writer->write (encoded.get(), vf->index (), vf->eyes ()); + _writer->write (encoded, vf->index(), vf->eyes()); frame_done (); } else { lock.lock (); @@ -384,47 +376,34 @@ catch (...) void J2KEncoder::servers_list_changed () { + boost::mutex::scoped_lock lm (_threads_mutex); + terminate_threads (); + _threads = make_shared(); /* XXX: could re-use threads */ - boost::mutex::scoped_lock lm (_threads_mutex); - -#ifdef BOOST_THREAD_PLATFORM_WIN32 - OSVERSIONINFO info; - info.dwOSVersionInfoSize = sizeof (OSVERSIONINFO); - GetVersionEx (&info); - bool const windows_xp = (info.dwMajorVersion == 5 && info.dwMinorVersion == 1); - if (windows_xp) { - LOG_GENERAL_NC (N_("Setting thread affinity for Windows XP")); - } -#endif - if (!Config::instance()->only_servers_encode ()) { for (int i = 0; i < Config::instance()->master_encoding_threads (); ++i) { - boost::thread* t = new boost::thread (boost::bind (&J2KEncoder::encoder_thread, this, optional ())); #ifdef DCPOMATIC_LINUX + auto t = _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, optional())); pthread_setname_np (t->native_handle(), "encode-worker"); -#endif - _threads.push_back (t); -#ifdef BOOST_THREAD_PLATFORM_WIN32 - if (windows_xp) { - SetThreadAffinityMask (t->native_handle(), 1 << i); - } +#else + _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, optional())); #endif } } - BOOST_FOREACH (EncodeServerDescription i, EncodeServerFinder::instance()->servers()) { + for (auto i: EncodeServerFinder::instance()->servers()) { if (!i.current_link_version()) { continue; } LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ()); for (int j = 0; j < i.threads(); ++j) { - _threads.push_back (new boost::thread (boost::bind (&J2KEncoder::encoder_thread, this, i))); + _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, i)); } } - _writer->set_encoder_threads (_threads.size ()); + _writer->set_encoder_threads (_threads->size()); }