2 Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 /** @file src/encoder.h
21 * @brief Parent class for classes which can encode video and audio frames.
29 #include "dcp_video.h"
33 #include "server_finder.h"
35 #include "player_video.h"
37 #include "server_description.h"
38 #include "compose.hpp"
39 #include <libcxml/cxml.h>
40 #include <boost/foreach.hpp>
// Logging helpers. Each one forwards to _film->log()->log() with a fixed LogEntry type, so
// they can only be used where a _film is in scope. LOG_GENERAL, LOG_ERROR and LOG_TIMING
// build the message with String::compose; LOG_GENERAL_NC passes its arguments straight
// through without composing. Every expansion already ends in a semicolon.
45 #define LOG_GENERAL(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_GENERAL);
46 #define LOG_GENERAL_NC(...) _film->log()->log (__VA_ARGS__, LogEntry::TYPE_GENERAL);
47 #define LOG_ERROR(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_ERROR);
48 #define LOG_TIMING(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_TIMING);
52 using boost::shared_ptr;
53 using boost::weak_ptr;
54 using boost::optional;
// Maximum number of frame-completion timestamps kept in _time_history (see frame_done);
// also the numerator used by current_encoding_rate.
56 int const Encoder::_history_size = 25;
58 /** Construct an Encoder with both termination flags cleared and set up its worker threads.
 *  @param film Film that we are encoding.
 *  @param j Job handed in by the caller, held weakly. NOTE(review): its use is outside
 *  the code shown here; confirm before relying on it.
 *  @param writer Writer, presumably kept as _writer, to which the encoding code sends frames.
 */
