Tidy up logging a bit. Make it configurable from the GUI.
[dcpomatic.git] / src / lib / encoder.cc
index 17a6726a6841db8c3bd753b35d5f9ac1686b1991..05da6bbdf53db33e0ec62847bd2e0a6e02c4b9c8 100644 (file)
@@ -1,5 +1,5 @@
 /*
-    Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
+    Copyright (C) 2012-2014 Carl Hetherington <cth@carlh.net>
 
     This program is free software; you can redistribute it and/or modify
     it under the terms of the GNU General Public License as published by
  *  @brief Parent class for classes which can encode video and audio frames.
  */
 
+#include <iostream>
+#include <boost/lambda/lambda.hpp>
+#include <libcxml/cxml.h>
 #include "encoder.h"
 #include "util.h"
-#include "options.h"
+#include "film.h"
+#include "log.h"
+#include "config.h"
+#include "dcp_video_frame.h"
+#include "server.h"
+#include "cross.h"
+#include "writer.h"
+#include "server_finder.h"
+#include "player.h"
+#include "player_video_frame.h"
+
+#include "i18n.h"
+
+#define LOG_GENERAL(...) _film->log()->log (String::compose (__VA_ARGS__), Log::GENERAL);
+#define LOG_ERROR(...) _film->log()->log (String::compose (__VA_ARGS__), Log::ERROR);
+#define LOG_TIMING(...) _film->log()->log (String::compose (__VA_ARGS__), Log::TIMING);
 
 using std::pair;
-using namespace boost;
+using std::string;
+using std::stringstream;
+using std::vector;
+using std::list;
+using std::cout;
+using std::min;
+using std::make_pair;
+using boost::shared_ptr;
+using boost::weak_ptr;
+using boost::optional;
+using boost::scoped_array;
 
 int const Encoder::_history_size = 25;
 
-/** @param f Film that we are encoding.
- *  @param o Options.
- */
-Encoder::Encoder (shared_ptr<const Film> f, shared_ptr<const Options> o)
+/** @param f Film that we are encoding */
+Encoder::Encoder (shared_ptr<const Film> f, weak_ptr<Job> j)
        : _film (f)
-       , _opt (o)
-       , _just_skipped (false)
-       , _video_frame (0)
-       , _audio_frame (0)
+       , _job (j)
+       , _video_frames_out (0)
+       , _terminate (false)
 {
+       _have_a_real_frame[EYES_BOTH] = false;
+       _have_a_real_frame[EYES_LEFT] = false;
+       _have_a_real_frame[EYES_RIGHT] = false;
+}
 
+Encoder::~Encoder ()
+{
+       terminate_threads ();
 }
 
+/** Add a worker thread for a each thread on a remote server.  Caller must hold
+ *  a lock on _mutex, or know that one is not currently required to
+ *  safely modify _threads.
+ */
+void
+Encoder::add_worker_threads (ServerDescription d)
+{
+       LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), d.host_name ());
+       for (int i = 0; i < d.threads(); ++i) {
+               _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, d)));
+       }
+}
+
+void
+Encoder::process_begin ()
+{
+       for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
+               _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<ServerDescription> ())));
+       }
+
+       _writer.reset (new Writer (_film, _job));
+       ServerFinder::instance()->connect (boost::bind (&Encoder::server_found, this, _1));
+}
+
+void
+Encoder::process_end ()
+{
+       boost::mutex::scoped_lock lock (_mutex);
+
+       LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
+
+       /* Keep waking workers until the queue is empty */
+       while (!_queue.empty ()) {
+               _condition.notify_all ();
+               _condition.wait (lock);
+       }
+
+       lock.unlock ();
+       
+       terminate_threads ();
+
+       LOG_GENERAL (N_("Mopping up %1"), _queue.size());
+
+       /* The following sequence of events can occur in the above code:
+            1. a remote worker takes the last image off the queue
+            2. the loop above terminates
+            3. the remote worker fails to encode the image and puts it back on the queue
+            4. the remote worker is then terminated by terminate_threads
+
+            So just mop up anything left in the queue here.
+       */
+
+       for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
+               LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
+               try {
+                       _writer->write ((*i)->encode_locally(), (*i)->index (), (*i)->eyes ());
+                       frame_done ();
+               } catch (std::exception& e) {
+                       LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
+               }
+       }
+               
+       _writer->finish ();
+       _writer.reset ();
+}      
 
 /** @return an estimate of the current number of frames we are encoding per second,
  *  or 0 if not known.
  */
 float
