Forward-declare grk_plugin stuff.
[dcpomatic.git] / src / lib / j2k_encoder.cc
1 /*
2     Copyright (C) 2012-2021 Carl Hetherington <cth@carlh.net>
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
21
22 /** @file src/j2k_encoder.cc
23  *  @brief J2K encoder class.
24  */
25
26
27 #include "compose.hpp"
28 #include "config.h"
29 #include "cross.h"
30 #include "dcp_video.h"
31 #include "dcpomatic_log.h"
32 #include "encode_server_description.h"
33 #include "encode_server_finder.h"
34 #include "film.h"
35 #include "cpu_j2k_encoder_thread.h"
36 #ifdef DCPOMATIC_GROK
37 #include "grok/context.h"
38 #include "grok_j2k_encoder_thread.h"
39 #endif
40 #include "remote_j2k_encoder_thread.h"
41 #include "j2k_encoder.h"
42 #include "log.h"
43 #include "player_video.h"
44 #include "util.h"
45 #include "writer.h"
46 #include <libcxml/cxml.h>
47 #include <iostream>
48
49 #include "i18n.h"
50
51
52 using std::cout;
53 using std::dynamic_pointer_cast;
54 using std::exception;
55 using std::list;
56 using std::make_shared;
57 using std::shared_ptr;
58 using std::weak_ptr;
59 using boost::optional;
60 using dcp::Data;
61 using namespace dcpomatic;
62
63
64 /** @param film Film that we are encoding.
65  *  @param writer Writer that we are using.
66  */
67 J2KEncoder::J2KEncoder(shared_ptr<const Film> film, Writer& writer)
68         : _film (film)
69         , _history (200)
70         , _writer (writer)
71 #ifdef DCPOMATIC_GROK
72         , _dcpomatic_context(new grk_plugin::DcpomaticContext(film, writer, _history, Config::instance()->gpu_binary_location()))
73         , _context(Config::instance()->enable_gpu() ? new grk_plugin::GrokContext(_dcpomatic_context) : nullptr)
74 #endif
75 {
76         servers_list_changed ();
77 }
78
79
80 J2KEncoder::~J2KEncoder ()
81 {
82         _server_found_connection.disconnect();
83
84         terminate_threads();
85
86 #ifdef DCPOMATIC_GROK
87         delete _context;
88         delete _dcpomatic_context;
89 #endif
90 }
91
92
93 void
94 J2KEncoder::servers_list_changed()
95 {
96         auto config = Config::instance();
97
98         auto const cpu = (config->enable_gpu() || config->only_servers_encode()) ? 0 : config->master_encoding_threads();
99         auto const gpu = config->enable_gpu() ? config->master_encoding_threads() : 0;
100
101         remake_threads(cpu, gpu, EncodeServerFinder::instance()->servers());
102 }
103
104
105 void
106 J2KEncoder::begin ()
107 {
108         _server_found_connection = EncodeServerFinder::instance()->ServersListChanged.connect(
109                 boost::bind(&J2KEncoder::servers_list_changed, this)
110                 );
111 }
112
113
114 void
115 J2KEncoder::pause()
116 {
117         if (!Config::instance()->enable_gpu()) {
118                 return;
119         }
120
121         terminate_threads ();
122
123         /* Something might have been thrown during terminate_threads */
124         rethrow ();
125
126 #ifdef DCPOMATIC_GROK
127         delete _context;
128         _context = nullptr;
129 #endif
130 }
131
132
133 void J2KEncoder::resume()
134 {
135         if (!Config::instance()->enable_gpu()) {
136                 return;
137         }
138
139 #ifdef DCPOMATIC_GROK
140         _context = new grk_plugin::GrokContext(_dcpomatic_context);
141 #endif
142         servers_list_changed();
143 }
144
145
146 void
147 J2KEncoder::end()
148 {
149         boost::mutex::scoped_lock lock (_queue_mutex);
150
151         LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
152
153         /* Keep waking workers until the queue is empty */
154         while (!_queue.empty ()) {
155                 rethrow ();
156                 _full_condition.wait (lock);
157         }
158         lock.unlock ();
159
160         LOG_GENERAL_NC (N_("Terminating encoder threads"));
161
162         terminate_threads ();
163
164         /* Something might have been thrown during terminate_threads */
165         rethrow ();
166
167         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
168
169         /* The following sequence of events can occur in the above code:
170              1. a remote worker takes the last image off the queue
171              2. the loop above terminates
172              3. the remote worker fails to encode the image and puts it back on the queue
173              4. the remote worker is then terminated by terminate_threads
174
175              So just mop up anything left in the queue here.
176         */
177         for (auto & i: _queue) {
178 #ifdef DCPOMATIC_GROK
179                 if (Config::instance()->enable_gpu ()) {
180                         if (!_context->scheduleCompress(i)){
181                                 LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), i.index());
182                                 // handle error
183                         }
184                 } else {
185 #else
186                 {
187 #endif
188                         LOG_GENERAL(N_("Encode left-over frame %1"), i.index());
189                         try {
190                                 _writer.write(
191                                         make_shared<dcp::ArrayData>(i.encode_locally()),
192                                         i.index(),
193                                         i.eyes()
194                                         );
195                                 frame_done ();
196                         } catch (std::exception& e) {
197                                 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
198                         }
199                 }
200         }
201
202 #ifdef DCPOMATIC_GROK
203         delete _context;
204         _context = nullptr;
205 #endif
206 }
207
208
209 /** @return an estimate of the current number of frames we are encoding per second,
210  *  if known.
211  */
212 optional<float>
213 J2KEncoder::current_encoding_rate () const
214 {
215         return _history.rate ();
216 }
217
218
219 /** @return Number of video frames that have been queued for encoding */
220 int
221 J2KEncoder::video_frames_enqueued () const
222 {
223         if (!_last_player_video_time) {
224                 return 0;
225         }
226
227         return _last_player_video_time->frames_floor (_film->video_frame_rate ());
228 }
229
230
231 /** Should be called when a frame has been encoded successfully */
232 void
233 J2KEncoder::frame_done ()
234 {
235         _history.event ();
236 }
237
238
239 /** Called to request encoding of the next video frame in the DCP.  This is called in order,
240  *  so each time the supplied frame is the one after the previous one.
241  *  pv represents one video frame, and could be empty if there is nothing to encode
242  *  for this DCP frame.
243  *
244  *  @param pv PlayerVideo to encode.
245  *  @param time Time of \p pv within the DCP.
246  */
247 void
248 J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time)
249 {
250         _waker.nudge ();
251
252         size_t threads = 0;
253         {
254                 boost::mutex::scoped_lock lm (_threads_mutex);
255                 threads = _threads.size();
256         }
257
258         boost::mutex::scoped_lock queue_lock (_queue_mutex);
259
260         /* Wait until the queue has gone down a bit.  Allow one thing in the queue even
261            when there are no threads.
262         */
263         while (_queue.size() >= (threads * 2) + 1) {
264                 LOG_TIMING ("decoder-sleep queue=%1 threads=%2", _queue.size(), threads);
265                 _full_condition.wait (queue_lock);
266                 LOG_TIMING ("decoder-wake queue=%1 threads=%2", _queue.size(), threads);
267         }
268
269         _writer.rethrow();
270         /* Re-throw any exception raised by one of our threads.  If more
271            than one has thrown an exception, only one will be rethrown, I think;
272            but then, if that happens something has gone badly wrong.
273         */
274         rethrow ();
275
276         auto const position = time.frames_floor(_film->video_frame_rate());
277
278         if (_writer.can_fake_write(position)) {
279                 /* We can fake-write this frame */
280                 LOG_DEBUG_ENCODE("Frame @ %1 FAKE", to_string(time));
281                 _writer.fake_write(position, pv->eyes ());
282                 frame_done ();
283         } else if (pv->has_j2k() && !_film->reencode_j2k()) {
284                 LOG_DEBUG_ENCODE("Frame @ %1 J2K", to_string(time));
285                 /* This frame already has J2K data, so just write it */
286                 _writer.write(pv->j2k(), position, pv->eyes ());
287                 frame_done ();
288         } else if (_last_player_video[pv->eyes()] && _writer.can_repeat(position) && pv->same(_last_player_video[pv->eyes()])) {
289                 LOG_DEBUG_ENCODE("Frame @ %1 REPEAT", to_string(time));
290                 _writer.repeat(position, pv->eyes());
291         } else {
292                 LOG_DEBUG_ENCODE("Frame @ %1 ENCODE", to_string(time));
293                 /* Queue this new frame for encoding */
294                 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
295                 auto dcpv = DCPVideo(
296                                 pv,
297                                 position,
298                                 _film->video_frame_rate(),
299                                 _film->j2k_bandwidth(),
300                                 _film->resolution()
301                                 );
302                 _queue.push_back (dcpv);
303
304                 /* The queue might not be empty any more, so notify anything which is
305                    waiting on that.
306                 */
307                 _empty_condition.notify_all ();
308         }
309
310         _last_player_video[pv->eyes()] = pv;
311         _last_player_video_time = time;
312 }
313
314
315 void
316 J2KEncoder::terminate_threads ()
317 {
318         boost::mutex::scoped_lock lm(_threads_mutex);
319         boost::this_thread::disable_interruption dis;
320
321         for (auto& thread: _threads) {
322                 thread->stop();
323         }
324
325         _threads.clear();
326         _ending = true;
327 }
328
329
330 void
331 #ifdef DCPOMATIC_GROK
332 J2KEncoder::remake_threads(int cpu, int gpu, list<EncodeServerDescription> servers)
333 #else
334 J2KEncoder::remake_threads(int cpu, int, list<EncodeServerDescription> servers)
335 #endif
336 {
337         boost::mutex::scoped_lock lm (_threads_mutex);
338         if (_ending) {
339                 return;
340         }
341
342         auto remove_threads = [this](int wanted, int current, std::function<bool (shared_ptr<J2KEncoderThread>)> predicate) {
343                 for (auto i = wanted; i < current; ++i) {
344                         auto iter = std::find_if(_threads.begin(), _threads.end(), predicate);
345                         if (iter != _threads.end()) {
346                                 (*iter)->stop();
347                                 _threads.erase(iter);
348                         }
349                 }
350         };
351
352
353         /* CPU */
354
355         auto const is_cpu_thread = [](shared_ptr<J2KEncoderThread> thread) {
356                 return static_cast<bool>(dynamic_pointer_cast<CPUJ2KEncoderThread>(thread));
357         };
358
359         auto const current_cpu_threads = std::count_if(_threads.begin(), _threads.end(), is_cpu_thread);
360
361         for (auto i = current_cpu_threads; i < cpu; ++i) {
362                 auto thread = make_shared<CPUJ2KEncoderThread>(*this);
363                 thread->start();
364                 _threads.push_back(thread);
365         }
366
367         remove_threads(cpu, current_cpu_threads, is_cpu_thread);
368
369 #ifdef DCPOMATIC_GROK
370         /* GPU */
371
372         auto const is_grok_thread = [](shared_ptr<J2KEncoderThread> thread) {
373                 return static_cast<bool>(dynamic_pointer_cast<GrokJ2KEncoderThread>(thread));
374         };
375
376         auto const current_gpu_threads = std::count_if(_threads.begin(), _threads.end(), is_grok_thread);
377
378         for (auto i = current_gpu_threads; i < gpu; ++i) {
379                 auto thread = make_shared<GrokJ2KEncoderThread>(*this, _context);
380                 thread->start();
381                 _threads.push_back(thread);
382         }
383
384         remove_threads(gpu, current_gpu_threads, is_grok_thread);
385 #endif
386
387         /* Remote */
388
389         for (auto const& server: servers) {
390                 if (!server.current_link_version()) {
391                         continue;
392                 }
393
394                 auto is_remote_thread = [server](shared_ptr<J2KEncoderThread> thread) {
395                         auto remote = dynamic_pointer_cast<RemoteJ2KEncoderThread>(thread);
396                         return remote && remote->server().host_name() == server.host_name();
397                 };
398
399                 auto const current_threads = std::count_if(_threads.begin(), _threads.end(), is_remote_thread);
400
401                 auto const wanted_threads = server.threads();
402
403                 if (wanted_threads > current_threads) {
404                         LOG_GENERAL(N_("Adding %1 worker threads for remote %2"), wanted_threads - current_threads, server.host_name());
405                 } else if (wanted_threads < current_threads) {
406                         LOG_GENERAL(N_("Removing %1 worker threads for remote %2"), current_threads - wanted_threads, server.host_name());
407                 }
408
409                 for (auto i = current_threads; i < wanted_threads; ++i) {
410                         auto thread = make_shared<RemoteJ2KEncoderThread>(*this, server);
411                         thread->start();
412                         _threads.push_back(thread);
413                 }
414
415                 remove_threads(wanted_threads, current_threads, is_remote_thread);
416         }
417
418         _writer.set_encoder_threads(_threads.size());
419 }
420
421
422 DCPVideo
423 J2KEncoder::pop()
424 {
425         boost::mutex::scoped_lock lock(_queue_mutex);
426         while (_queue.empty()) {
427                 _empty_condition.wait (lock);
428         }
429
430         LOG_TIMING("encoder-wake thread=%1 queue=%2", thread_id(), _queue.size());
431
432         auto vf = _queue.front();
433         _queue.pop_front();
434
435         _full_condition.notify_all();
436         return vf;
437 }
438
439
440 void
441 J2KEncoder::retry(DCPVideo video)
442 {
443         boost::mutex::scoped_lock lock(_queue_mutex);
444         _queue.push_front(video);
445         _empty_condition.notify_all();
446 }
447
448
449 void
450 J2KEncoder::write(shared_ptr<const dcp::Data> data, int index, Eyes eyes)
451 {
452         _writer.write(data, index, eyes);
453         frame_done();
454 }