2 Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 /** @file src/encoder.h
21 * @brief Parent class for classes which can encode video and audio frames.
29 #include "dcp_video.h"
33 #include "server_finder.h"
35 #include "player_video.h"
37 #include "server_description.h"
38 #include "compose.hpp"
39 #include <libcxml/cxml.h>
40 #include <boost/foreach.hpp>
45 #define LOG_GENERAL(...) _film->log()->log (String::compose (__VA_ARGS__), Log::TYPE_GENERAL);
46 #define LOG_ERROR(...) _film->log()->log (String::compose (__VA_ARGS__), Log::TYPE_ERROR);
47 #define LOG_TIMING(...) _film->log()->microsecond_log (String::compose (__VA_ARGS__), Log::TYPE_TIMING);
51 using boost::shared_ptr;
52 using boost::weak_ptr;
53 using boost::optional;
55 int const Encoder::_history_size = 25;
57 /** @param f Film that we are encoding */
58 Encoder::Encoder (shared_ptr<const Film> film, weak_ptr<Job> j, shared_ptr<Writer> writer)
61 , _video_frames_enqueued (0)
64 , _terminate_enqueue (false)
65 , _terminate_encoding (false)
68 servers_list_changed ();
75 boost::mutex::scoped_lock lm (_queue_mutex);
76 _terminate_enqueue = true;
77 _full_condition.notify_all ();
78 _empty_condition.notify_all ();
84 if (!ServerFinder::instance()->disabled ()) {
85 _server_found_connection = ServerFinder::instance()->ServersListChanged.connect (boost::bind (&Encoder::servers_list_changed, this));
92 boost::mutex::scoped_lock lock (_queue_mutex);
94 LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
96 /* Keep waking workers until the queue is empty */
97 while (!_queue.empty ()) {
98 _empty_condition.notify_all ();
99 _full_condition.wait (lock);
104 terminate_threads ();
106 LOG_GENERAL (N_("Mopping up %1"), _queue.size());
108 /* The following sequence of events can occur in the above code:
109 1. a remote worker takes the last image off the queue
110 2. the loop above terminates
111 3. the remote worker fails to encode the image and puts it back on the queue
112 4. the remote worker is then terminated by terminate_threads
114 So just mop up anything left in the queue here.
117 for (list<shared_ptr<DCPVideo> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
118 LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
121 (*i)->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2)),
126 } catch (std::exception& e) {
127 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
132 /** @return an estimate of the current number of frames we are encoding per second,
136 Encoder::current_encoding_rate () const
138 boost::mutex::scoped_lock lock (_state_mutex);
139 if (int (_time_history.size()) < _history_size) {
144 gettimeofday (&now, 0);
146 return _history_size / (seconds (now) - seconds (_time_history.back ()));
149 /** @return Number of video frames that have been sent out */
151 Encoder::video_frames_out () const
153 boost::mutex::scoped_lock (_state_mutex);
154 return _video_frames_enqueued;
157 /** Should be called when a frame has been encoded successfully.
158 * @param n Source frame index.
161 Encoder::frame_done ()
163 boost::mutex::scoped_lock lock (_state_mutex);
166 gettimeofday (&tv, 0);
167 _time_history.push_front (tv);
168 if (int (_time_history.size()) > _history_size) {
169 _time_history.pop_back ();
173 /** Called in order, so each time this is called the supplied frame is the one
174 * after the previous one.
177 Encoder::enqueue (shared_ptr<PlayerVideo> pv)
183 boost::mutex::scoped_lock threads_lock (_threads_mutex);
184 threads = _threads.size ();
187 boost::mutex::scoped_lock queue_lock (_queue_mutex);
189 /* XXX: discard 3D here if required */
191 /* Wait until the queue has gone down a bit */
192 while (_queue.size() >= threads * 2 && !_terminate_enqueue) {
193 LOG_TIMING ("decoder-sleep queue=%1", _queue.size());
194 _full_condition.wait (queue_lock);
195 LOG_TIMING ("decoder-wake queue=%1", _queue.size());
198 if (_terminate_enqueue) {
203 /* Re-throw any exception raised by one of our threads. If more
204 than one has thrown an exception, only one will be rethrown, I think;
205 but then, if that happens something has gone badly wrong.
209 if (_writer->can_fake_write (_video_frames_enqueued)) {
210 /* We can fake-write this frame */
211 _writer->fake_write (_video_frames_enqueued, pv->eyes ());
213 } else if (pv->has_j2k ()) {
214 /* This frame already has JPEG2000 data, so just write it */
215 _writer->write (pv->j2k(), _video_frames_enqueued, pv->eyes ());
216 } else if (_last_player_video && pv->same (_last_player_video)) {
217 _writer->repeat (_video_frames_enqueued, pv->eyes ());
219 /* Queue this new frame for encoding */
220 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
221 _queue.push_back (shared_ptr<DCPVideo> (
224 _video_frames_enqueued,
225 _film->video_frame_rate(),
226 _film->j2k_bandwidth(),
232 /* The queue might not be empty any more, so notify anything which is
235 _empty_condition.notify_all ();
238 switch (pv->eyes ()) {
240 ++_video_frames_enqueued;
252 if (_left_done && _right_done) {
253 ++_video_frames_enqueued;
254 _left_done = _right_done = false;
257 _last_player_video = pv;
261 Encoder::terminate_threads ()
264 boost::mutex::scoped_lock queue_lock (_queue_mutex);
265 _terminate_encoding = true;
266 _full_condition.notify_all ();
267 _empty_condition.notify_all ();
270 boost::mutex::scoped_lock threads_lock (_threads_mutex);
272 for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
273 if ((*i)->joinable ()) {
280 _terminate_encoding = false;
284 Encoder::encoder_thread (optional<ServerDescription> server)
288 LOG_TIMING ("start-encoder-thread thread=%1 server=%2", boost::this_thread::get_id (), server->host_name ());
290 LOG_TIMING ("start-encoder-thread thread=%1 server=localhost", boost::this_thread::get_id ());
293 /* Number of seconds that we currently wait between attempts
294 to connect to the server; not relevant for localhost
297 int remote_backoff = 0;
301 LOG_TIMING ("encoder-sleep thread=%1", boost::this_thread::get_id());
302 boost::mutex::scoped_lock lock (_queue_mutex);
303 while (_queue.empty () && !_terminate_encoding) {
304 _empty_condition.wait (lock);
307 if (_terminate_encoding) {
311 LOG_TIMING ("encoder-wake thread=%1 queue=%2", boost::this_thread::get_id(), _queue.size());
312 shared_ptr<DCPVideo> vf = _queue.front ();
313 LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", boost::this_thread::get_id(), vf->index(), vf->eyes ());
318 optional<Data> encoded;
320 /* We need to encode this input */
323 encoded = vf->encode_remotely (server.get ());
325 if (remote_backoff > 0) {
326 LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
329 /* This job succeeded, so remove any backoff */
332 } catch (std::exception& e) {
333 if (remote_backoff < 60) {
335 remote_backoff += 10;
338 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
339 vf->index(), server->host_name(), e.what(), remote_backoff
345 LOG_TIMING ("start-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
346 encoded = vf->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2));
347 LOG_TIMING ("finish-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
348 } catch (std::exception& e) {
349 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
355 _writer->write (encoded.get(), vf->index (), vf->eyes ());
359 LOG_GENERAL (N_("[%1] Encoder thread pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->index());
360 _queue.push_front (vf);
364 if (remote_backoff > 0) {
365 dcpomatic_sleep (remote_backoff);
368 /* The queue might not be full any more, so notify anything that is waiting on that */
370 _full_condition.notify_all ();
379 Encoder::servers_list_changed ()
381 terminate_threads ();
383 /* XXX: could re-use threads */
385 boost::mutex::scoped_lock lm (_threads_mutex);
387 if (!Config::instance()->only_servers_encode ()) {
388 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
389 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<ServerDescription> ())));
393 BOOST_FOREACH (ServerDescription i, ServerFinder::instance()->servers ()) {
394 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
395 for (int j = 0; j < i.threads(); ++j) {
396 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, i)));
400 _writer->set_encoder_threads (_threads.size ());