2 Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
4 This file is part of DCP-o-matic.
6 DCP-o-matic is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 2 of the License, or
9 (at your option) any later version.
11 DCP-o-matic is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with DCP-o-matic. If not, see <http://www.gnu.org/licenses/>.
21 /** @file src/encoder.h
22 * @brief Parent class for classes which can encode video and audio frames.
30 #include "dcp_video.h"
33 #include "encode_server_finder.h"
35 #include "player_video.h"
36 #include "encode_server_description.h"
37 #include "compose.hpp"
38 #include <libcxml/cxml.h>
39 #include <boost/foreach.hpp>
44 #define LOG_GENERAL(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_GENERAL);
45 #define LOG_GENERAL_NC(...) _film->log()->log (__VA_ARGS__, LogEntry::TYPE_GENERAL);
46 #define LOG_ERROR(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_ERROR);
47 #define LOG_TIMING(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_TIMING);
48 #define LOG_DEBUG_ENCODE(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_DEBUG_ENCODE);
52 using boost::shared_ptr;
53 using boost::weak_ptr;
54 using boost::optional;
57 int const Encoder::_history_size = 200;
59 /** @param film Film that we are encoding.
60 * @param writer Writer that we are using.
62 Encoder::Encoder (shared_ptr<const Film> film, shared_ptr<Writer> writer)
66 servers_list_changed ();
74 /* Destructors must not throw exceptions; anything bad
75 happening now is too late to worry about anyway,
84 weak_ptr<Encoder> wp = shared_from_this ();
85 _server_found_connection = EncodeServerFinder::instance()->ServersListChanged.connect (
86 boost::bind (&Encoder::call_servers_list_changed, wp)
90 /* We don't want the servers-list-changed callback trying to do things
91 during destruction of Encoder, and I think this is the neatest way
95 Encoder::call_servers_list_changed (weak_ptr<Encoder> encoder)
97 shared_ptr<Encoder> e = encoder.lock ();
99 e->servers_list_changed ();
106 boost::mutex::scoped_lock lock (_queue_mutex);
108 LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
110 /* Keep waking workers until the queue is empty */
111 while (!_queue.empty ()) {
113 _empty_condition.notify_all ();
114 _full_condition.wait (lock);
119 LOG_GENERAL_NC (N_("Terminating encoder threads"));
121 terminate_threads ();
123 LOG_GENERAL (N_("Mopping up %1"), _queue.size());
125 /* The following sequence of events can occur in the above code:
126 1. a remote worker takes the last image off the queue
127 2. the loop above terminates
128 3. the remote worker fails to encode the image and puts it back on the queue
129 4. the remote worker is then terminated by terminate_threads
131 So just mop up anything left in the queue here.
134 for (list<shared_ptr<DCPVideo> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
135 LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
138 (*i)->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2)),
143 } catch (std::exception& e) {
144 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
149 /** @return an estimate of the current number of frames we are encoding per second,
153 Encoder::current_encoding_rate () const
155 boost::mutex::scoped_lock lock (_state_mutex);
156 if (int (_time_history.size()) < _history_size) {
161 gettimeofday (&now, 0);
163 return _history_size / (seconds (now) - seconds (_time_history.back ()));
166 /** @return Number of video frames that have been queued for encoding */
168 Encoder::video_frames_enqueued () const
170 if (!_last_player_video_time) {
174 return _last_player_video_time->frames_floor (_film->video_frame_rate ());
177 /** Should be called when a frame has been encoded successfully */
179 Encoder::frame_done ()
181 boost::mutex::scoped_lock lock (_state_mutex);
184 gettimeofday (&tv, 0);
185 _time_history.push_front (tv);
186 if (int (_time_history.size()) > _history_size) {
187 _time_history.pop_back ();
191 /** Called to request encoding of the next video frame in the DCP. This is called in order,
192 * so each time the supplied frame is the one after the previous one.
193 * pv represents one video frame, and could be empty if there is nothing to encode
194 * for this DCP frame.
196 * @param pv PlayerVideo to encode.
197 * @param time Time of \p pv within the DCP.
200 Encoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time)
206 boost::mutex::scoped_lock threads_lock (_threads_mutex);
207 threads = _threads.size ();
210 boost::mutex::scoped_lock queue_lock (_queue_mutex);
212 /* Wait until the queue has gone down a bit. Allow one thing in the queue even
213 when there are no threads.
215 while (_queue.size() >= (threads * 2) + 1) {
216 LOG_TIMING ("decoder-sleep queue=%1 threads=%2", _queue.size(), threads);
217 _full_condition.wait (queue_lock);
218 LOG_TIMING ("decoder-wake queue=%1 threads=%2", _queue.size(), threads);
222 /* Re-throw any exception raised by one of our threads. If more
223 than one has thrown an exception, only one will be rethrown, I think;
224 but then, if that happens something has gone badly wrong.
228 Frame const position = time.frames_floor(_film->video_frame_rate());
230 if (_writer->can_fake_write (position)) {
231 /* We can fake-write this frame */
232 LOG_DEBUG_ENCODE("Frame @ %1 FAKE", to_string(time));
233 _writer->fake_write (position, pv->eyes ());
235 } else if (pv->has_j2k ()) {
236 LOG_DEBUG_ENCODE("Frame @ %1 J2K", to_string(time));
237 /* This frame already has JPEG2000 data, so just write it */
238 _writer->write (pv->j2k(), position, pv->eyes ());
239 } else if (_last_player_video && _writer->can_repeat(position) && pv->same (_last_player_video)) {
240 LOG_DEBUG_ENCODE("Frame @ %1 REPEAT", to_string(time));
241 _writer->repeat (position, pv->eyes ());
243 LOG_DEBUG_ENCODE("Frame @ %1 ENCODE", to_string(time));
244 /* Queue this new frame for encoding */
245 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
246 _queue.push_back (shared_ptr<DCPVideo> (
250 _film->video_frame_rate(),
251 _film->j2k_bandwidth(),
257 /* The queue might not be empty any more, so notify anything which is
260 _empty_condition.notify_all ();
263 _last_player_video = pv;
264 _last_player_video_time = time;
268 Encoder::terminate_threads ()
270 boost::mutex::scoped_lock threads_lock (_threads_mutex);
273 for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
274 LOG_GENERAL ("Terminating thread %1 of %2", n + 1, _threads.size ());
276 DCPOMATIC_ASSERT ((*i)->joinable ());
279 } catch (boost::thread_interrupted& e) {
280 /* This is to be expected */
283 LOG_GENERAL_NC ("Thread terminated");
291 Encoder::encoder_thread (optional<EncodeServerDescription> server)
295 LOG_TIMING ("start-encoder-thread thread=%1 server=%2", thread_id (), server->host_name ());
297 LOG_TIMING ("start-encoder-thread thread=%1 server=localhost", thread_id ());
300 /* Number of seconds that we currently wait between attempts
301 to connect to the server; not relevant for localhost
304 int remote_backoff = 0;
308 LOG_TIMING ("encoder-sleep thread=%1", thread_id ());
309 boost::mutex::scoped_lock lock (_queue_mutex);
310 while (_queue.empty ()) {
311 _empty_condition.wait (lock);
314 LOG_TIMING ("encoder-wake thread=%1 queue=%2", thread_id(), _queue.size());
315 shared_ptr<DCPVideo> vf = _queue.front ();
317 /* We're about to commit to either encoding this frame or putting it back onto the queue,
318 so we must not be interrupted until one or other of these things have happened. This
319 block has thread interruption disabled.
322 boost::this_thread::disable_interruption dis;
324 LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", thread_id(), vf->index(), (int) vf->eyes ());
329 optional<Data> encoded;
331 /* We need to encode this input */
334 encoded = vf->encode_remotely (server.get ());
336 if (remote_backoff > 0) {
337 LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
340 /* This job succeeded, so remove any backoff */
343 } catch (std::exception& e) {
344 if (remote_backoff < 60) {
346 remote_backoff += 10;
349 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
350 vf->index(), server->host_name(), e.what(), remote_backoff
356 LOG_TIMING ("start-local-encode thread=%1 frame=%2", thread_id(), vf->index());
357 encoded = vf->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2));
358 LOG_TIMING ("finish-local-encode thread=%1 frame=%2", thread_id(), vf->index());
359 } catch (std::exception& e) {
360 /* This is very bad, so don't cope with it, just pass it on */
361 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
367 _writer->write (encoded.get(), vf->index (), vf->eyes ());
371 LOG_GENERAL (N_("[%1] Encoder thread pushes frame %2 back onto queue after failure"), thread_id(), vf->index());
372 _queue.push_front (vf);
377 if (remote_backoff > 0) {
378 boost::this_thread::sleep (boost::posix_time::seconds (remote_backoff));
381 /* The queue might not be full any more, so notify anything that is waiting on that */
383 _full_condition.notify_all ();
386 catch (boost::thread_interrupted& e) {
387 /* Ignore these and just stop the thread */
388 _full_condition.notify_all ();
393 /* Wake anything waiting on _full_condition so it can see the exception */
394 _full_condition.notify_all ();
398 Encoder::servers_list_changed ()
400 terminate_threads ();
402 /* XXX: could re-use threads */
404 boost::mutex::scoped_lock lm (_threads_mutex);
406 #ifdef BOOST_THREAD_PLATFORM_WIN32
408 info.dwOSVersionInfoSize = sizeof (OSVERSIONINFO);
409 GetVersionEx (&info);
410 bool const windows_xp = (info.dwMajorVersion == 5 && info.dwMinorVersion == 1);
412 LOG_GENERAL_NC (N_("Setting thread affinity for Windows XP"));
416 if (!Config::instance()->only_servers_encode ()) {
417 for (int i = 0; i < Config::instance()->master_encoding_threads (); ++i) {
418 boost::thread* t = new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<EncodeServerDescription> ()));
419 _threads.push_back (t);
420 #ifdef BOOST_THREAD_PLATFORM_WIN32
422 SetThreadAffinityMask (t->native_handle(), 1 << i);
428 BOOST_FOREACH (EncodeServerDescription i, EncodeServerFinder::instance()->servers ()) {
429 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
430 for (int j = 0; j < i.threads(); ++j) {
431 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, i)));
435 _writer->set_encoder_threads (_threads.size ());