Clean up of 3D->2D conversion.
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
21 /** @file src/encoder.h
22  *  @brief Parent class for classes which can encode video and audio frames.
23  */
24
25 #include "encoder.h"
26 #include "util.h"
27 #include "film.h"
28 #include "log.h"
29 #include "config.h"
30 #include "dcp_video.h"
31 #include "cross.h"
32 #include "writer.h"
33 #include "encode_server_finder.h"
34 #include "player.h"
35 #include "player_video.h"
36 #include "encode_server_description.h"
37 #include "compose.hpp"
38 #include <libcxml/cxml.h>
39 #include <boost/foreach.hpp>
40 #include <iostream>
41
42 #include "i18n.h"
43
44 #define LOG_GENERAL(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_GENERAL);
45 #define LOG_GENERAL_NC(...) _film->log()->log (__VA_ARGS__, LogEntry::TYPE_GENERAL);
46 #define LOG_ERROR(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_ERROR);
47 #define LOG_TIMING(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_TIMING);
48
49 using std::list;
50 using std::cout;
51 using boost::shared_ptr;
52 using boost::weak_ptr;
53 using boost::optional;
54 using dcp::Data;
55
56 int const Encoder::_history_size = 25;
57
58 /** @param f Film that we are encoding */
59 Encoder::Encoder (shared_ptr<const Film> film, shared_ptr<Writer> writer)
60         : _film (film)
61         , _writer (writer)
62 {
63         servers_list_changed ();
64 }
65
66 Encoder::~Encoder ()
67 {
68         terminate_threads ();
69 }
70
71 void
72 Encoder::begin ()
73 {
74         if (!EncodeServerFinder::instance()->disabled ()) {
75                 _server_found_connection = EncodeServerFinder::instance()->ServersListChanged.connect (boost::bind (&Encoder::servers_list_changed, this));
76         }
77 }
78
79 void
80 Encoder::end ()
81 {
82         boost::mutex::scoped_lock lock (_queue_mutex);
83
84         LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
85
86         /* Keep waking workers until the queue is empty */
87         while (!_queue.empty ()) {
88                 rethrow ();
89                 _empty_condition.notify_all ();
90                 _full_condition.wait (lock);
91         }
92
93         lock.unlock ();
94
95         LOG_GENERAL_NC (N_("Terminating encoder threads"));
96
97         terminate_threads ();
98
99         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
100
101         /* The following sequence of events can occur in the above code:
102              1. a remote worker takes the last image off the queue
103              2. the loop above terminates
104              3. the remote worker fails to encode the image and puts it back on the queue
105              4. the remote worker is then terminated by terminate_threads
106
107              So just mop up anything left in the queue here.
108         */
109
110         for (list<shared_ptr<DCPVideo> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
111                 LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
112                 try {
113                         _writer->write (
114                                 (*i)->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2)),
115                                 (*i)->index (),
116                                 (*i)->eyes ()
117                                 );
118                         frame_done ();
119                 } catch (std::exception& e) {
120                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
121                 }
122         }
123 }
124
125 /** @return an estimate of the current number of frames we are encoding per second,
126  *  or 0 if not known.
127  */
128 float
129 Encoder::current_encoding_rate () const
130 {
131         boost::mutex::scoped_lock lock (_state_mutex);
132         if (int (_time_history.size()) < _history_size) {
133                 return 0;
134         }
135
136         struct timeval now;
137         gettimeofday (&now, 0);
138
139         return _history_size / (seconds (now) - seconds (_time_history.back ()));
140 }
141
142 /** @return Number of video frames that have been queued for encoding */
143 int
144 Encoder::video_frames_enqueued () const
145 {
146         return _last_player_video->time().frames_floor (_film->video_frame_rate ());
147 }
148
149 /** Should be called when a frame has been encoded successfully.
150  *  @param n Source frame index.
151  */
152 void
153 Encoder::frame_done ()
154 {
155         boost::mutex::scoped_lock lock (_state_mutex);
156
157         struct timeval tv;
158         gettimeofday (&tv, 0);
159         _time_history.push_front (tv);
160         if (int (_time_history.size()) > _history_size) {
161                 _time_history.pop_back ();
162         }
163 }
164
165 /** Called to start encoding of the next video frame in the DCP.  This is called in order,
166  *  so each time the supplied frame is the one after the previous one.
167  *  pv represents one video frame, and could be empty if there is nothing to encode
168  *  for this DCP frame.
169  */
170 void
171 Encoder::encode (shared_ptr<PlayerVideo> pv)
172 {
173         _waker.nudge ();
174
175         size_t threads = 0;
176         {
177                 boost::mutex::scoped_lock threads_lock (_threads_mutex);
178                 threads = _threads.size ();
179         }
180
181         boost::mutex::scoped_lock queue_lock (_queue_mutex);
182
183         /* Wait until the queue has gone down a bit */
184         while (_queue.size() >= threads * 2) {
185                 LOG_TIMING ("decoder-sleep queue=%1", _queue.size());
186                 _full_condition.wait (queue_lock);
187                 LOG_TIMING ("decoder-wake queue=%1", _queue.size());
188         }
189
190         _writer->rethrow ();
191         /* Re-throw any exception raised by one of our threads.  If more
192            than one has thrown an exception, only one will be rethrown, I think;
193            but then, if that happens something has gone badly wrong.
194         */
195         rethrow ();
196
197         Frame const position = pv->time().frames_floor(_film->video_frame_rate());
198
199         if (_writer->can_fake_write (position)) {
200                 /* We can fake-write this frame */
201                 _writer->fake_write (position, pv->eyes ());
202                 frame_done ();
203         } else if (pv->has_j2k ()) {
204                 /* This frame already has JPEG2000 data, so just write it */
205                 _writer->write (pv->j2k(), position, pv->eyes ());
206         } else if (_last_player_video && _writer->can_repeat(position) && pv->same (_last_player_video)) {
207                 _writer->repeat (position, pv->eyes ());
208         } else {
209                 /* Queue this new frame for encoding */
210                 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
211                 _queue.push_back (shared_ptr<DCPVideo> (
212                                           new DCPVideo (
213                                                   pv,
214                                                   position,
215                                                   _film->video_frame_rate(),
216                                                   _film->j2k_bandwidth(),
217                                                   _film->resolution(),
218                                                   _film->log()
219                                                   )
220                                           ));
221
222                 /* The queue might not be empty any more, so notify anything which is
223                    waiting on that.
224                 */
225                 _empty_condition.notify_all ();
226         }
227
228         _last_player_video = pv;
229 }
230
231 void
232 Encoder::terminate_threads ()
233 {
234         boost::mutex::scoped_lock threads_lock (_threads_mutex);
235
236         int n = 0;
237         for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
238                 LOG_GENERAL ("Terminating thread %1 of %2", n + 1, _threads.size ());
239                 (*i)->interrupt ();
240                 DCPOMATIC_ASSERT ((*i)->joinable ());
241                 (*i)->join ();
242                 delete *i;
243                 LOG_GENERAL_NC ("Thread terminated");
244                 ++n;
245         }
246
247         _threads.clear ();
248 }
249
250 void
251 Encoder::encoder_thread (optional<EncodeServerDescription> server)
252 try
253 {
254         if (server) {
255                 LOG_TIMING ("start-encoder-thread thread=%1 server=%2", boost::this_thread::get_id (), server->host_name ());
256         } else {
257                 LOG_TIMING ("start-encoder-thread thread=%1 server=localhost", boost::this_thread::get_id ());
258         }
259
260         /* Number of seconds that we currently wait between attempts
261            to connect to the server; not relevant for localhost
262            encodings.
263         */
264         int remote_backoff = 0;
265
266         while (true) {
267
268                 LOG_TIMING ("encoder-sleep thread=%1", boost::this_thread::get_id());
269                 boost::mutex::scoped_lock lock (_queue_mutex);
270                 while (_queue.empty ()) {
271                         _empty_condition.wait (lock);
272                 }
273
274                 LOG_TIMING ("encoder-wake thread=%1 queue=%2", boost::this_thread::get_id(), _queue.size());
275                 shared_ptr<DCPVideo> vf = _queue.front ();
276                 LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", boost::this_thread::get_id(), vf->index(), vf->eyes ());
277                 _queue.pop_front ();
278
279                 lock.unlock ();
280
281                 optional<Data> encoded;
282
283                 /* We need to encode this input */
284                 if (server) {
285                         try {
286                                 encoded = vf->encode_remotely (server.get ());
287
288                                 if (remote_backoff > 0) {
289                                         LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
290                                 }
291
292                                 /* This job succeeded, so remove any backoff */
293                                 remote_backoff = 0;
294
295                         } catch (std::exception& e) {
296                                 if (remote_backoff < 60) {
297                                         /* back off more */
298                                         remote_backoff += 10;
299                                 }
300                                 LOG_ERROR (
301                                         N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
302                                         vf->index(), server->host_name(), e.what(), remote_backoff
303                                         );
304                         }
305
306                 } else {
307                         try {
308                                 LOG_TIMING ("start-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
309                                 encoded = vf->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2));
310                                 LOG_TIMING ("finish-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
311                         } catch (std::exception& e) {
312                                 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
313                                 throw;
314                         }
315                 }
316
317                 if (encoded) {
318                         _writer->write (encoded.get(), vf->index (), vf->eyes ());
319                         frame_done ();
320                 } else {
321                         lock.lock ();
322                         LOG_GENERAL (N_("[%1] Encoder thread pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->index());
323                         _queue.push_front (vf);
324                         lock.unlock ();
325                 }
326
327                 if (remote_backoff > 0) {
328                         boost::this_thread::sleep (boost::posix_time::seconds (remote_backoff));
329                 }
330
331                 /* The queue might not be full any more, so notify anything that is waiting on that */
332                 lock.lock ();
333                 _full_condition.notify_all ();
334         }
335 }
336 catch (...)
337 {
338         store_current ();
339         /* Wake anything waiting on _full_condition so it can see the exception */
340         _full_condition.notify_all ();
341 }
342
343 void
344 Encoder::servers_list_changed ()
345 {
346         terminate_threads ();
347
348         /* XXX: could re-use threads */
349
350         boost::mutex::scoped_lock lm (_threads_mutex);
351
352 #ifdef BOOST_THREAD_PLATFORM_WIN32
353         OSVERSIONINFO info;
354         info.dwOSVersionInfoSize = sizeof (OSVERSIONINFO);
355         GetVersionEx (&info);
356         bool const windows_xp = (info.dwMajorVersion == 5 && info.dwMinorVersion == 1);
357         if (windows_xp) {
358                 LOG_GENERAL_NC (N_("Setting thread affinity for Windows XP"));
359         }
360 #endif
361
362         if (!Config::instance()->only_servers_encode ()) {
363                 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
364                         boost::thread* t = new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<EncodeServerDescription> ()));
365                         _threads.push_back (t);
366 #ifdef BOOST_THREAD_PLATFORM_WIN32
367                         if (windows_xp) {
368                                 SetThreadAffinityMask (t->native_handle(), 1 << i);
369                         }
370 #endif
371                 }
372         }
373
374         BOOST_FOREACH (EncodeServerDescription i, EncodeServerFinder::instance()->servers ()) {
375                 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
376                 for (int j = 0; j < i.threads(); ++j) {
377                         _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, i)));
378                 }
379         }
380
381         _writer->set_encoder_threads (_threads.size ());
382 }