Formatting, variable name tidying and some const correctness.
[dcpomatic.git] / src / lib / j2k_encoder.cc
index c000f8599cfa89a0b3aa70320ac6b805bcda9c43..defde2227a5fee5af985b36c0ce230d130d61236 100644 (file)
@@ -1,5 +1,5 @@
 /*
-    Copyright (C) 2012-2019 Carl Hetherington <cth@carlh.net>
+    Copyright (C) 2012-2021 Carl Hetherington <cth@carlh.net>
 
     This file is part of DCP-o-matic.
 
 
 */
 
+
 /** @file src/j2k_encoder.cc
  *  @brief J2K encoder class.
  */
 
-#include "j2k_encoder.h"
-#include "util.h"
-#include "film.h"
-#include "log.h"
-#include "dcpomatic_log.h"
+
+#include "compose.hpp"
 #include "config.h"
-#include "dcp_video.h"
 #include "cross.h"
-#include "writer.h"
+#include "dcp_video.h"
+#include "dcpomatic_log.h"
+#include "encode_server_description.h"
 #include "encode_server_finder.h"
-#include "player.h"
+#include "film.h"
+#include "j2k_encoder.h"
+#include "log.h"
 #include "player_video.h"
-#include "encode_server_description.h"
-#include "compose.hpp"
+#include "util.h"
+#include "writer.h"
 #include <libcxml/cxml.h>
-#include <boost/foreach.hpp>
 #include <iostream>
 
 #include "i18n.h"
 
-using std::list;
+
 using std::cout;
 using std::exception;
-using boost::shared_ptr;
-using boost::weak_ptr;
+using std::list;
+using std::make_shared;
+using std::shared_ptr;
+using std::weak_ptr;
 using boost::optional;
 using dcp::Data;
 using namespace dcpomatic;
 
+static grk_plugin::GrokInitializer grokInitializer;
+
 /** @param film Film that we are encoding.
  *  @param writer Writer that we are using.
  */
-J2KEncoder::J2KEncoder (shared_ptr<const Film> film, shared_ptr<Writer> writer)
+J2KEncoder::J2KEncoder(shared_ptr<const Film> film, Writer& writer)
        : _film (film)
        , _history (200)
        , _writer (writer)
+       , _dcpomatic_context(film, writer, _history, Config::instance()->gpu_binary_location())
+       , _context(Config::instance()->enable_gpu() ? new grk_plugin::GrokContext(_dcpomatic_context) : nullptr)
 {
-       servers_list_changed ();
 }
 