59 Encoder::Encoder (shared_ptr<const Film> film, weak_ptr<Job> j, shared_ptr<Writer> writer)
63 , _terminate_enqueue (false)
64 , _terminate_encoding (false)
// Creates the initial set of encoder threads
67 servers_list_changed ();
// Set the enqueue-termination flag while holding the queue lock, then wake every waiter on
// both conditions so that anything blocked in enqueue() re-checks _terminate_enqueue.
74 boost::mutex::scoped_lock lm (_queue_mutex);
75 _terminate_enqueue = true;
76 _full_condition.notify_all ();
77 _empty_condition.notify_all ();
// Unless the ServerFinder is disabled, subscribe to its ServersListChanged signal so that
// servers_list_changed() runs whenever the list changes. The connection is kept in
// _server_found_connection.
83 if (!ServerFinder::instance()->disabled ()) {
84 _server_found_connection = ServerFinder::instance()->ServersListChanged.connect (boost::bind (&Encoder::servers_list_changed, this));
// Drain the encode queue, stop the worker threads, then locally encode any frame that a
// worker pushed back onto the queue in the meantime.
91 boost::mutex::scoped_lock lock (_queue_mutex);
93 LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
95 /* Keep waking workers until the queue is empty */
96 while (!_queue.empty ()) {
// Workers notify _full_condition after handling a frame, so wait on it between wake-ups
98 _empty_condition.notify_all ();
99 _full_condition.wait (lock);
104 LOG_GENERAL_NC (N_("Terminating encoder threads"));
106 terminate_threads ();
108 LOG_GENERAL (N_("Mopping up %1"), _queue.size());
110 /* The following sequence of events can occur in the above code:
111 1. a remote worker takes the last image off the queue
112 2. the loop above terminates
113 3. the remote worker fails to encode the image and puts it back on the queue
114 4. the remote worker is then terminated by terminate_threads
116 So just mop up anything left in the queue here.
119 for (list<shared_ptr<DCPVideo> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
120 LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
123 (*i)->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2)),
128 } catch (std::exception& e) {
129 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
134 /** @return an estimate of the current number of frames we are encoding per second,
138 Encoder::current_encoding_rate () const
// _time_history is shared with frame_done, so read it under _state_mutex
140 boost::mutex::scoped_lock lock (_state_mutex);
// Fewer than _history_size completion times have been recorded so far
141 if (int (_time_history.size()) < _history_size) {
146 gettimeofday (&now, 0);
// frame_done pushes at the front, so back() is the oldest completion time still held
148 return _history_size / (seconds (now) - seconds (_time_history.back ()));
151 /** @return Number of video frames that have been sent out */
153 Encoder::video_frames_out () const
// The lock object must be named. Written as scoped_lock (_state_mutex), the statement declares
// a default-constructed local called _state_mutex and never locks the member mutex.
155 boost::mutex::scoped_lock lock (_state_mutex);
159 /** Should be called when a frame has been encoded successfully.
160 * Takes no parameters. Records the current time at the front of _time_history and
* drops the oldest entry once more than _history_size are held.
163 Encoder::frame_done ()
// _time_history is also read by current_encoding_rate, hence the lock
165 boost::mutex::scoped_lock lock (_state_mutex);
168 gettimeofday (&tv, 0);
// Newest timestamp goes at the front
169 _time_history.push_front (tv);
// Keep the history bounded by discarding the oldest timestamp
170 if (int (_time_history.size()) > _history_size) {
171 _time_history.pop_back ();
175 /** Called to start encoding of the next video frame in the DCP. This is called in order,
176 * so each time the supplied frame is the one after the previous one.
177 * pv represents one video frame, and could be empty if there is nothing to encode
178 * for this DCP frame.
181 Encoder::encode (list<shared_ptr<PlayerVideo> > pv)
// _position is the index of the DCP frame currently being handled
184 _writer->ref_write (_position);
// Visit each PlayerVideo supplied for this DCP frame, in list order
186 BOOST_FOREACH (shared_ptr<PlayerVideo> i, pv) {
// Handle one PlayerVideo for the frame at _position: wait until the encode queue has room,
// then either fake-write it, write its existing JPEG2000 data, repeat the previous frame,
// or queue it for a worker thread to encode.
194 Encoder::enqueue (shared_ptr<PlayerVideo> pv)
// Read the worker count under its own mutex, separately from the queue lock taken below
200 boost::mutex::scoped_lock threads_lock (_threads_mutex);
201 threads = _threads.size ();
204 boost::mutex::scoped_lock queue_lock (_queue_mutex);
206 /* XXX: discard 3D here if required */
208 /* Wait until the queue has gone down a bit */
// The queue is allowed to hold up to two frames per worker thread.
// NOTE(review): with zero worker threads this condition is always true, so the wait can
// only end once _terminate_enqueue is set; confirm that _threads cannot be empty here.
209 while (_queue.size() >= threads * 2 && !_terminate_enqueue) {
210 LOG_TIMING ("decoder-sleep queue=%1", _queue.size());
211 _full_condition.wait (queue_lock);
212 LOG_TIMING ("decoder-wake queue=%1", _queue.size());
215 if (_terminate_enqueue) {
220 /* Re-throw any exception raised by one of our threads. If more
221 than one has thrown an exception, only one will be rethrown, I think;
222 but then, if that happens something has gone badly wrong.
226 if (_writer->can_fake_write (_position)) {
227 /* We can fake-write this frame */
228 _writer->fake_write (_position, pv->eyes ());
230 } else if (pv->has_j2k ()) {
231 /* This frame already has JPEG2000 data, so just write it */
232 _writer->write (pv->j2k(), _position, pv->eyes ());
// Same as the previous PlayerVideo: have the writer repeat it rather than encoding again
233 } else if (_last_player_video && pv->same (_last_player_video)) {
234 _writer->repeat (_position, pv->eyes ());
236 /* Queue this new frame for encoding */
237 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
238 _queue.push_back (shared_ptr<DCPVideo> (
242 _film->video_frame_rate(),
243 _film->j2k_bandwidth(),
249 /* The queue might not be empty any more, so notify anything which is
252 _empty_condition.notify_all ();
// Remember this frame so that the next call can detect a repeat
255 _last_player_video = pv;
// Ask every encoder thread to stop, walk _threads checking joinable(), and finally clear
// the termination flag again.
259 Encoder::terminate_threads ()
// Raise the flag under the queue lock and wake all waiters so that blocked workers notice it
262 boost::mutex::scoped_lock queue_lock (_queue_mutex);
263 _terminate_encoding = true;
264 _full_condition.notify_all ();
265 _empty_condition.notify_all ();
268 boost::mutex::scoped_lock threads_lock (_threads_mutex);
270 for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
271 if ((*i)->joinable ()) {
// Reset the flag so that threads created later (see servers_list_changed) do not stop at once
278 _terminate_encoding = false;
// Body of one worker thread. It repeatedly takes the frame at the front of _queue and
// encodes it, on the given server when one is supplied and locally otherwise, then passes
// the result to _writer. A frame that could not be encoded is pushed back onto the queue.
282 Encoder::encoder_thread (optional<ServerDescription> server)
286 LOG_TIMING ("start-encoder-thread thread=%1 server=%2", boost::this_thread::get_id (), server->host_name ());
288 LOG_TIMING ("start-encoder-thread thread=%1 server=localhost", boost::this_thread::get_id ());
291 /* Number of seconds that we currently wait between attempts
292 to connect to the server; not relevant for localhost
295 int remote_backoff = 0;
299 LOG_TIMING ("encoder-sleep thread=%1", boost::this_thread::get_id());
300 boost::mutex::scoped_lock lock (_queue_mutex);
// Sleep until there is a frame to encode or we are told to stop
301 while (_queue.empty () && !_terminate_encoding) {
302 _empty_condition.wait (lock);
305 if (_terminate_encoding) {
309 LOG_TIMING ("encoder-wake thread=%1 queue=%2", boost::this_thread::get_id(), _queue.size());
310 shared_ptr<DCPVideo> vf = _queue.front ();
311 LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", boost::this_thread::get_id(), vf->index(), vf->eyes ());
// Left unset if encoding fails
316 optional<Data> encoded;
318 /* We need to encode this input */
321 encoded = vf->encode_remotely (server.get ());
// A non-zero backoff here means earlier attempts on this server had failed
323 if (remote_backoff > 0) {
324 LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
327 /* This job succeeded, so remove any backoff */
330 } catch (std::exception& e) {
// Each failure adds 10s to the backoff until it reaches 60s
331 if (remote_backoff < 60) {
333 remote_backoff += 10;
336 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
337 vf->index(), server->host_name(), e.what(), remote_backoff
343 LOG_TIMING ("start-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
344 encoded = vf->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2));
345 LOG_TIMING ("finish-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
346 } catch (std::exception& e) {
347 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
353 _writer->write (encoded.get(), vf->index (), vf->eyes ());
357 LOG_GENERAL (N_("[%1] Encoder thread pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->index());
// Back at the front, so the failed frame is the next one a worker takes
358 _queue.push_front (vf);
// Wait out the backoff before this thread tries its server again
362 if (remote_backoff > 0) {
363 dcpomatic_sleep (remote_backoff);
366 /* The queue might not be full any more, so notify anything that is waiting on that */
368 _full_condition.notify_all ();
374 /* Wake anything waiting on _full_condition so it can see the exception */
375 _full_condition.notify_all ();
// Rebuild the set of encoder threads: stop the current ones, then start the configured
// number of local threads (unless only servers may encode) plus i.threads() threads for
// each server reported by ServerFinder. Finally report the new thread count to the writer.
379 Encoder::servers_list_changed ()
381 terminate_threads ();
383 /* XXX: could re-use threads */
385 boost::mutex::scoped_lock lm (_threads_mutex);
387 if (!Config::instance()->only_servers_encode ()) {
388 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
// An empty optional marks a thread that encodes locally.
// NOTE(review): threads are created with raw new; confirm that they are deleted and
// removed from _threads when terminated.
389 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<ServerDescription> ())));
393 BOOST_FOREACH (ServerDescription i, ServerFinder::instance()->servers ()) {
394 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
395 for (int j = 0; j < i.threads(); ++j) {
396 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, i)));
// Tell the writer how many encoder threads now exist
400 _writer->set_encoder_threads (_threads.size ());