From 73ebb92e9df01ba7afb97121b6e2cef6ca13a18e Mon Sep 17 00:00:00 2001 From: Carl Hetherington Date: Thu, 23 Jul 2020 22:53:43 +0200 Subject: [PATCH] Use thread_group for improved exception safety (#1785). --- src/lib/encode_server.cc | 14 ++++-------- src/lib/encode_server.h | 2 +- src/lib/j2k_encoder.cc | 46 +++++++++++++++------------------------- src/lib/j2k_encoder.h | 5 ++--- 4 files changed, 24 insertions(+), 43 deletions(-) diff --git a/src/lib/encode_server.cc b/src/lib/encode_server.cc index 8db3f867c..c242cb216 100644 --- a/src/lib/encode_server.cc +++ b/src/lib/encode_server.cc @@ -87,14 +87,9 @@ EncodeServer::~EncodeServer () _full_condition.notify_all (); } - BOOST_FOREACH (boost::thread* i, _worker_threads) { - try { - i->join (); - } catch (...) { - - } - delete i; - } + try { + _worker_threads.join_all (); + } catch (...) {} { boost::mutex::scoped_lock lm (_broadcast.mutex); @@ -241,8 +236,7 @@ EncodeServer::run () } for (int i = 0; i < _num_threads; ++i) { - boost::thread* t = new thread(bind(&EncodeServer::worker_thread, this)); - _worker_threads.push_back (t); + boost::thread* t = _worker_threads.create_thread (bind(&EncodeServer::worker_thread, this)); #ifdef DCPOMATIC_LINUX pthread_setname_np (t->native_handle(), "encode-server-worker"); #endif diff --git a/src/lib/encode_server.h b/src/lib/encode_server.h index 91e007503..a43cea7ef 100644 --- a/src/lib/encode_server.h +++ b/src/lib/encode_server.h @@ -54,7 +54,7 @@ private: void broadcast_thread (); void broadcast_received (); - std::vector _worker_threads; + boost::thread_group _worker_threads; std::list > _queue; boost::condition _full_condition; boost::condition _empty_condition; diff --git a/src/lib/j2k_encoder.cc b/src/lib/j2k_encoder.cc index 3d7b342da..498050073 100644 --- a/src/lib/j2k_encoder.cc +++ b/src/lib/j2k_encoder.cc @@ -185,11 +185,7 @@ J2KEncoder::encode (shared_ptr pv, DCPTime time) { _waker.nudge (); - size_t threads = 0; - { - boost::mutex::scoped_lock threads_lock (_threads_mutex); - threads = _threads.size (); - } + size_t threads = _threads->size(); boost::mutex::scoped_lock queue_lock (_queue_mutex); @@ -250,26 +246,20 @@ J2KEncoder::encode (shared_ptr pv, DCPTime time) void J2KEncoder::terminate_threads () { - boost::mutex::scoped_lock threads_lock (_threads_mutex); + if (!_threads) { + return; + } - int n = 0; - BOOST_FOREACH (boost::thread* i, _threads) { - /* Be careful not to throw in here otherwise _threads will not be clear()ed */ - LOG_GENERAL ("Terminating thread %1 of %2", n + 1, _threads.size ()); - i->interrupt (); - try { - i->join (); - } catch (exception& e) { - LOG_ERROR ("join() threw an exception: %1", e.what()); - } catch (...) { - LOG_ERROR_NC ("join() threw an exception"); - } - LOG_GENERAL_NC ("Thread terminated"); - ++n; - delete i; + _threads->interrupt_all (); + try { + _threads->join_all (); + } catch (exception& e) { + LOG_ERROR ("join() threw an exception: %1", e.what()); + } catch (...) { + LOG_ERROR_NC ("join() threw an exception"); } - _threads.clear (); + _threads.reset (); } void @@ -383,11 +373,10 @@ void J2KEncoder::servers_list_changed () { terminate_threads (); + _threads.reset (new boost::thread_group()); /* XXX: could re-use threads */ - boost::mutex::scoped_lock lm (_threads_mutex); - #ifdef BOOST_THREAD_PLATFORM_WIN32 OSVERSIONINFO info; info.dwOSVersionInfoSize = sizeof (OSVERSIONINFO); @@ -400,12 +389,11 @@ J2KEncoder::servers_list_changed () if (!Config::instance()->only_servers_encode ()) { for (int i = 0; i < Config::instance()->master_encoding_threads (); ++i) { - boost::thread* t = new boost::thread (boost::bind(&J2KEncoder::encoder_thread, this, optional())); + boost::thread* t = _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, optional())); #ifdef DCPOMATIC_LINUX pthread_setname_np (t->native_handle(), "encode-worker"); #endif - _threads.push_back (t); -#ifdef BOOST_THREAD_PLATFORM_WIN32 +#ifdef DCPOMATIC_WINDOWS if (windows_xp) { SetThreadAffinityMask (t->native_handle(), 1 << i); } @@ -420,9 +408,9 @@ J2KEncoder::servers_list_changed () LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ()); for (int j = 0; j < i.threads(); ++j) { - _threads.push_back (new boost::thread(boost::bind(&J2KEncoder::encoder_thread, this, i))); + _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, i)); } } - _writer->set_encoder_threads (_threads.size ()); + _writer->set_encoder_threads (_threads->size()); } diff --git a/src/lib/j2k_encoder.h b/src/lib/j2k_encoder.h index b4542c40a..d56fc1aec 100644 --- a/src/lib/j2k_encoder.h +++ b/src/lib/j2k_encoder.h @@ -87,9 +87,8 @@ private: EventHistory _history; - /** Mutex for _threads */ - mutable boost::mutex _threads_mutex; - std::list _threads; + boost::shared_ptr _threads; + mutable boost::mutex _queue_mutex; std::list > _queue; /** condition to manage thread wakeups when we have nothing to do */ -- 2.30.2