-Encoder::current_frames_per_second () const
+Encoder::current_encoding_rate () const
 {
-       boost::mutex::scoped_lock lock (_history_mutex);
+       boost::mutex::scoped_lock lock (_state_mutex);
        if (int (_time_history.size()) < _history_size) {
                return 0;
        }
@@ -61,20 +158,12 @@ Encoder::current_frames_per_second () const
        return _history_size / (seconds (now) - seconds (_time_history.back ()));
 }
 
-/** @return true if the last frame to be processed was skipped as it already existed */
-bool
-Encoder::skipping () const
+/** @return Number of video frames that have been sent out */
+int
+Encoder::video_frames_out () const
 {
-       boost::mutex::scoped_lock (_history_mutex);
-       return _just_skipped;
-}
-
-/** @return Number of video frames that have been received */
-SourceFrame
-Encoder::video_frame () const
-{
-       boost::mutex::scoped_lock (_history_mutex);
-       return _video_frame;
+       boost::mutex::scoped_lock (_state_mutex);
+       return _video_frames_out;
 }
 
 /** Should be called when a frame has been encoded successfully.
@@ -83,8 +172,7 @@ Encoder::video_frame () const
 void
 Encoder::frame_done ()
 {
-       boost::mutex::scoped_lock lock (_history_mutex);
-       _just_skipped = false;
+       boost::mutex::scoped_lock lock (_state_mutex);
        
        struct timeval tv;
        gettimeofday (&tv, 0);
@@ -94,65 +182,173 @@ Encoder::frame_done ()
        }
 }
 
-/** Called by a subclass when it has just skipped the processing
-    of a frame because it has already been done.
-*/
 void
-Encoder::frame_skipped ()
+Encoder::process_video (shared_ptr<PlayerVideoFrame> pvf, bool same)
 {
-       boost::mutex::scoped_lock lock (_history_mutex);
-       _just_skipped = true;
+       _waker.nudge ();
+       
+       boost::mutex::scoped_lock lock (_mutex);
+
+       /* XXX: discard 3D here if required */
+
+       /* Wait until the queue has gone down a bit */
+       while (_queue.size() >= _threads.size() * 2 && !_terminate) {
+               LOG_TIMING ("decoder sleeps with queue of %1", _queue.size());
+               _condition.wait (lock);
+               LOG_TIMING ("decoder wakes with queue of %1", _queue.size());
+       }
+
+       if (_terminate) {
+               return;
+       }
+
+       _writer->rethrow ();
+       /* Re-throw any exception raised by one of our threads.  If more
+          than one has thrown an exception, only one will be rethrown, I think;
+          but then, if that happens something has gone badly wrong.
+       */
+       rethrow ();
+
+       if (_writer->can_fake_write (_video_frames_out)) {
+               _writer->fake_write (_video_frames_out, pvf->eyes ());
+               _have_a_real_frame[pvf->eyes()] = false;
+               frame_done ();
+       } else if (same && _have_a_real_frame[pvf->eyes()]) {
+               /* Use the last frame that we encoded. */
+               _writer->repeat (_video_frames_out, pvf->eyes());
+               frame_done ();
+       } else {
+               /* Queue this new frame for encoding */
+               LOG_TIMING ("adding to queue of %1", _queue.size ());
+               _queue.push_back (shared_ptr<DCPVideoFrame> (
+                                         new DCPVideoFrame (
+                                                 pvf, _video_frames_out, _film->video_frame_rate(),
+                                                 _film->j2k_bandwidth(), _film->resolution(), _film->log()
+                                                 )
+                                         ));
+               
+               _condition.notify_all ();
+               _have_a_real_frame[pvf->eyes()] = true;
+       }
+
+       if (pvf->eyes() != EYES_LEFT) {
+               ++_video_frames_out;
+       }
 }
 
 void