+
 J2KEncoder::~J2KEncoder ()
 {
+       _server_found_connection.disconnect();
+
+       {
+       boost::mutex::scoped_lock lm (_threads_mutex);
        terminate_threads ();
+       }
+
+       delete _context;
 }
 
 void
 J2KEncoder::begin ()
 {
-       weak_ptr<J2KEncoder> wp = shared_from_this ();
-       _server_found_connection = EncodeServerFinder::instance()->ServersListChanged.connect (
-               boost::bind (&J2KEncoder::call_servers_list_changed, wp)
+       _server_found_connection = EncodeServerFinder::instance()->ServersListChanged.connect(
+               boost::bind(&J2KEncoder::servers_list_changed, this)
                );
+       servers_list_changed ();
 }
 
-/* We don't want the servers-list-changed callback trying to do things
-   during destruction of J2KEncoder, and I think this is the neatest way
-   to achieve that.
-*/
+
 void
-J2KEncoder::call_servers_list_changed (weak_ptr<J2KEncoder> encoder)
+J2KEncoder::pause()
+{
+       if (Config::instance()->enable_gpu()) {
+               end(false);
+       }
+}
+
+
+void J2KEncoder::resume()
 {
-       shared_ptr<J2KEncoder> e = encoder.lock ();
-       if (e) {
-               e->servers_list_changed ();
+       if (Config::instance()->enable_gpu()) {
+               _context = new grk_plugin::GrokContext(_dcpomatic_context);
+               servers_list_changed();
        }
 }
 
+
 void
-J2KEncoder::end ()
+J2KEncoder::end (bool isFinal)
 {
-       boost::mutex::scoped_lock lock (_queue_mutex);
+       if (isFinal) {
+               boost::mutex::scoped_lock lock (_queue_mutex);
 
-       LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
+               LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
 
-       /* Keep waking workers until the queue is empty */
-       while (!_queue.empty ()) {
-               rethrow ();
-               _empty_condition.notify_all ();
-               _full_condition.wait (lock);
+               /* Keep waking workers until the queue is empty */
+                       while (!_queue.empty ()) {
+                               rethrow ();
+                               _empty_condition.notify_all ();
+                               _full_condition.wait (lock);
+                       }
+               lock.unlock ();
        }
 
-       lock.unlock ();
-
        LOG_GENERAL_NC (N_("Terminating encoder threads"));
 
-       terminate_threads ();
+       {
+               boost::mutex::scoped_lock lm (_threads_mutex);
+               terminate_threads ();
+       }
 
        /* Something might have been thrown during terminate_threads */
        rethrow ();
@@ -115,29 +138,41 @@ J2KEncoder::end ()
        LOG_GENERAL (N_("Mopping up %1"), _queue.size());
 
        /* The following sequence of events can occur in the above code:
-            1. a remote worker takes the last image off the queue
-            2. the loop above terminates
-            3. the remote worker fails to encode the image and puts it back on the queue
-            4. the remote worker is then terminated by terminate_threads
+                1. a remote worker takes the last image off the queue
+                2. the loop above terminates
+                3. the remote worker fails to encode the image and puts it back on the queue
+                4. the remote worker is then terminated by terminate_threads
 
-            So just mop up anything left in the queue here.
+                So just mop up anything left in the queue here.
        */
-
-       for (list<shared_ptr<DCPVideo> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
-               LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
-               try {
-                       _writer->write (
-                               shared_ptr<dcp::Data>(new dcp::ArrayData((*i)->encode_locally())),
-                               (*i)->index(),
-                               (*i)->eyes()
-                               );
-                       frame_done ();
-               } catch (std::exception& e) {
-                       LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
+       if (isFinal) {
+               for (auto & i: _queue) {
+                       if (Config::instance()->enable_gpu ()) {
+                               if (!_context->scheduleCompress(i)){
+                                       LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), i.index());
+                                       // handle error
+                               }
+                       }
+                       else {
+                               LOG_GENERAL(N_("Encode left-over frame %1"), i.index());
+                               try {
+                                       _writer.write(
+                                                       make_shared<dcp::ArrayData>(i.encode_locally()),
+                                               i.index(),
+                                               i.eyes()
+                                               );
+                                       frame_done ();
+                               } catch (std::exception& e) {
+                                       LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
+                               }
+                       }
                }
        }
+       delete _context;
+       _context = nullptr;
 }
 
+
 /** @return an estimate of the current number of frames we are encoding per second,
  *  if known.
  */
@@ -147,6 +182,7 @@ J2KEncoder::current_encoding_rate () const
        return _history.rate ();
 }
 
+
 /** @return Number of video frames that have been queued for encoding */
 int
 J2KEncoder::video_frames_enqueued () const
@@ -158,6 +194,7 @@ J2KEncoder::video_frames_enqueued () const
        return _last_player_video_time->frames_floor (_film->video_frame_rate ());
 }
 
+
 /** Should be called when a frame has been encoded successfully */
 void
 J2KEncoder::frame_done ()
@@ -165,6 +202,7 @@ J2KEncoder::frame_done ()
        _history.event ();
 }
 
+
 /** Called to request encoding of the next video frame in the DCP.  This is called in order,
  *  so each time the supplied frame is the one after the previous one.
  *  pv represents one video frame, and could be empty if there is nothing to encode
@@ -181,7 +219,10 @@ J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time)
        size_t threads = 0;
        {
                boost::mutex::scoped_lock lm (_threads_mutex);
-               threads = _threads->size();
+               if (_threads)
+                       threads = _threads->size();
+               else
+                       threads = std::thread::hardware_concurrency();
        }
 
        boost::mutex::scoped_lock queue_lock (_queue_mutex);
@@ -195,40 +236,40 @@ J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time)
                LOG_TIMING ("decoder-wake queue=%1 threads=%2", _queue.size(), threads);
        }
 
-       _writer->rethrow ();
+       _writer.rethrow();
        /* Re-throw any exception raised by one of our threads.  If more
           than one has thrown an exception, only one will be rethrown, I think;
           but then, if that happens something has gone badly wrong.
        */
        rethrow ();
 
