2 Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 /** @file src/encoder.h
21 * @brief Parent class for classes which can encode video and audio frames.
29 #include "dcp_video.h"
33 #include "server_finder.h"
35 #include "player_video.h"
37 #include "server_description.h"
38 #include "compose.hpp"
39 #include <libcxml/cxml.h>
40 #include <boost/foreach.hpp>
/* Logging helpers.  Each one forwards to _film->log()->log () with the given
   LogEntry type.  LOG_GENERAL, LOG_ERROR and LOG_TIMING first format their
   arguments with String::compose; LOG_GENERAL_NC passes its arguments to
   log () unchanged.
   NOTE(review): every expansion already ends in ';', so a call site that adds
   its own ';' produces an extra empty statement.
*/
45 #define LOG_GENERAL(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_GENERAL);
46 #define LOG_GENERAL_NC(...) _film->log()->log (__VA_ARGS__, LogEntry::TYPE_GENERAL);
47 #define LOG_ERROR(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_ERROR);
48 #define LOG_TIMING(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_TIMING);
52 using boost::shared_ptr;
53 using boost::weak_ptr;
54 using boost::optional;
/* Number of frame-completion timestamps kept in _time_history; frame_done ()
   trims the history to this length and current_encoding_rate () needs this
   many entries before it uses them. */
56 int const Encoder::_history_size = 25;
58 /** @param film Film that we are encoding */
/* NOTE(review): the j and writer parameters are not documented, and the
   initialisers that consume them are not visible in this listing. */
