2 Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 /** @file src/encoder.h
21 * @brief Parent class for classes which can encode video and audio frames.
29 #include "dcp_video.h"
33 #include "server_finder.h"
35 #include "player_video.h"
37 #include "server_description.h"
38 #include "compose.hpp"
39 #include <libcxml/cxml.h>
40 #include <boost/foreach.hpp>
45 #define LOG_GENERAL(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_GENERAL);
46 #define LOG_GENERAL_NC(...) _film->log()->log (__VA_ARGS__, LogEntry::TYPE_GENERAL);
47 #define LOG_ERROR(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_ERROR);
48 #define LOG_TIMING(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_TIMING);
52 using boost::shared_ptr;
53 using boost::weak_ptr;
54 using boost::optional;
56 int const Encoder::_history_size = 25;
58 /** @param f Film that we are encoding */
59 Encoder::Encoder (shared_ptr<const Film> film, weak_ptr<Job> j, shared_ptr<Writer> writer)
62 , _video_frames_enqueued (0)
65 , _terminate_enqueue (false)
66 , _terminate_encoding (false)
69 servers_list_changed ();
76 boost::mutex::scoped_lock lm (_queue_mutex);
77 _terminate_enqueue = true;
78 _full_condition.notify_all ();
79 _empty_condition.notify_all ();
85 if (!ServerFinder::instance()->disabled ()) {
86 _server_found_connection = ServerFinder::instance()->ServersListChanged.connect (boost::bind (&Encoder::servers_list_changed, this));
93 boost::mutex::scoped_lock lock (_queue_mutex);
95 LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
97 /* Keep waking workers until the queue is empty */
98 while (!_queue.empty ()) {
100 _empty_condition.notify_all ();
101 _full_condition.wait (lock);
106 LOG_GENERAL_NC (N_("Terminating encoder threads"));
108 terminate_threads ();
110 LOG_GENERAL (N_("Mopping up %1"), _queue.size());
112 /* The following sequence of events can occur in the above code:
113 1. a remote worker takes the last image off the queue
114 2. the loop above terminates
115 3. the remote worker fails to encode the image and puts it back on the queue
116 4. the remote worker is then terminated by terminate_threads
118 So just mop up anything left in the queue here.
121 for (list<shared_ptr<DCPVideo> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
122 LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
125 (*i)->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2)),
130 } catch (std::exception& e) {
131 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
136 /** @return an estimate of the current number of frames we are encoding per second,
140 Encoder::current_encoding_rate () const
142 boost::mutex::scoped_lock lock (_state_mutex);
143 if (int (_time_history.size()) < _history_size) {
148 gettimeofday (&now, 0);
150 return _history_size / (seconds (now) - seconds (_time_history.back ()));
153 /** @return Number of video frames that have been sent out */
155 Encoder::video_frames_out () const
157 boost::mutex::scoped_lock (_state_mutex);
158 return _video_frames_enqueued;
161 /** Should be called when a frame has been encoded successfully.
162 * @param n Source frame index.
165 Encoder::frame_done ()
167 boost::mutex::scoped_lock lock (_state_mutex);
170 gettimeofday (&tv, 0);
171 _time_history.push_front (tv);
172 if (int (_time_history.size()) > _history_size) {
173 _time_history.pop_back ();
177 /** Called in order, so each time this is called the supplied frame is the one
178 * after the previous one.
181 Encoder::enqueue (shared_ptr<PlayerVideo> pv)
187 boost::mutex::scoped_lock threads_lock (_threads_mutex);
188 threads = _threads.size ();
191 boost::mutex::scoped_lock queue_lock (_queue_mutex);
193 /* XXX: discard 3D here if required */
195 /* Wait until the queue has gone down a bit */
196 while (_queue.size() >= threads * 2 && !_terminate_enqueue) {
197 LOG_TIMING ("decoder-sleep queue=%1", _queue.size());
198 _full_condition.wait (queue_lock);
199 LOG_TIMING ("decoder-wake queue=%1", _queue.size());
202 if (_terminate_enqueue) {
207 /* Re-throw any exception raised by one of our threads. If more
208 than one has thrown an exception, only one will be rethrown, I think;
209 but then, if that happens something has gone badly wrong.
213 if (_writer->can_fake_write (_video_frames_enqueued)) {
214 /* We can fake-write this frame */
215 _writer->fake_write (_video_frames_enqueued, pv->eyes ());
217 } else if (pv->has_j2k ()) {
218 /* This frame already has JPEG2000 data, so just write it */
219 _writer->write (pv->j2k(), _video_frames_enqueued, pv->eyes ());
220 } else if (_last_player_video && pv->same (_last_player_video)) {
221 _writer->repeat (_video_frames_enqueued, pv->eyes ());
223 /* Queue this new frame for encoding */
224 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
225 _queue.push_back (shared_ptr<DCPVideo> (
228 _video_frames_enqueued,
229 _film->video_frame_rate(),
230 _film->j2k_bandwidth(),
236 /* The queue might not be empty any more, so notify anything which is
239 _empty_condition.notify_all ();
242 switch (pv->eyes ()) {
244 ++_video_frames_enqueued;
256 if (_left_done && _right_done) {
257 ++_video_frames_enqueued;
258 _left_done = _right_done = false;
261 _last_player_video = pv;
265 Encoder::terminate_threads ()
268 boost::mutex::scoped_lock queue_lock (_queue_mutex);
269 _terminate_encoding = true;
270 _full_condition.notify_all ();
271 _empty_condition.notify_all ();
274 boost::mutex::scoped_lock threads_lock (_threads_mutex);
276 for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
277 if ((*i)->joinable ()) {
284 _terminate_encoding = false;
288 Encoder::encoder_thread (optional<ServerDescription> server)
292 LOG_TIMING ("start-encoder-thread thread=%1 server=%2", boost::this_thread::get_id (), server->host_name ());
294 LOG_TIMING ("start-encoder-thread thread=%1 server=localhost", boost::this_thread::get_id ());
297 /* Number of seconds that we currently wait between attempts
298 to connect to the server; not relevant for localhost
301 int remote_backoff = 0;
305 LOG_TIMING ("encoder-sleep thread=%1", boost::this_thread::get_id());
306 boost::mutex::scoped_lock lock (_queue_mutex);
307 while (_queue.empty () && !_terminate_encoding) {
308 _empty_condition.wait (lock);
311 if (_terminate_encoding) {
315 LOG_TIMING ("encoder-wake thread=%1 queue=%2", boost::this_thread::get_id(), _queue.size());
316 shared_ptr<DCPVideo> vf = _queue.front ();
317 LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", boost::this_thread::get_id(), vf->index(), vf->eyes ());
322 optional<Data> encoded;
324 /* We need to encode this input */
327 encoded = vf->encode_remotely (server.get ());
329 if (remote_backoff > 0) {
330 LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
333 /* This job succeeded, so remove any backoff */
336 } catch (std::exception& e) {
337 if (remote_backoff < 60) {
339 remote_backoff += 10;
342 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
343 vf->index(), server->host_name(), e.what(), remote_backoff
349 LOG_TIMING ("start-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
350 encoded = vf->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2));
351 LOG_TIMING ("finish-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
352 } catch (std::exception& e) {
353 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
359 _writer->write (encoded.get(), vf->index (), vf->eyes ());
363 LOG_GENERAL (N_("[%1] Encoder thread pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->index());
364 _queue.push_front (vf);
368 if (remote_backoff > 0) {
369 dcpomatic_sleep (remote_backoff);
372 /* The queue might not be full any more, so notify anything that is waiting on that */
374 _full_condition.notify_all ();
380 /* Wake anything waiting on _full_condition so it can see the exception */
381 _full_condition.notify_all ();
385 Encoder::servers_list_changed ()
387 terminate_threads ();
389 /* XXX: could re-use threads */
391 boost::mutex::scoped_lock lm (_threads_mutex);
393 if (!Config::instance()->only_servers_encode ()) {
394 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
395 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<ServerDescription> ())));
399 BOOST_FOREACH (ServerDescription i, ServerFinder::instance()->servers ()) {
400 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
401 for (int j = 0; j < i.threads(); ++j) {
402 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, i)));
406 _writer->set_encoder_threads (_threads.size ());