-Encoder::process_video (shared_ptr<Image> i, boost::shared_ptr<Subtitle> s)
+Encoder::process_audio (shared_ptr<const AudioBuffers> data)
 {
-       if (_opt->decode_video_skip != 0 && (_video_frame % _opt->decode_video_skip) != 0) {
-               ++_video_frame;
-               return;
+       _writer->write (data);
+}
+
+void
+Encoder::terminate_threads ()
+{
+       {
+               boost::mutex::scoped_lock lock (_mutex);
+               _terminate = true;
+               _condition.notify_all ();
        }
 
-       if (_opt->video_decode_range) {
-               pair<SourceFrame, SourceFrame> const r = _opt->video_decode_range.get();
-               if (_video_frame < r.first || _video_frame >= r.second) {
-                       ++_video_frame;
-                       return;
+       for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
+               if ((*i)->joinable ()) {
+                       (*i)->join ();
                }
+               delete *i;
        }
 
-       do_process_video (i, s);
-       ++_video_frame;
+       _threads.clear ();
 }
 
 void
-Encoder::process_audio (shared_ptr<AudioBuffers> data)
+Encoder::encoder_thread (optional<ServerDescription> server)
+try
 {
-       if (_opt->audio_decode_range) {
+       /* Number of seconds that we currently wait between attempts
+          to connect to the server; not relevant for localhost
+          encodings.
+       */
+       int remote_backoff = 0;
+       
+       while (1) {
 
-               shared_ptr<AudioBuffers> trimmed (new AudioBuffers (*data.get ()));
-               
-               /* Range that we are encoding */
-               pair<int64_t, int64_t> required_range = _opt->audio_decode_range.get();
-               /* Range of this block of data */
-               pair<int64_t, int64_t> this_range (_audio_frame, _audio_frame + trimmed->frames());
+               LOG_TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
+               boost::mutex::scoped_lock lock (_mutex);
+               while (_queue.empty () && !_terminate) {
+                       _condition.wait (lock);
+               }
 
-               if (this_range.second < required_range.first || required_range.second < this_range.first) {
-                       /* No part of this audio is within the required range */
+               if (_terminate) {
                        return;
-               } else if (required_range.first >= this_range.first && required_range.first < this_range.second) {
-                       /* Trim start */
-                       int64_t const shift = required_range.first - this_range.first;
-                       trimmed->move (shift, 0, trimmed->frames() - shift);
-                       trimmed->set_frames (trimmed->frames() - shift);
-               } else if (required_range.second >= this_range.first && required_range.second < this_range.second) {
-                       /* Trim end */
-                       trimmed->set_frames (required_range.second - this_range.first);
                }
 
-               data = trimmed;
-       }
+               LOG_TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
+               shared_ptr<DCPVideoFrame> vf = _queue.front ();
+               LOG_TIMING ("encoder thread %1 pops frame %2 (%3) from queue", boost::this_thread::get_id(), vf->index(), vf->eyes ());
+               _queue.pop_front ();
+               
+               lock.unlock ();
+
+               shared_ptr<EncodedData> encoded;
+
+               if (server) {
+                       try {
+                               encoded = vf->encode_remotely (server.get ());
+
+                               if (remote_backoff > 0) {
+                                       LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
+                               }
+                               
+                               /* This job succeeded, so remove any backoff */
+                               remote_backoff = 0;
+                               
+                       } catch (std::exception& e) {
+                               if (remote_backoff < 60) {
+                                       /* back off more */
+                                       remote_backoff += 10;
+                               }
+                               LOG_ERROR (
+                                       N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
+                                       vf->index(), server->host_name(), e.what(), remote_backoff
+                                       );
+                       }
+                               
+               } else {
+                       try {
+                               LOG_TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->index());
+                               encoded = vf->encode_locally ();
+                               LOG_TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->index());
+                       } catch (std::exception& e) {
+                               LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
+                       }
+               }
+
+               if (encoded) {
+                       _writer->write (encoded, vf->index (), vf->eyes ());
+                       frame_done ();
+               } else {
+                       lock.lock ();
+                       LOG_GENERAL (N_("Encoder thread %1 pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->index());
+                       _queue.push_front (vf);
+                       lock.unlock ();
+               }
 
-       do_process_audio (data);
+               if (remote_backoff > 0) {
+                       dcpomatic_sleep (remote_backoff);
+               }
 
-       _audio_frame += data->frames ();
+               lock.lock ();
+               _condition.notify_all ();
+       }
+}
+catch (...)
+{
+       store_current ();
+}
+
+void
+Encoder::server_found (ServerDescription s)
+{
+       add_worker_threads (s);
 }