-       Frame const position = time.frames_floor(_film->video_frame_rate());
+       auto const position = time.frames_floor(_film->video_frame_rate());
 
-       if (_writer->can_fake_write (position)) {
+       if (_writer.can_fake_write(position)) {
                /* We can fake-write this frame */
                LOG_DEBUG_ENCODE("Frame @ %1 FAKE", to_string(time));
-               _writer->fake_write (position, pv->eyes ());
+               _writer.fake_write(position, pv->eyes ());
                frame_done ();
        } else if (pv->has_j2k() && !_film->reencode_j2k()) {
                LOG_DEBUG_ENCODE("Frame @ %1 J2K", to_string(time));
                /* This frame already has J2K data, so just write it */
-               _writer->write (pv->j2k(), position, pv->eyes ());
-       } else if (_last_player_video[pv->eyes()] && _writer->can_repeat(position) && pv->same (_last_player_video[pv->eyes()])) {
+               _writer.write(pv->j2k(), position, pv->eyes ());
+               frame_done ();
+       } else if (_last_player_video[pv->eyes()] && _writer.can_repeat(position) && pv->same(_last_player_video[pv->eyes()])) {
                LOG_DEBUG_ENCODE("Frame @ %1 REPEAT", to_string(time));
-               _writer->repeat (position, pv->eyes ());
+               _writer.repeat(position, pv->eyes());
        } else {
                LOG_DEBUG_ENCODE("Frame @ %1 ENCODE", to_string(time));
                /* Queue this new frame for encoding */
                LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
-               _queue.push_back (shared_ptr<DCPVideo> (
-                                         new DCPVideo (
-                                                 pv,
-                                                 position,
-                                                 _film->video_frame_rate(),
-                                                 _film->j2k_bandwidth(),
-                                                 _film->resolution()
-                                                 )
-                                         ));
+               auto dcpv = DCPVideo(
+                               pv,
+                               position,
+                               _film->video_frame_rate(),
+                               _film->j2k_bandwidth(),
+                               _film->resolution()
+                               );
+               _queue.push_back (dcpv);
 
                /* The queue might not be empty any more, so notify anything which is
                   waiting on that.
@@ -240,11 +281,12 @@ J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time)
        _last_player_video_time = time;
 }
 
+
+/** Caller must hold a lock on _threads_mutex */
 void
 J2KEncoder::terminate_threads ()
 {
        boost::this_thread::disable_interruption dis;
-       boost::mutex::scoped_lock lm (_threads_mutex);
 
        if (!_threads) {
                return;
@@ -262,10 +304,15 @@ J2KEncoder::terminate_threads ()
        _threads.reset ();
 }
 
+
 void
 J2KEncoder::encoder_thread (optional<EncodeServerDescription> server)
 try
 {
+       auto config = Config::instance ();
+
+       start_of_thread ("J2KEncoder");
+
        if (server) {
                LOG_TIMING ("start-encoder-thread thread=%1 server=%2", thread_id (), server->host_name ());
        } else {
@@ -287,7 +334,7 @@ try
                }
 
                LOG_TIMING ("encoder-wake thread=%1 queue=%2", thread_id(), _queue.size());
-               shared_ptr<DCPVideo> vf = _queue.front ();
+               auto vf = _queue.front ();
 
                /* We're about to commit to either encoding this frame or putting it back onto the queue,
                   so we must not be interrupted until one or other of these things have happened.  This
@@ -296,7 +343,7 @@ try
                {
                        boost::this_thread::disable_interruption dis;
 
-                       LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", thread_id(), vf->index(), (int) vf->eyes ());
+                       LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", thread_id(), vf.index(), static_cast<int>(vf.eyes()));
                        _queue.pop_front ();
 
                        lock.unlock ();
@@ -306,7 +353,7 @@ try
                        /* We need to encode this input */
                        if (server) {
                                try {
-                                       encoded.reset(new dcp::ArrayData(vf->encode_remotely(server.get())));
+                                       encoded = make_shared<dcp::ArrayData>(vf.encode_remotely(server.get()));
 
                                        if (remote_backoff > 0) {
                                                LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
@@ -322,30 +369,40 @@ try
                                        }
                                        LOG_ERROR (
                                                N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
-                                               vf->index(), server->host_name(), e.what(), remote_backoff
+                                               vf.index(), server->host_name(), e.what(), remote_backoff
                                                );
                                }
 
                        } else {
-                               try {
-                                       LOG_TIMING ("start-local-encode thread=%1 frame=%2", thread_id(), vf->index());
-                                       encoded.reset(new dcp::ArrayData(vf->encode_locally()));
-                                       LOG_TIMING ("finish-local-encode thread=%1 frame=%2", thread_id(), vf->index());
-                               } catch (std::exception& e) {
-                                       /* This is very bad, so don't cope with it, just pass it on */
-                                       LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
-                                       throw;
+                               if (_context) {
+                                       if (!_context->launch(vf, config->selected_gpu()) || !_context->scheduleCompress(vf)) {
+                                               LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), vf.index());
+                                               _queue.push_front (vf);
+                                       }
+
+                               } else {
+                                       try {
+                                               LOG_TIMING ("start-local-encode thread=%1 frame=%2", thread_id(), vf.index());
+                                               encoded = make_shared<dcp::ArrayData>(vf.encode_locally());
+                                               LOG_TIMING ("finish-local-encode thread=%1 frame=%2", thread_id(), vf.index());
+                                       } catch (std::exception& e) {
+                                               /* This is very bad, so don't cope with it, just pass it on */
+                                               LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
+                                               throw;
+                                       }
                                }
                        }
 
                        if (encoded) {
-                               _writer->write (encoded, vf->index(), vf->eyes());
+                               _writer.write(encoded, vf.index(), vf.eyes());
                                frame_done ();
                        } else {
-                               lock.lock ();
-                               LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), vf->index());
-                               _queue.push_front (vf);
-                               lock.unlock ();
+                               if (!Config::instance()->enable_gpu ()) {
+                                       lock.lock ();
+                                       LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), vf.index());
+                                       _queue.push_front (vf);
+                                       lock.unlock ();
+                               }
                        }
                }
 
