X-Git-Url: https://main.carlh.net/gitweb/?p=dcpomatic.git;a=blobdiff_plain;f=src%2Flib%2Fj2k_encoder.cc;h=273bce8fb6331acc76101e2b1b1a0f42ce609c44;hp=ac420517f9ea27f9617d233ac300d98686e450c3;hb=dd9be86db6cde0afa5da0d1d1ac43b42e05dca26;hpb=5b1b70c86df7225a2a102bdde3b38ea591a6dcbb diff --git a/src/lib/j2k_encoder.cc b/src/lib/j2k_encoder.cc index ac420517f..273bce8fb 100644 --- a/src/lib/j2k_encoder.cc +++ b/src/lib/j2k_encoder.cc @@ -45,8 +45,8 @@ using std::list; using std::cout; using std::exception; -using boost::shared_ptr; -using boost::weak_ptr; +using std::shared_ptr; +using std::weak_ptr; using boost::optional; using dcp::Data; using namespace dcpomatic; @@ -64,14 +64,8 @@ J2KEncoder::J2KEncoder (shared_ptr film, shared_ptr writer) J2KEncoder::~J2KEncoder () { - try { - terminate_threads (); - } catch (...) { - /* Destructors must not throw exceptions; anything bad - happening now is too late to worry about anyway, - I think. - */ - } + boost::mutex::scoped_lock lm (_threads_mutex); + terminate_threads (); } void @@ -114,7 +108,13 @@ J2KEncoder::end () LOG_GENERAL_NC (N_("Terminating encoder threads")); - terminate_threads (); + { + boost::mutex::scoped_lock lm (_threads_mutex); + terminate_threads (); + } + + /* Something might have been thrown during terminate_threads */ + rethrow (); LOG_GENERAL (N_("Mopping up %1"), _queue.size()); @@ -131,7 +131,7 @@ J2KEncoder::end () LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ()); try { _writer->write ( - (*i)->encode_locally(), + shared_ptr(new dcp::ArrayData((*i)->encode_locally())), (*i)->index(), (*i)->eyes() ); @@ -184,8 +184,8 @@ J2KEncoder::encode (shared_ptr pv, DCPTime time) size_t threads = 0; { - boost::mutex::scoped_lock threads_lock (_threads_mutex); - threads = _threads.size (); + boost::mutex::scoped_lock lm (_threads_mutex); + threads = _threads->size(); } boost::mutex::scoped_lock queue_lock (_queue_mutex); @@ -217,6 +217,7 @@ J2KEncoder::encode (shared_ptr pv, DCPTime time) LOG_DEBUG_ENCODE("Frame @ %1 J2K", to_string(time)); /* This frame already has J2K data, so just write it */ _writer->write (pv->j2k(), position, pv->eyes ()); + frame_done (); } else if (_last_player_video[pv->eyes()] && _writer->can_repeat(position) && pv->same (_last_player_video[pv->eyes()])) { LOG_DEBUG_ENCODE("Frame @ %1 REPEAT", to_string(time)); _writer->repeat (position, pv->eyes ()); @@ -244,29 +245,27 @@ J2KEncoder::encode (shared_ptr pv, DCPTime time) _last_player_video_time = time; } + +/** Caller must hold a lock on _threads_mutex */ void J2KEncoder::terminate_threads () { - boost::mutex::scoped_lock threads_lock (_threads_mutex); + boost::this_thread::disable_interruption dis; - int n = 0; - BOOST_FOREACH (boost::thread* i, _threads) { - /* Be careful not to throw in here otherwise _threads will not be clear()ed */ - LOG_GENERAL ("Terminating thread %1 of %2", n + 1, _threads.size ()); - i->interrupt (); - try { - i->join (); - } catch (exception& e) { - LOG_ERROR ("join() threw an exception: %1", e.what()); - } catch (...) { - LOG_ERROR_NC ("join() threw an exception"); - } - LOG_GENERAL_NC ("Thread terminated"); - ++n; - delete i; + if (!_threads) { + return; + } + + _threads->interrupt_all (); + try { + _threads->join_all (); + } catch (exception& e) { + LOG_ERROR ("join() threw an exception: %1", e.what()); + } catch (...) { + LOG_ERROR_NC ("join() threw an exception"); } - _threads.clear (); + _threads.reset (); } void @@ -308,12 +307,12 @@ try lock.unlock (); - optional encoded; + shared_ptr encoded; /* We need to encode this input */ if (server) { try { - encoded = vf->encode_remotely (server.get ()); + encoded.reset(new dcp::ArrayData(vf->encode_remotely(server.get()))); if (remote_backoff > 0) { LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ()); @@ -336,7 +335,7 @@ try } else { try { LOG_TIMING ("start-local-encode thread=%1 frame=%2", thread_id(), vf->index()); - encoded = vf->encode_locally (); + encoded.reset(new dcp::ArrayData(vf->encode_locally())); LOG_TIMING ("finish-local-encode thread=%1 frame=%2", thread_id(), vf->index()); } catch (std::exception& e) { /* This is very bad, so don't cope with it, just pass it on */ @@ -346,7 +345,7 @@ try } if (encoded) { - _writer->write (encoded.get(), vf->index (), vf->eyes ()); + _writer->write (encoded, vf->index(), vf->eyes()); frame_done (); } else { lock.lock (); @@ -379,33 +378,20 @@ catch (...) void J2KEncoder::servers_list_changed () { + boost::mutex::scoped_lock lm (_threads_mutex); + terminate_threads (); + _threads.reset (new boost::thread_group()); /* XXX: could re-use threads */ - boost::mutex::scoped_lock lm (_threads_mutex); - -#ifdef BOOST_THREAD_PLATFORM_WIN32 - OSVERSIONINFO info; - info.dwOSVersionInfoSize = sizeof (OSVERSIONINFO); - GetVersionEx (&info); - bool const windows_xp = (info.dwMajorVersion == 5 && info.dwMinorVersion == 1); - if (windows_xp) { - LOG_GENERAL_NC (N_("Setting thread affinity for Windows XP")); - } -#endif - if (!Config::instance()->only_servers_encode ()) { for (int i = 0; i < Config::instance()->master_encoding_threads (); ++i) { - boost::thread* t = new boost::thread (boost::bind(&J2KEncoder::encoder_thread, this, optional())); #ifdef DCPOMATIC_LINUX + boost::thread* t = _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, optional())); pthread_setname_np (t->native_handle(), "encode-worker"); -#endif - _threads.push_back (t); -#ifdef BOOST_THREAD_PLATFORM_WIN32 - if (windows_xp) { - SetThreadAffinityMask (t->native_handle(), 1 << i); - } +#else + _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, optional())); #endif } } @@ -417,9 +403,9 @@ J2KEncoder::servers_list_changed () LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ()); for (int j = 0; j < i.threads(); ++j) { - _threads.push_back (new boost::thread(boost::bind(&J2KEncoder::encoder_thread, this, i))); + _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, i)); } } - _writer->set_encoder_threads (_threads.size ()); + _writer->set_encoder_threads (_threads->size()); }