More c++ tidying.
[dcpomatic.git] / src / lib / j2k_encoder.cc
index 3d1df688c2d718cc2da9a4b2a0c8e00a3a4d6b33..7c6e2347733f361c7054f3bd862b7bf1e2d19357 100644 (file)
@@ -1,5 +1,5 @@
 /*
-    Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
+    Copyright (C) 2012-2019 Carl Hetherington <cth@carlh.net>
 
     This file is part of DCP-o-matic.
 
@@ -26,6 +26,7 @@
 #include "util.h"
 #include "film.h"
 #include "log.h"
+#include "dcpomatic_log.h"
 #include "config.h"
 #include "dcp_video.h"
 #include "cross.h"
 #include "encode_server_description.h"
 #include "compose.hpp"
 #include <libcxml/cxml.h>
-#include <boost/foreach.hpp>
 #include <iostream>
 
 #include "i18n.h"
 
-#define LOG_GENERAL(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_GENERAL);
-#define LOG_GENERAL_NC(...) _film->log()->log (__VA_ARGS__, LogEntry::TYPE_GENERAL);
-#define LOG_ERROR(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_ERROR);
-#define LOG_TIMING(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_TIMING);
-#define LOG_DEBUG_ENCODE(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_DEBUG_ENCODE);
-
 using std::list;
 using std::cout;
-using boost::shared_ptr;
-using boost::weak_ptr;
+using std::exception;
+using std::shared_ptr;
+using std::weak_ptr;
+using std::make_shared;
 using boost::optional;
 using dcp::Data;
+using namespace dcpomatic;
 
 /** @param film Film that we are encoding.
  *  @param writer Writer that we are using.
@@ -67,14 +64,8 @@ J2KEncoder::J2KEncoder (shared_ptr<const Film> film, shared_ptr<Writer> writer)
 
 J2KEncoder::~J2KEncoder ()
 {
-       try {
-               terminate_threads ();
-       } catch (...) {
-               /* Destructors must not throw exceptions; anything bad
-                  happening now is too late to worry about anyway,
-                  I think.
-               */
-       }
+       boost::mutex::scoped_lock lm (_threads_mutex);
+       terminate_threads ();
 }
 
 void
@@ -93,7 +84,7 @@ J2KEncoder::begin ()
 void
 J2KEncoder::call_servers_list_changed (weak_ptr<J2KEncoder> encoder)
 {
-       shared_ptr<J2KEncoder> e = encoder.lock ();
+       auto e = encoder.lock ();
        if (e) {
                e->servers_list_changed ();
        }
@@ -117,7 +108,13 @@ J2KEncoder::end ()
 
        LOG_GENERAL_NC (N_("Terminating encoder threads"));
 
-       terminate_threads ();
+       {
+               boost::mutex::scoped_lock lm (_threads_mutex);
+               terminate_threads ();
+       }
+
+       /* Something might have been thrown during terminate_threads */
+       rethrow ();
 
        LOG_GENERAL (N_("Mopping up %1"), _queue.size());
 
@@ -130,13 +127,13 @@ J2KEncoder::end ()
             So just mop up anything left in the queue here.
        */
 
-       for (list<shared_ptr<DCPVideo> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
-               LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
+       for (auto i: _queue) {
+               LOG_GENERAL(N_("Encode left-over frame %1"), i->index());
                try {
                        _writer->write (
-                               (*i)->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2)),
-                               (*i)->index (),
-                               (*i)->eyes ()
+                               make_shared<dcp::ArrayData>(i->encode_locally()),
+                               i->index(),
+                               i->eyes()
                                );
                        frame_done ();
                } catch (std::exception& e) {
@@ -146,9 +143,9 @@ J2KEncoder::end ()
 }
 
 /** @return an estimate of the current number of frames we are encoding per second,
- *  or 0 if not known.
+ *  if known.
  */
-float
+optional<float>
 J2KEncoder::current_encoding_rate () const
 {
        return _history.rate ();
@@ -187,8 +184,8 @@ J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time)
 
        size_t threads = 0;
        {
-               boost::mutex::scoped_lock threads_lock (_threads_mutex);
-               threads = _threads.size ();
+               boost::mutex::scoped_lock lm (_threads_mutex);
+               threads = _threads->size();
        }
 
        boost::mutex::scoped_lock queue_lock (_queue_mutex);
@@ -209,17 +206,18 @@ J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time)
        */
        rethrow ();
 
-       Frame const position = time.frames_floor(_film->video_frame_rate());
+       auto const position = time.frames_floor(_film->video_frame_rate());
 
        if (_writer->can_fake_write (position)) {
                /* We can fake-write this frame */
                LOG_DEBUG_ENCODE("Frame @ %1 FAKE", to_string(time));
                _writer->fake_write (position, pv->eyes ());
                frame_done ();
-       } else if (pv->has_j2k ()) {
+       } else if (pv->has_j2k() && !_film->reencode_j2k()) {
                LOG_DEBUG_ENCODE("Frame @ %1 J2K", to_string(time));
                /* This frame already has J2K data, so just write it */
                _writer->write (pv->j2k(), position, pv->eyes ());
+               frame_done ();
        } else if (_last_player_video[pv->eyes()] && _writer->can_repeat(position) && pv->same (_last_player_video[pv->eyes()])) {
                LOG_DEBUG_ENCODE("Frame @ %1 REPEAT", to_string(time));
                _writer->repeat (position, pv->eyes ());
@@ -227,16 +225,13 @@ J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time)
                LOG_DEBUG_ENCODE("Frame @ %1 ENCODE", to_string(time));
                /* Queue this new frame for encoding */
                LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