@@ -369,20 +426,21 @@ catch (...)
        _full_condition.notify_all ();
 }
 
+
 void
 J2KEncoder::servers_list_changed ()
 {
-       terminate_threads ();
-       _threads.reset (new boost::thread_group());
-
        boost::mutex::scoped_lock lm (_threads_mutex);
 
+       terminate_threads ();
+       _threads = make_shared<boost::thread_group>();
+
        /* XXX: could re-use threads */
 
        if (!Config::instance()->only_servers_encode ()) {
                for (int i = 0; i < Config::instance()->master_encoding_threads (); ++i) {
 #ifdef DCPOMATIC_LINUX
-                       boost::thread* t = _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, optional<EncodeServerDescription>()));
+                       auto t = _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, optional<EncodeServerDescription>()));
                        pthread_setname_np (t->native_handle(), "encode-worker");
 #else
                        _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, optional<EncodeServerDescription>()));
@@ -390,7 +448,7 @@ J2KEncoder::servers_list_changed ()
                }
        }
 
-       BOOST_FOREACH (EncodeServerDescription i, EncodeServerFinder::instance()->servers()) {
+       for (auto i: EncodeServerFinder::instance()->servers()) {
                if (!i.current_link_version()) {
                        continue;
                }
@@ -401,5 +459,5 @@ J2KEncoder::servers_list_changed ()
                }
        }
 
-       _writer->set_encoder_threads (_threads->size());
+       _writer.set_encoder_threads(_threads->size());
 }