Discard right-eye images when using 3D sources to make 2D DCPs.
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
/** @file src/lib/encoder.cc
 *  @brief Parent class for classes which can encode video and audio frames.
 */
24
25 #include "encoder.h"
26 #include "util.h"
27 #include "film.h"
28 #include "log.h"
29 #include "config.h"
30 #include "dcp_video.h"
31 #include "cross.h"
32 #include "writer.h"
33 #include "encode_server_finder.h"
34 #include "player.h"
35 #include "player_video.h"
36 #include "encode_server_description.h"
37 #include "compose.hpp"
38 #include <libcxml/cxml.h>
39 #include <boost/foreach.hpp>
40 #include <iostream>
41
42 #include "i18n.h"
43
/* Logging helpers.  Each one writes a single entry of the given type to the
   film's log, so they can only be used where _film is in scope.  The variants
   without _NC build the message with String::compose; LOG_GENERAL_NC passes
   its argument straight through.

   The expansions deliberately have no trailing semicolon: the caller writes
   `LOG_GENERAL (...);' and that is then exactly one statement, which keeps the
   macros safe to use in an unbraced if/else.
*/
#define LOG_GENERAL(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_GENERAL)
#define LOG_GENERAL_NC(...) _film->log()->log (__VA_ARGS__, LogEntry::TYPE_GENERAL)
#define LOG_ERROR(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_ERROR)
#define LOG_TIMING(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_TIMING)
48
49 using std::list;
50 using std::cout;
51 using boost::shared_ptr;
52 using boost::weak_ptr;
53 using boost::optional;
54 using dcp::Data;
55
56 int const Encoder::_history_size = 25;
57
58 /** @param f Film that we are encoding */
59 Encoder::Encoder (shared_ptr<const Film> film, shared_ptr<Writer> writer)
60         : _film (film)
61         , _position (0)
62         , _writer (writer)
63 {
64         servers_list_changed ();
65 }
66
67 Encoder::~Encoder ()
68 {
69         terminate_threads ();
70 }
71
72 void
73 Encoder::begin ()
74 {
75         if (!EncodeServerFinder::instance()->disabled ()) {
76                 _server_found_connection = EncodeServerFinder::instance()->ServersListChanged.connect (boost::bind (&Encoder::servers_list_changed, this));
77         }
78 }
79
80 void
81 Encoder::end ()
82 {
83         boost::mutex::scoped_lock lock (_queue_mutex);
84
85         LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
86
87         /* Keep waking workers until the queue is empty */
88         while (!_queue.empty ()) {
89                 rethrow ();
90                 _empty_condition.notify_all ();
91                 _full_condition.wait (lock);
92         }
93
94         lock.unlock ();
95
96         LOG_GENERAL_NC (N_("Terminating encoder threads"));
97
98         terminate_threads ();
99
100         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
101
102         /* The following sequence of events can occur in the above code:
103              1. a remote worker takes the last image off the queue
104              2. the loop above terminates
105              3. the remote worker fails to encode the image and puts it back on the queue
106              4. the remote worker is then terminated by terminate_threads
107
108              So just mop up anything left in the queue here.
109         */
110
111         for (list<shared_ptr<DCPVideo> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
112                 LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
113                 try {
114                         _writer->write (
115                                 (*i)->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2)),
116                                 (*i)->index (),
117                                 (*i)->eyes ()
118                                 );
119                         frame_done ();
120                 } catch (std::exception& e) {
121                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
122                 }
123         }
124 }
125
126 /** @return an estimate of the current number of frames we are encoding per second,
127  *  or 0 if not known.
128  */
129 float
130 Encoder::current_encoding_rate () const
131 {
132         boost::mutex::scoped_lock lock (_state_mutex);
133         if (int (_time_history.size()) < _history_size) {
134                 return 0;
135         }
136
137         struct timeval now;
138         gettimeofday (&now, 0);
139
140         return _history_size / (seconds (now) - seconds (_time_history.back ()));
141 }
142
143 /** @return Number of video frames that have been sent out */
144 int
145 Encoder::video_frames_out () const
146 {
147         boost::mutex::scoped_lock (_state_mutex);
148         return _position;
149 }
150
151 /** Should be called when a frame has been encoded successfully.
152  *  @param n Source frame index.
153  */
154 void
155 Encoder::frame_done ()
156 {
157         boost::mutex::scoped_lock lock (_state_mutex);
158
159         struct timeval tv;
160         gettimeofday (&tv, 0);
161         _time_history.push_front (tv);
162         if (int (_time_history.size()) > _history_size) {
163                 _time_history.pop_back ();
164         }
165 }
166
167 /** Called to start encoding of the next video frame in the DCP.  This is called in order,
168  *  so each time the supplied frame is the one after the previous one.
169  *  pv represents one video frame, and could be empty if there is nothing to encode
170  *  for this DCP frame.
171  */
172 void
173 Encoder::encode (list<shared_ptr<PlayerVideo> > pv)
174 {
175         BOOST_FOREACH (shared_ptr<PlayerVideo> i, pv) {
176                 if (!_film->three_d()) {
177                         /* 2D DCP */
178                         if (i->eyes() == EYES_RIGHT) {
179                                 /* Discard right-eye images */
180                                 continue;
181                         } else if (i->eyes() == EYES_LEFT) {
182                                 /* Use left-eye images for both eyes */
183                                 i->set_eyes (EYES_BOTH);
184                         }
185                 }
186
187                 enqueue (i);
188         }
189         ++_position;
190 }
191
192 void
193 Encoder::enqueue (shared_ptr<PlayerVideo> pv)
194 {
195         _waker.nudge ();
196
197         size_t threads = 0;
198         {
199                 boost::mutex::scoped_lock threads_lock (_threads_mutex);
200                 threads = _threads.size ();
201         }
202
203         boost::mutex::scoped_lock queue_lock (_queue_mutex);
204
205         /* Wait until the queue has gone down a bit */
206         while (_queue.size() >= threads * 2) {
207                 LOG_TIMING ("decoder-sleep queue=%1", _queue.size());
208                 _full_condition.wait (queue_lock);
209                 LOG_TIMING ("decoder-wake queue=%1", _queue.size());
210         }
211
212         _writer->rethrow ();
213         /* Re-throw any exception raised by one of our threads.  If more
214            than one has thrown an exception, only one will be rethrown, I think;
215            but then, if that happens something has gone badly wrong.
216         */
217         rethrow ();
218
219         if (_writer->can_fake_write (_position)) {
220                 /* We can fake-write this frame */
221                 _writer->fake_write (_position, pv->eyes ());
222                 frame_done ();
223         } else if (pv->has_j2k ()) {
224                 /* This frame already has JPEG2000 data, so just write it */
225                 _writer->write (pv->j2k(), _position, pv->eyes ());
226         } else if (_last_player_video && _writer->can_repeat(_position) && pv->same (_last_player_video)) {
227                 _writer->repeat (_position, pv->eyes ());
228         } else {
229                 /* Queue this new frame for encoding */
230                 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
231                 _queue.push_back (shared_ptr<DCPVideo> (
232                                           new DCPVideo (
233                                                   pv,
234                                                   _position,
235                                                   _film->video_frame_rate(),
236                                                   _film->j2k_bandwidth(),
237                                                   _film->resolution(),
238                                                   _film->log()
239                                                   )
240                                           ));
241
242                 /* The queue might not be empty any more, so notify anything which is
243                    waiting on that.
244                 */
245                 _empty_condition.notify_all ();
246         }
247
248         _last_player_video = pv;
249 }
250
251 void
252 Encoder::terminate_threads ()
253 {
254         boost::mutex::scoped_lock threads_lock (_threads_mutex);
255
256         int n = 0;
257         for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
258                 LOG_GENERAL ("Terminating thread %1 of %2", n + 1, _threads.size ());
259                 (*i)->interrupt ();
260                 DCPOMATIC_ASSERT ((*i)->joinable ());
261                 (*i)->join ();
262                 delete *i;
263                 LOG_GENERAL_NC ("Thread terminated");
264                 ++n;
265         }
266
267         _threads.clear ();
268 }
269
270 void
271 Encoder::encoder_thread (optional<EncodeServerDescription> server)
272 try
273 {
274         if (server) {
275                 LOG_TIMING ("start-encoder-thread thread=%1 server=%2", boost::this_thread::get_id (), server->host_name ());
276         } else {
277                 LOG_TIMING ("start-encoder-thread thread=%1 server=localhost", boost::this_thread::get_id ());
278         }
279
280         /* Number of seconds that we currently wait between attempts
281            to connect to the server; not relevant for localhost
282            encodings.
283         */
284         int remote_backoff = 0;
285
286         while (true) {
287
288                 LOG_TIMING ("encoder-sleep thread=%1", boost::this_thread::get_id());
289                 boost::mutex::scoped_lock lock (_queue_mutex);
290                 while (_queue.empty ()) {
291                         _empty_condition.wait (lock);
292                 }
293
294                 LOG_TIMING ("encoder-wake thread=%1 queue=%2", boost::this_thread::get_id(), _queue.size());
295                 shared_ptr<DCPVideo> vf = _queue.front ();
296                 LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", boost::this_thread::get_id(), vf->index(), vf->eyes ());
297                 _queue.pop_front ();
298
299                 lock.unlock ();
300
301                 optional<Data> encoded;
302
303                 /* We need to encode this input */
304                 if (server) {
305                         try {
306                                 encoded = vf->encode_remotely (server.get ());
307
308                                 if (remote_backoff > 0) {
309                                         LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
310                                 }
311
312                                 /* This job succeeded, so remove any backoff */
313                                 remote_backoff = 0;
314
315                         } catch (std::exception& e) {
316                                 if (remote_backoff < 60) {
317                                         /* back off more */
318                                         remote_backoff += 10;
319                                 }
320                                 LOG_ERROR (
321                                         N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
322                                         vf->index(), server->host_name(), e.what(), remote_backoff
323                                         );
324                         }
325
326                 } else {
327                         try {
328                                 LOG_TIMING ("start-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
329                                 encoded = vf->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2));
330                                 LOG_TIMING ("finish-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
331                         } catch (std::exception& e) {
332                                 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
333                                 throw;
334                         }
335                 }
336
337                 if (encoded) {
338                         _writer->write (encoded.get(), vf->index (), vf->eyes ());
339                         frame_done ();
340                 } else {
341                         lock.lock ();
342                         LOG_GENERAL (N_("[%1] Encoder thread pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->index());
343                         _queue.push_front (vf);
344                         lock.unlock ();
345                 }
346
347                 if (remote_backoff > 0) {
348                         boost::this_thread::sleep (boost::posix_time::seconds (remote_backoff));
349                 }
350
351                 /* The queue might not be full any more, so notify anything that is waiting on that */
352                 lock.lock ();
353                 _full_condition.notify_all ();
354         }
355 }
356 catch (...)
357 {
358         store_current ();
359         /* Wake anything waiting on _full_condition so it can see the exception */
360         _full_condition.notify_all ();
361 }
362
363 void
364 Encoder::servers_list_changed ()
365 {
366         terminate_threads ();
367
368         /* XXX: could re-use threads */
369
370         boost::mutex::scoped_lock lm (_threads_mutex);
371
372 #ifdef BOOST_THREAD_PLATFORM_WIN32
373         OSVERSIONINFO info;
374         info.dwOSVersionInfoSize = sizeof (OSVERSIONINFO);
375         GetVersionEx (&info);
376         bool const windows_xp = (info.dwMajorVersion == 5 && info.dwMinorVersion == 1);
377         if (windows_xp) {
378                 LOG_GENERAL_NC (N_("Setting thread affinity for Windows XP"));
379         }
380 #endif
381
382         if (!Config::instance()->only_servers_encode ()) {
383                 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
384                         boost::thread* t = new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<EncodeServerDescription> ()));
385                         _threads.push_back (t);
386 #ifdef BOOST_THREAD_PLATFORM_WIN32
387                         if (windows_xp) {
388                                 SetThreadAffinityMask (t->native_handle(), 1 << i);
389                         }
390 #endif
391                 }
392         }
393
394         BOOST_FOREACH (EncodeServerDescription i, EncodeServerFinder::instance()->servers ()) {
395                 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
396                 for (int j = 0; j < i.threads(); ++j) {
397                         _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, i)));
398                 }
399         }
400
401         _writer->set_encoder_threads (_threads.size ());
402 }