59 Encoder::Encoder (shared_ptr<const Film> film, weak_ptr<Job> j, shared_ptr<Writer> writer)
/* Both termination flags start cleared. */
63 , _terminate_enqueue (false)
64 , _terminate_encoding (false)
/* servers_list_changed () creates the encoder threads. */
67 servers_list_changed ();
74 boost::mutex::scoped_lock lm (_queue_mutex);
75 _terminate_enqueue = true;
76 _full_condition.notify_all ();
77 _empty_condition.notify_all ();
83 if (!ServerFinder::instance()->disabled ()) {
84 _server_found_connection = ServerFinder::instance()->ServersListChanged.connect (boost::bind (&Encoder::servers_list_changed, this));
91 boost::mutex::scoped_lock lock (_queue_mutex);
93 LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
95 /* Keep waking workers until the queue is empty */
96 while (!_queue.empty ()) {
98 _empty_condition.notify_all ();
99 _full_condition.wait (lock);
104 LOG_GENERAL_NC (N_("Terminating encoder threads"));
106 terminate_threads ();
108 LOG_GENERAL (N_("Mopping up %1"), _queue.size());
110 /* The following sequence of events can occur in the above code:
111 1. a remote worker takes the last image off the queue
112 2. the loop above terminates
113 3. the remote worker fails to encode the image and puts it back on the queue
114 4. the remote worker is then terminated by terminate_threads
116 So just mop up anything left in the queue here.
119 for (list<shared_ptr<DCPVideo> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
120 LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
123 (*i)->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2)),
128 } catch (std::exception& e) {
129 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
134 /** @return an estimate of the current number of frames we are encoding per second,
138 Encoder::current_encoding_rate () const
/* _time_history is only touched with _state_mutex held. */
140 boost::mutex::scoped_lock lock (_state_mutex);
/* Fewer than _history_size timestamps have been recorded so far.
   NOTE(review): what is returned in that case is not visible in this listing. */
141 if (int (_time_history.size()) < _history_size) {
146 gettimeofday (&now, 0);
/* frame_done () pushes new timestamps at the front, so back () is the oldest
   one kept; the estimate is _history_size divided by the time since then. */
148 return _history_size / (seconds (now) - seconds (_time_history.back ()));
151 /** @return Number of video frames that have been sent out */
153 Encoder::video_frames_out () const
/* The lock object must be named: `boost::mutex::scoped_lock (_state_mutex);`
   is parsed as the declaration of an unlocked local called _state_mutex, so
   the member mutex was never actually held. */
155 boost::mutex::scoped_lock lock (_state_mutex);
159 /** Should be called when a frame has been encoded successfully.
160 * Takes no parameters; records the current time in _time_history.
163 Encoder::frame_done ()
165 boost::mutex::scoped_lock lock (_state_mutex);
168 gettimeofday (&tv, 0);
/* Newest timestamp goes at the front of the history... */
169 _time_history.push_front (tv);
/* ...and the oldest is dropped from the back once more than _history_size are held. */
170 if (int (_time_history.size()) > _history_size) {
171 _time_history.pop_back ();
175 /** Called to start encoding of the next video frame in the DCP. This is called in order,
176 * so each time the supplied frame is the one after the previous one.
177 * pv represents one video frame, and could be empty if there is nothing to encode
178 * for this DCP frame.
181 Encoder::encode (list<shared_ptr<PlayerVideo> > pv)
/* Visit each PlayerVideo in pv in list order.
   NOTE(review): the loop body is not visible in this listing. */
183 BOOST_FOREACH (shared_ptr<PlayerVideo> i, pv) {
/** Deal with one PlayerVideo for the frame at _position.
 *
 *  Blocks on _full_condition while _queue holds at least two frames per encoder
 *  thread, unless _terminate_enqueue is set.  The frame is then handled by the
 *  first of these that applies:
 *   - fake-write it, if the writer says it can;
 *   - write its existing JPEG2000 data, if pv has some;
 *   - repeat the previous frame, if the writer can and pv is the same as
 *     _last_player_video;
 *   - push a new DCPVideo onto _queue for the encoder threads.
 *  Afterwards threads waiting on _empty_condition are woken and pv is stored in
 *  _last_player_video.
 *
 *  NOTE(review): several lines of this function are not visible in this listing;
 *  the summary covers only the visible statements.
 */
190 Encoder::enqueue (shared_ptr<PlayerVideo> pv)
/* Read the thread count with _threads_mutex held. */
196 boost::mutex::scoped_lock threads_lock (_threads_mutex);
197 threads = _threads.size ();
/* Everything below that touches _queue is done with _queue_mutex held. */
200 boost::mutex::scoped_lock queue_lock (_queue_mutex);
202 /* XXX: discard 3D here if required */
204 /* Wait until the queue has gone down a bit */
205 while (_queue.size() >= threads * 2 && !_terminate_enqueue) {
206 LOG_TIMING ("decoder-sleep queue=%1", _queue.size());
207 _full_condition.wait (queue_lock);
208 LOG_TIMING ("decoder-wake queue=%1", _queue.size());
211 if (_terminate_enqueue) {
216 /* Re-throw any exception raised by one of our threads. If more
217 than one has thrown an exception, only one will be rethrown, I think;
218 but then, if that happens something has gone badly wrong.
222 if (_writer->can_fake_write (_position)) {
223 /* We can fake-write this frame */
224 _writer->fake_write (_position, pv->eyes ());
226 } else if (pv->has_j2k ()) {
227 /* This frame already has JPEG2000 data, so just write it */
228 _writer->write (pv->j2k(), _position, pv->eyes ());
229 } else if (_last_player_video && _writer->can_repeat(_position) && pv->same (_last_player_video)) {
230 _writer->repeat (_position, pv->eyes ());
232 /* Queue this new frame for encoding */
233 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
234 _queue.push_back (shared_ptr<DCPVideo> (
238 _film->video_frame_rate(),
239 _film->j2k_bandwidth(),
245 /* The queue might not be empty any more, so notify anything which is
248 _empty_condition.notify_all ();
251 _last_player_video = pv;
/** Ask the encoder threads to stop.
 *
 *  Sets _terminate_encoding with _queue_mutex held and wakes everything waiting
 *  on either condition, so that threads blocked in encoder_thread () re-check the
 *  flag.  Then walks _threads with _threads_mutex held, and finally clears
 *  _terminate_encoding again.
 *
 *  NOTE(review): what is done with each joinable thread, and whether _threads is
 *  emptied, is not visible in this listing.
 */
255 Encoder::terminate_threads ()
258 boost::mutex::scoped_lock queue_lock (_queue_mutex);
259 _terminate_encoding = true;
260 _full_condition.notify_all ();
261 _empty_condition.notify_all ();
264 boost::mutex::scoped_lock threads_lock (_threads_mutex);
266 for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
267 if ((*i)->joinable ()) {
/* Clear the flag again; servers_list_changed () starts new encoder threads
   right after calling this function. */
274 _terminate_encoding = false;
/** Body of one encoder worker thread.
 *
 *  @param server Server to encode on.  servers_list_changed () passes an empty
 *  optional for local threads (logged here as server=localhost) and a
 *  ServerDescription for remote ones.
 *
 *  The thread waits on _empty_condition until _queue has a frame or
 *  _terminate_encoding is set, takes the frame at the front of the queue, encodes
 *  it (encode_remotely or encode_locally) and passes the result to _writer.  A
 *  frame that could not be encoded is pushed back onto the front of the queue.
 *  After a failed remote encode remote_backoff is raised by 10 while it is below
 *  60, and the thread sleeps for remote_backoff seconds before trying again.
 *
 *  NOTE(review): the loop and try/else structure lines are not visible in this
 *  listing; the summary covers only the visible statements.
 */
278 Encoder::encoder_thread (optional<ServerDescription> server)
282 LOG_TIMING ("start-encoder-thread thread=%1 server=%2", boost::this_thread::get_id (), server->host_name ());
284 LOG_TIMING ("start-encoder-thread thread=%1 server=localhost", boost::this_thread::get_id ());
287 /* Number of seconds that we currently wait between attempts
288 to connect to the server; not relevant for localhost
291 int remote_backoff = 0;
295 LOG_TIMING ("encoder-sleep thread=%1", boost::this_thread::get_id());
296 boost::mutex::scoped_lock lock (_queue_mutex);
297 while (_queue.empty () && !_terminate_encoding) {
298 _empty_condition.wait (lock);
301 if (_terminate_encoding) {
305 LOG_TIMING ("encoder-wake thread=%1 queue=%2", boost::this_thread::get_id(), _queue.size());
306 shared_ptr<DCPVideo> vf = _queue.front ();
307 LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", boost::this_thread::get_id(), vf->index(), vf->eyes ());
312 optional<Data> encoded;
314 /* We need to encode this input */
317 encoded = vf->encode_remotely (server.get ());
319 if (remote_backoff > 0) {
320 LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
323 /* This job succeeded, so remove any backoff */
326 } catch (std::exception& e) {
327 if (remote_backoff < 60) {
329 remote_backoff += 10;
332 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
333 vf->index(), server->host_name(), e.what(), remote_backoff
339 LOG_TIMING ("start-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
340 encoded = vf->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2));
341 LOG_TIMING ("finish-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
342 } catch (std::exception& e) {
343 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
349 _writer->write (encoded.get(), vf->index (), vf->eyes ());
353 LOG_GENERAL (N_("[%1] Encoder thread pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->index());
354 _queue.push_front (vf);
358 if (remote_backoff > 0) {
359 dcpomatic_sleep (remote_backoff);
362 /* The queue might not be full any more, so notify anything that is waiting on that */
364 _full_condition.notify_all ();
370 /* Wake anything waiting on _full_condition so it can see the exception */
371 _full_condition.notify_all ();
/** Rebuild the set of encoder threads.
 *
 *  Called from the constructor and connected to ServerFinder's ServersListChanged
 *  signal.  Stops the existing threads with terminate_threads (), then, with
 *  _threads_mutex held, starts:
 *   - Config's num_local_encoding_threads () local threads, unless Config says
 *     that only servers should encode;
 *   - threads () threads for each server reported by ServerFinder.
 *  Finally tells the writer how many encoder threads there now are.
 */
375 Encoder::servers_list_changed ()
377 terminate_threads ();
379 /* XXX: could re-use threads */
381 boost::mutex::scoped_lock lm (_threads_mutex);
383 if (!Config::instance()->only_servers_encode ()) {
384 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
/* An empty optional makes encoder_thread () a local worker. */
385 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<ServerDescription> ())));
389 BOOST_FOREACH (ServerDescription i, ServerFinder::instance()->servers ()) {
390 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
391 for (int j = 0; j < i.threads(); ++j) {
392 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, i)));
396 _writer->set_encoder_threads (_threads.size ());