2 Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
4 This file is part of DCP-o-matic.
6 DCP-o-matic is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 2 of the License, or
9 (at your option) any later version.
11 DCP-o-matic is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with DCP-o-matic. If not, see <http://www.gnu.org/licenses/>.
21 /** @file src/encoder.cc
22 * @brief Parent class for classes which can encode video and audio frames.
30 #include "dcp_video.h"
33 #include "encode_server_finder.h"
35 #include "player_video.h"
36 #include "encode_server_description.h"
37 #include "compose.hpp"
38 #include <libcxml/cxml.h>
39 #include <boost/foreach.hpp>
// Logging shorthands.  Every one of them writes through _film->log(), so they can only be
// used where a _film member is in scope.  LOG_GENERAL, LOG_ERROR and LOG_TIMING build the
// message from their arguments with String::compose; LOG_GENERAL_NC hands its argument to
// log() unchanged.  They differ otherwise only in the LogEntry type they tag the entry with.
// NOTE(review): each expansion already ends in a semicolon, so the `;` written at a call
// site leaves an empty statement behind; that is a hazard inside an unbraced if/else.
44 #define LOG_GENERAL(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_GENERAL);
45 #define LOG_GENERAL_NC(...) _film->log()->log (__VA_ARGS__, LogEntry::TYPE_GENERAL);
46 #define LOG_ERROR(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_ERROR);
47 #define LOG_TIMING(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_TIMING);
51 using boost::shared_ptr;
52 using boost::weak_ptr;
53 using boost::optional;
// Maximum number of frame-completion timestamps kept in _time_history: frame_done() drops
// the oldest entry once the history grows past this, and current_encoding_rate() uses it
// as the frame count when working out a rate.
56 int const Encoder::_history_size = 25;
58 /** @param film Film that we are encoding.  @param writer Writer that frames are passed to (presumably kept as _writer; confirm against the initialiser list). */
59 Encoder::Encoder (shared_ptr<const Film> film, shared_ptr<Writer> writer)
// Create the initial worker threads: servers_list_changed() builds the local ones and
// one group for each encode server that is already known.
64 servers_list_changed ();
// Unless server discovery is disabled, subscribe to the finder's ServersListChanged signal
// so that servers_list_changed() runs on this Encoder whenever the list changes.  The
// connection object is kept in _server_found_connection.
75 if (!EncodeServerFinder::instance()->disabled ()) {
76 _server_found_connection = EncodeServerFinder::instance()->ServersListChanged.connect (boost::bind (&Encoder::servers_list_changed, this));
// Drain the encode queue: let the workers empty it, announce that the encoder threads are
// being stopped, then deal with any frame that a worker pushed back in the meantime.
// _queue_mutex guards _queue; the same lock object is handed to the condition wait below.
83 boost::mutex::scoped_lock lock (_queue_mutex);
85 LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
87 /* Keep waking workers until the queue is empty */
88 while (!_queue.empty ()) {
// _empty_condition is what idle encoder threads wait on, so notifying it sets them going.
90 _empty_condition.notify_all ();
// Encoder threads notify _full_condition once they have dealt with a frame; wait for that
// and then look at the queue again.
91 _full_condition.wait (lock);
96 LOG_GENERAL_NC (N_("Terminating encoder threads"));
100 LOG_GENERAL (N_("Mopping up %1"), _queue.size());
102 /* The following sequence of events can occur in the above code:
103 1. a remote worker takes the last image off the queue
104 2. the loop above terminates
105 3. the remote worker fails to encode the image and puts it back on the queue
106 4. the remote worker is then terminated by terminate_threads
108 So just mop up anything left in the queue here.
// Walk whatever is still queued, in queue order.
111 for (list<shared_ptr<DCPVideo> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
112 LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
// Encode on the calling thread; the callback routes the encoder's own log output to the
// film's log through Log::dcp_log.
115 (*i)->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2)),
// A failed encode is reported in the error log together with the exception's message.
120 } catch (std::exception& e) {
121 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
126 /** @return an estimate of the current number of frames we are encoding per second,
130 Encoder::current_encoding_rate () const
// _time_history is shared with frame_done(), which takes the same mutex before changing it.
132 boost::mutex::scoped_lock lock (_state_mutex);
// Special case: fewer than _history_size completions have been recorded so far.
133 if (int (_time_history.size()) < _history_size) {
138 gettimeofday (&now, 0);
// frame_done() pushes new timestamps at the front, so back() is the oldest one retained.
// The estimate is _history_size frames divided by the time from that timestamp until now.
// NOTE(review): no guard against the two times being equal (zero divisor) is apparent.
140 return _history_size / (seconds (now) - seconds (_time_history.back ()));
143 /** @return Number of video frames that have been sent out */
145 Encoder::video_frames_out () const
// Take _state_mutex for the duration of the read.  The lock has to be a named object:
// written as `boost::mutex::scoped_lock (_state_mutex);` the statement is parsed as the
// declaration of a default-constructed (unlocked) local called _state_mutex, so the member
// mutex would never be acquired.  This now matches current_encoding_rate() and frame_done().
147 boost::mutex::scoped_lock lock (_state_mutex);
151 /** Should be called when a frame has been encoded successfully.
152 * Records the current time in _time_history, which holds at most _history_size entries.
155 Encoder::frame_done ()
// _time_history is also read by current_encoding_rate(), under the same mutex.
157 boost::mutex::scoped_lock lock (_state_mutex);
160 gettimeofday (&tv, 0);
// The newest timestamp goes at the front of the history...
161 _time_history.push_front (tv);
// ...and the oldest is dropped from the back once there are more than _history_size.
162 if (int (_time_history.size()) > _history_size) {
163 _time_history.pop_back ();
167 /** Called to start encoding of the next video frame in the DCP. This is called in order,
168 * so each time the supplied frame is the one after the previous one.
169 * pv represents one video frame, and could be empty if there is nothing to encode
170 * for this DCP frame.
// pv is received by value, so the caller's list is copied for each call.
173 Encoder::encode (list<shared_ptr<PlayerVideo> > pv)
// Visit each PlayerVideo in the list, in list order.
175 BOOST_FOREACH (shared_ptr<PlayerVideo> i, pv) {
// Deal with one PlayerVideo for the frame at _position: first wait until the encode queue
// has room, then either pass the frame to the writer directly or queue it for encoding.
182 Encoder::enqueue (shared_ptr<PlayerVideo> pv)
// Read the number of worker threads under _threads_mutex; it sets the queue limit below.
188 boost::mutex::scoped_lock threads_lock (_threads_mutex);
189 threads = _threads.size ();
192 boost::mutex::scoped_lock queue_lock (_queue_mutex);
194 /* XXX: discard 3D here if required */
196 /* Wait until the queue has gone down a bit */
// The queue may hold up to two frames per worker thread.
// NOTE(review): if there are no worker threads the limit is 0, the condition below is
// always true and this loop cannot finish.
197 while (_queue.size() >= threads * 2) {
198 LOG_TIMING ("decoder-sleep queue=%1", _queue.size());
// Encoder threads notify _full_condition after dealing with a frame.
199 _full_condition.wait (queue_lock);
200 LOG_TIMING ("decoder-wake queue=%1", _queue.size());
204 /* Re-throw any exception raised by one of our threads. If more
205 than one has thrown an exception, only one will be rethrown, I think;
206 but then, if that happens something has gone badly wrong.
// Work out how to produce the frame at _position, trying the alternatives that avoid an
// encode before falling back to queueing one.
210 if (_writer->can_fake_write (_position)) {
211 /* We can fake-write this frame */
212 _writer->fake_write (_position, pv->eyes ());
214 } else if (pv->has_j2k ()) {
215 /* This frame already has JPEG2000 data, so just write it */
216 _writer->write (pv->j2k(), _position, pv->eyes ());
// pv reports itself the same as the previous PlayerVideo and the writer can repeat at this
// position, so ask the writer for a repeat.
217 } else if (_last_player_video && _writer->can_repeat(_position) && pv->same (_last_player_video)) {
218 _writer->repeat (_position, pv->eyes ());
220 /* Queue this new frame for encoding */
221 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
222 _queue.push_back (shared_ptr<DCPVideo> (
226 _film->video_frame_rate(),
227 _film->j2k_bandwidth(),
233 /* The queue might not be empty any more, so notify anything which is
236 _empty_condition.notify_all ();
// Remember this frame; the repeat test above compares the next one against it.
239 _last_player_video = pv;
// Walk the worker threads in _threads, logging progress as each one is dealt with.
243 Encoder::terminate_threads ()
// _threads is guarded by _threads_mutex for the whole walk.
245 boost::mutex::scoped_lock threads_lock (_threads_mutex);
248 for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
// n + 1 is logged as a 1-based position within the list.
249 LOG_GENERAL ("Terminating thread %1 of %2", n + 1, _threads.size ());
// Every thread in the list is expected to be joinable at this point.
251 DCPOMATIC_ASSERT ((*i)->joinable ());
254 LOG_GENERAL_NC ("Thread terminated");
// Body of one worker thread: take frames from the head of _queue, encode them and pass the
// result to the writer.
// @param server Server to encode on.  servers_list_changed() passes an empty optional for
// the threads that encode on this machine.
262 Encoder::encoder_thread (optional<EncodeServerDescription> server)
// Log start-up, naming the remote host for a server thread or localhost for a local one.
266 LOG_TIMING ("start-encoder-thread thread=%1 server=%2", boost::this_thread::get_id (), server->host_name ());
268 LOG_TIMING ("start-encoder-thread thread=%1 server=localhost", boost::this_thread::get_id ());
271 /* Number of seconds that we currently wait between attempts
272 to connect to the server; not relevant for localhost
275 int remote_backoff = 0;
279 LOG_TIMING ("encoder-sleep thread=%1", boost::this_thread::get_id());
280 boost::mutex::scoped_lock lock (_queue_mutex);
// Sleep until there is something to do; enqueue() notifies _empty_condition after it has
// dealt with a frame.
281 while (_queue.empty ()) {
282 _empty_condition.wait (lock);
285 LOG_TIMING ("encoder-wake thread=%1 queue=%2", boost::this_thread::get_id(), _queue.size());
// Work on the frame at the head of the queue.
286 shared_ptr<DCPVideo> vf = _queue.front ();
287 LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", boost::this_thread::get_id(), vf->index(), vf->eyes ());
// Left unset if the encode attempt below fails.
292 optional<Data> encoded;
294 /* We need to encode this input */
297 encoded = vf->encode_remotely (server.get ());
// A non-zero backoff here means earlier attempts on this server had failed.
299 if (remote_backoff > 0) {
300 LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
303 /* This job succeeded, so remove any backoff */
306 } catch (std::exception& e) {
// Each failure adds 10 seconds to the wait, until it has reached 60 seconds.
307 if (remote_backoff < 60) {
309 remote_backoff += 10;
312 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
313 vf->index(), server->host_name(), e.what(), remote_backoff
319 LOG_TIMING ("start-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
// Encode on this thread; the callback routes the encoder's own log output to the film's
// log through Log::dcp_log.
320 encoded = vf->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2));
321 LOG_TIMING ("finish-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
322 } catch (std::exception& e) {
323 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
// Pass the encoded data to the writer under the frame's own index and eyes.
329 _writer->write (encoded.get(), vf->index (), vf->eyes ());
333 LOG_GENERAL (N_("[%1] Encoder thread pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->index());
// The frame goes back at the head of the queue, so it is the next one to be taken.
334 _queue.push_front (vf);
// Wait out the current backoff, if there is one.
338 if (remote_backoff > 0) {
339 boost::this_thread::sleep (boost::posix_time::seconds (remote_backoff));
342 /* The queue might not be full any more, so notify anything that is waiting on that */
344 _full_condition.notify_all ();
350 /* Wake anything waiting on _full_condition so it can see the exception */
351 _full_condition.notify_all ();
// Replace the worker threads: stop the current ones, then create threads for local
// encoding (unless only servers may encode) and a group of threads for each server the
// finder currently lists.  Finally tell the writer how many threads there are.
355 Encoder::servers_list_changed ()
357 terminate_threads ();
359 /* XXX: could re-use threads */
// _threads is guarded by _threads_mutex while the new list is built.
361 boost::mutex::scoped_lock lm (_threads_mutex);
363 #ifdef BOOST_THREAD_PLATFORM_WIN32
365 info.dwOSVersionInfoSize = sizeof (OSVERSIONINFO);
366 GetVersionEx (&info);
// Version 5.1 is treated as Windows XP.
367 bool const windows_xp = (info.dwMajorVersion == 5 && info.dwMinorVersion == 1);
369 LOG_GENERAL_NC (N_("Setting thread affinity for Windows XP"));
// Local threads: as many as the configuration asks for, each started with an empty server
// description.
373 if (!Config::instance()->only_servers_encode ()) {
374 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
375 boost::thread* t = new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<EncodeServerDescription> ()));
376 _threads.push_back (t);
377 #ifdef BOOST_THREAD_PLATFORM_WIN32
// The affinity mask has only bit i set, i being the index of this local thread.
379 SetThreadAffinityMask (t->native_handle(), 1 << i);
// Remote threads: for each listed server, as many threads as the server reports through
// threads(), each bound to that server's description.
385 BOOST_FOREACH (EncodeServerDescription i, EncodeServerFinder::instance()->servers ()) {
386 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
387 for (int j = 0; j < i.threads(); ++j) {
388 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, i)));
// The writer is told the total number of encoder threads, local and remote together.
392 _writer->set_encoder_threads (_threads.size ());