-               _queue.push_back (shared_ptr<DCPVideo> (
-                                         new DCPVideo (
-                                                 pv,
-                                                 position,
-                                                 _film->video_frame_rate(),
-                                                 _film->j2k_bandwidth(),
-                                                 _film->resolution(),
-                                                 _film->log()
-                                                 )
-                                         ));
+               _queue.push_back (make_shared<DCPVideo>(
+                               pv,
+                               position,
+                               _film->video_frame_rate(),
+                               _film->j2k_bandwidth(),
+                               _film->resolution()
+                               ));
 
                /* The queue might not be empty any more, so notify anything which is
                   waiting on that.
@@ -248,27 +243,27 @@ J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time)
        _last_player_video_time = time;
 }
 
+
+/** Caller must hold a lock on _threads_mutex */
 void
 J2KEncoder::terminate_threads ()
 {
-       boost::mutex::scoped_lock threads_lock (_threads_mutex);
+       boost::this_thread::disable_interruption dis;
 
-       int n = 0;
-       for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
-               LOG_GENERAL ("Terminating thread %1 of %2", n + 1, _threads.size ());
-               (*i)->interrupt ();
-               DCPOMATIC_ASSERT ((*i)->joinable ());
-               try {
-                       (*i)->join ();
-               } catch (boost::thread_interrupted& e) {
-                       /* This is to be expected */
-               }
-               delete *i;
-               LOG_GENERAL_NC ("Thread terminated");
-               ++n;
+       if (!_threads) {
+               return;
+       }
+
+       _threads->interrupt_all ();
+       try {
+               _threads->join_all ();
+       } catch (exception& e) {
+               LOG_ERROR ("join() threw an exception: %1", e.what());
+       } catch (...) {
+               LOG_ERROR_NC ("join() threw an exception");
        }
 
-       _threads.clear ();
+       _threads.reset ();
 }
 
 void
@@ -296,7 +291,7 @@ try
                }
 
                LOG_TIMING ("encoder-wake thread=%1 queue=%2", thread_id(), _queue.size());
-               shared_ptr<DCPVideo> vf = _queue.front ();
+               auto vf = _queue.front ();
 
                /* We're about to commit to either encoding this frame or putting it back onto the queue,
                   so we must not be interrupted until one or other of these things have happened.  This
@@ -310,12 +305,12 @@ try
 
                        lock.unlock ();
 
-                       optional<Data> encoded;
+                       shared_ptr<Data> encoded;
 
                        /* We need to encode this input */
                        if (server) {
                                try {
-                                       encoded = vf->encode_remotely (server.get ());
+                                       encoded = make_shared<dcp::ArrayData>(vf->encode_remotely(server.get()));
 
                                        if (remote_backoff > 0) {
                                                LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
@@ -338,7 +333,7 @@ try
                        } else {
                                try {
                                        LOG_TIMING ("start-local-encode thread=%1 frame=%2", thread_id(), vf->index());
-                                       encoded = vf->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2));
+                                       encoded = make_shared<dcp::ArrayData>(vf->encode_locally());
                                        LOG_TIMING ("finish-local-encode thread=%1 frame=%2", thread_id(), vf->index());
                                } catch (std::exception& e) {
                                        /* This is very bad, so don't cope with it, just pass it on */
@@ -348,7 +343,7 @@ try
                        }
 
                        if (encoded) {
-                               _writer->write (encoded.get(), vf->index (), vf->eyes ());
+                               _writer->write (encoded, vf->index(), vf->eyes());
                                frame_done ();
                        } else {
                                lock.lock ();
@@ -381,43 +376,34 @@ catch (...)
 void
 J2KEncoder::servers_list_changed ()
 {
+       boost::mutex::scoped_lock lm (_threads_mutex);
+
        terminate_threads ();
+       _threads = make_shared<boost::thread_group>();
 
        /* XXX: could re-use threads */
 
-       boost::mutex::scoped_lock lm (_threads_mutex);
-
-#ifdef BOOST_THREAD_PLATFORM_WIN32
-       OSVERSIONINFO info;
-       info.dwOSVersionInfoSize = sizeof (OSVERSIONINFO);
-       GetVersionEx (&info);
-       bool const windows_xp = (info.dwMajorVersion == 5 && info.dwMinorVersion == 1);
-       if (windows_xp) {
-               LOG_GENERAL_NC (N_("Setting thread affinity for Windows XP"));
-       }
-#endif
-
        if (!Config::instance()->only_servers_encode ()) {
                for (int i = 0; i < Config::instance()->master_encoding_threads (); ++i) {
-                       boost::thread* t = new boost::thread (boost::bind (&J2KEncoder::encoder_thread, this, optional<EncodeServerDescription> ()));
 #ifdef DCPOMATIC_LINUX
+                       auto t = _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, optional<EncodeServerDescription>()));
                        pthread_setname_np (t->native_handle(), "encode-worker");
-#endif
-                       _threads.push_back (t);
-#ifdef BOOST_THREAD_PLATFORM_WIN32
-                       if (windows_xp) {
-                               SetThreadAffinityMask (t->native_handle(), 1 << i);
-                       }
+#else
+                       _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, optional<EncodeServerDescription>()));
 #endif
                }
        }
 
-       BOOST_FOREACH (EncodeServerDescription i, EncodeServerFinder::instance()->servers ()) {
+       for (auto i: EncodeServerFinder::instance()->servers()) {
+               if (!i.current_link_version()) {
+                       continue;
+               }
+
                LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
                for (int j = 0; j < i.threads(); ++j) {
-                       _threads.push_back (new boost::thread (boost::bind (&J2KEncoder::encoder_thread, this, i)));
+                       _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, i));
                }
        }
 
-       _writer->set_encoder_threads (_threads.size ());
+       _writer->set_encoder_threads (_threads->size());
 }