Add missing signal of the _full_condition condition variable.
[dcpomatic.git] / src / lib / writer.cc
1 /*
2     Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
20 #include <fstream>
21 #include <cerrno>
22 #include <libdcp/mono_picture_asset.h>
23 #include <libdcp/stereo_picture_asset.h>
24 #include <libdcp/sound_asset.h>
25 #include <libdcp/reel.h>
26 #include <libdcp/dcp.h>
27 #include <libdcp/cpl.h>
28 #include "writer.h"
29 #include "compose.hpp"
30 #include "film.h"
31 #include "ratio.h"
32 #include "log.h"
33 #include "dcp_video_frame.h"
34 #include "dcp_content_type.h"
35 #include "player.h"
36 #include "audio_mapping.h"
37 #include "config.h"
38 #include "job.h"
39 #include "cross.h"
40
41 #include "i18n.h"
42
43 using std::make_pair;
44 using std::pair;
45 using std::string;
46 using std::list;
47 using std::cout;
48 using boost::shared_ptr;
49 using boost::weak_ptr;
50
/** Threshold on the number of FULL frames held in memory in the queue.  Once
 *  _queued_full_in_memory exceeds this value, the producer methods (write, fake_write,
 *  repeat) block on _full_condition and the writer thread pushes frames to disk.
 *  The value is the configured number of local encoding threads plus 4; it is
 *  evaluated once, when this file's static objects are initialised.
 */
int const Writer::_maximum_frames_in_memory = Config::instance()->num_local_encoding_threads() + 4;
52
/** Set up the picture and sound assets for a film and start the writer thread.
 *
 *  Any previous DCP directory for the film is removed, the existing picture MXF
 *  (if there is one) is checked to find how many frames can be re-used, and then
 *  the picture and sound asset writers are opened.
 *
 *  @param f Film that is being written.
 *  @param j Job to report progress to; it must still be alive (asserted).
 */
Writer::Writer (shared_ptr<const Film> f, weak_ptr<Job> j)
        : _film (f)
        , _job (j)
        , _first_nonexistant_frame (0)
        , _thread (0)
        , _finish (false)
        , _queued_full_in_memory (0)
        , _last_written_frame (-1)
        , _last_written_eyes (EYES_RIGHT)
        , _full_written (0)
        , _fake_written (0)
        , _repeat_written (0)
        , _pushed_to_disk (0)
{
        /* Remove any old DCP */
        boost::filesystem::remove_all (_film->dir (_film->dcp_name ()));

        shared_ptr<Job> job = _job.lock ();
        assert (job);

        /* This sets _first_nonexistant_frame, which is used below to decide
           whether start_write() is told that there is existing data.
        */
        job->sub (_("Checking existing image data"));
        check_existing_picture_mxf ();

        /* Create our picture asset in a subdirectory, named according to those
           film's parameters which affect the video output.  We will hard-link
           it into the DCP later.
        */

        if (_film->three_d ()) {
                _picture_asset.reset (new libdcp::StereoPictureAsset (_film->internal_video_mxf_dir (), _film->internal_video_mxf_filename ()));
        } else {
                _picture_asset.reset (new libdcp::MonoPictureAsset (_film->internal_video_mxf_dir (), _film->internal_video_mxf_filename ()));
        }

        _picture_asset->set_edit_rate (_film->video_frame_rate ());
        _picture_asset->set_size (fit_ratio_within (_film->container()->ratio(), _film->full_frame ()));

        if (_film->encrypted ()) {
                _picture_asset->set_key (_film->key ());
        }
        
        /* Write the sound asset into the film directory so that we leave the creation
           of the DCP directory until the last minute.  Some versions of windows inexplicably
           don't like overwriting existing files here, so try to remove it using boost.
        */
        boost::system::error_code ec;
        boost::filesystem::remove_all (_film->file (_film->audio_mxf_filename ()), ec);
        if (ec) {
                /* Failure to remove is only logged; we carry on regardless */
                _film->log()->log (
                        String::compose (
                                "Could not remove existing audio MXF file %1 (%2)",
                                _film->file (_film->audio_mxf_filename ()),
                                ec.value ())
                        );
        }

        /* The argument is true if check_existing_picture_mxf() found at least one usable frame */
        _picture_asset_writer = _picture_asset->start_write (_first_nonexistant_frame > 0);

        _sound_asset.reset (new libdcp::SoundAsset (_film->directory (), _film->audio_mxf_filename ()));
        _sound_asset->set_edit_rate (_film->video_frame_rate ());
        _sound_asset->set_channels (_film->audio_channels ());
        _sound_asset->set_sampling_rate (_film->audio_frame_rate ());

        if (_film->encrypted ()) {
                _sound_asset->set_key (_film->key ());
        }
        
        _sound_asset_writer = _sound_asset->start_write ();

        /* Start the thread last, once everything that Writer::thread() uses has been set up */
        _thread = new boost::thread (boost::bind (&Writer::thread, this));

        job->sub (_("Encoding image data"));
}
126
127 void
128 Writer::write (shared_ptr<const EncodedData> encoded, int frame, Eyes eyes)
129 {
130         boost::mutex::scoped_lock lock (_mutex);
131
132         while (_queued_full_in_memory > _maximum_frames_in_memory) {
133                 _full_condition.wait (lock);
134         }
135
136         QueueItem qi;
137         qi.type = QueueItem::FULL;
138         qi.encoded = encoded;
139         qi.frame = frame;
140
141         if (_film->three_d() && eyes == EYES_BOTH) {
142                 /* 2D material in a 3D DCP; fake the 3D */
143                 qi.eyes = EYES_LEFT;
144                 _queue.push_back (qi);
145                 ++_queued_full_in_memory;
146                 qi.eyes = EYES_RIGHT;
147                 _queue.push_back (qi);
148                 ++_queued_full_in_memory;
149         } else {
150                 qi.eyes = eyes;
151                 _queue.push_back (qi);
152                 ++_queued_full_in_memory;
153         }
154         
155         _empty_condition.notify_all ();
156 }
157
158 void
159 Writer::fake_write (int frame, Eyes eyes)
160 {
161         boost::mutex::scoped_lock lock (_mutex);
162
163         while (_queued_full_in_memory > _maximum_frames_in_memory) {
164                 _full_condition.wait (lock);
165         }
166         
167         FILE* ifi = fopen_boost (_film->info_path (frame, eyes), "r");
168         libdcp::FrameInfo info (ifi);
169         fclose (ifi);
170         
171         QueueItem qi;
172         qi.type = QueueItem::FAKE;
173         qi.size = info.size;
174         qi.frame = frame;
175         if (_film->three_d() && eyes == EYES_BOTH) {
176                 qi.eyes = EYES_LEFT;
177                 _queue.push_back (qi);
178                 qi.eyes = EYES_RIGHT;
179                 _queue.push_back (qi);
180         } else {
181                 qi.eyes = eyes;
182                 _queue.push_back (qi);
183         }
184
185         _empty_condition.notify_all ();
186 }
187
/** Write some audio straight to the sound asset writer.
 *  This method is not thread safe: it does not take _mutex and does not go
 *  through the queue that the video frames use.
 *  @param audio Audio to write; its data() and frames() are passed on unchanged.
 */
void
Writer::write (shared_ptr<const AudioBuffers> audio)
{
        _sound_asset_writer->write (audio->data(), audio->frames());
}
194
195 /** This must be called from Writer::thread() with an appropriate lock held */
196 bool
197 Writer::have_sequenced_image_at_queue_head ()
198 {
199         if (_queue.empty ()) {
200                 return false;
201         }
202
203         _queue.sort ();
204
205         /* The queue should contain only EYES_LEFT/EYES_RIGHT pairs or EYES_BOTH */
206
207         if (_queue.front().eyes == EYES_BOTH) {
208                 /* 2D */
209                 return _queue.front().frame == (_last_written_frame + 1);
210         }
211
212         /* 3D */
213
214         if (_last_written_eyes == EYES_LEFT && _queue.front().frame == _last_written_frame && _queue.front().eyes == EYES_RIGHT) {
215                 return true;
216         }
217
218         if (_last_written_eyes == EYES_RIGHT && _queue.front().frame == (_last_written_frame + 1) && _queue.front().eyes == EYES_LEFT) {
219                 return true;
220         }
221
222         return false;
223 }
224
225 void
226 Writer::thread ()
227 try
228 {
229         while (1)
230         {
231                 boost::mutex::scoped_lock lock (_mutex);
232
233                 while (1) {
234                         
235                         if (_finish || _queued_full_in_memory > _maximum_frames_in_memory || have_sequenced_image_at_queue_head ()) {
236                                 break;
237                         }
238
239                         TIMING (N_("writer sleeps with a queue of %1"), _queue.size());
240                         _empty_condition.wait (lock);
241                         TIMING (N_("writer wakes with a queue of %1"), _queue.size());
242                 }
243
244                 if (_finish && _queue.empty()) {
245                         return;
246                 }
247
248                 /* Write any frames that we can write; i.e. those that are in sequence. */
249                 while (have_sequenced_image_at_queue_head ()) {
250                         QueueItem qi = _queue.front ();
251                         _queue.pop_front ();
252                         if (qi.type == QueueItem::FULL && qi.encoded) {
253                                 --_queued_full_in_memory;
254                         }
255
256                         lock.unlock ();
257                         switch (qi.type) {
258                         case QueueItem::FULL:
259                         {
260                                 _film->log()->log (String::compose (N_("Writer FULL-writes %1 to MXF"), qi.frame));
261                                 if (!qi.encoded) {
262                                         qi.encoded.reset (new EncodedData (_film->j2c_path (qi.frame, qi.eyes, false)));
263                                 }
264
265                                 libdcp::FrameInfo fin = _picture_asset_writer->write (qi.encoded->data(), qi.encoded->size());
266                                 qi.encoded->write_info (_film, qi.frame, qi.eyes, fin);
267                                 _last_written[qi.eyes] = qi.encoded;
268                                 ++_full_written;
269                                 break;
270                         }
271                         case QueueItem::FAKE:
272                                 _film->log()->log (String::compose (N_("Writer FAKE-writes %1 to MXF"), qi.frame));
273                                 _picture_asset_writer->fake_write (qi.size);
274                                 _last_written[qi.eyes].reset ();
275                                 ++_fake_written;
276                                 break;
277                         case QueueItem::REPEAT:
278                         {
279                                 _film->log()->log (String::compose (N_("Writer REPEAT-writes %1 to MXF"), qi.frame));
280                                 libdcp::FrameInfo fin = _picture_asset_writer->write (
281                                         _last_written[qi.eyes]->data(),
282                                         _last_written[qi.eyes]->size()
283                                         );
284                                 
285                                 _last_written[qi.eyes]->write_info (_film, qi.frame, qi.eyes, fin);
286                                 ++_repeat_written;
287                                 break;
288                         }
289                         }
290                         lock.lock ();
291
292                         _last_written_frame = qi.frame;
293                         _last_written_eyes = qi.eyes;
294                         
295                         if (_film->length()) {
296                                 shared_ptr<Job> job = _job.lock ();
297                                 assert (job);
298                                 int total = _film->time_to_video_frames (_film->length ());
299                                 if (_film->three_d ()) {
300                                         /* _full_written and so on are incremented for each eye, so we need to double the total
301                                            frames to get the correct progress.
302                                         */
303                                         total *= 2;
304                                 }
305                                 job->set_progress (float (_full_written + _fake_written + _repeat_written) / total);
306                         }
307                 }
308
309                 while (_queued_full_in_memory > _maximum_frames_in_memory) {
310                         /* Too many frames in memory which can't yet be written to the stream.
311                            Write some FULL frames to disk.
312                         */
313
314                         /* Find one from the back of the queue */
315                         _queue.sort ();
316                         list<QueueItem>::reverse_iterator i = _queue.rbegin ();
317                         while (i != _queue.rend() && (i->type != QueueItem::FULL || !i->encoded)) {
318                                 ++i;
319                         }
320
321                         assert (i != _queue.rend());
322                         QueueItem qi = *i;
323
324                         ++_pushed_to_disk;
325                         
326                         lock.unlock ();
327
328                         _film->log()->log (
329                                 String::compose (
330                                         "Writer full (awaiting %1 [last eye was %2]); pushes %3 to disk",
331                                         _last_written_frame + 1,
332                                         _last_written_eyes, qi.frame)
333                                 );
334                         
335                         qi.encoded->write (_film, qi.frame, qi.eyes);
336                         lock.lock ();
337                         qi.encoded.reset ();
338                         --_queued_full_in_memory;
339                 }
340
341                 _full_condition.notify_all ();
342         }
343 }
344 catch (...)
345 {
346         store_current ();
347 }
348
349 void
350 Writer::finish ()
351 {
352         if (!_thread) {
353                 return;
354         }
355         
356         boost::mutex::scoped_lock lock (_mutex);
357         _finish = true;
358         _empty_condition.notify_all ();
359         _full_condition.notify_all ();
360         lock.unlock ();
361
362         _thread->join ();
363         rethrow ();
364         
365         delete _thread;
366         _thread = 0;
367
368         _picture_asset_writer->finalize ();
369         _sound_asset_writer->finalize ();
370         
371         int const frames = _last_written_frame + 1;
372
373         _picture_asset->set_duration (frames);
374
375         /* Hard-link the video MXF into the DCP */
376         boost::filesystem::path video_from;
377         video_from /= _film->internal_video_mxf_dir();
378         video_from /= _film->internal_video_mxf_filename();
379         
380         boost::filesystem::path video_to;
381         video_to /= _film->dir (_film->dcp_name());
382         video_to /= _film->video_mxf_filename ();
383
384         boost::system::error_code ec;
385         boost::filesystem::create_hard_link (video_from, video_to, ec);
386         if (ec) {
387                 /* hard link failed; copy instead */
388                 boost::filesystem::copy_file (video_from, video_to);
389                 _film->log()->log ("Hard-link failed; fell back to copying");
390         }
391
392         /* And update the asset */
393
394         _picture_asset->set_directory (_film->dir (_film->dcp_name ()));
395         _picture_asset->set_file_name (_film->video_mxf_filename ());
396
397         /* Move the audio MXF into the DCP */
398
399         boost::filesystem::path audio_to;
400         audio_to /= _film->dir (_film->dcp_name ());
401         audio_to /= _film->audio_mxf_filename ();
402         
403         boost::filesystem::rename (_film->file (_film->audio_mxf_filename ()), audio_to, ec);
404         if (ec) {
405                 throw FileError (
406                         String::compose (_("could not move audio MXF into the DCP (%1)"), ec.value ()), _film->file (_film->audio_mxf_filename ())
407                         );
408         }
409
410         _sound_asset->set_directory (_film->dir (_film->dcp_name ()));
411         _sound_asset->set_duration (frames);
412         
413         libdcp::DCP dcp (_film->dir (_film->dcp_name()));
414
415         shared_ptr<libdcp::CPL> cpl (
416                 new libdcp::CPL (
417                         _film->dir (_film->dcp_name()),
418                         _film->dcp_name(),
419                         _film->dcp_content_type()->libdcp_kind (),
420                         frames,
421                         _film->video_frame_rate ()
422                         )
423                 );
424         
425         dcp.add_cpl (cpl);
426
427         cpl->add_reel (shared_ptr<libdcp::Reel> (new libdcp::Reel (
428                                                          _picture_asset,
429                                                          _sound_asset,
430                                                          shared_ptr<libdcp::SubtitleAsset> ()
431                                                          )
432                                ));
433
434         shared_ptr<Job> job = _job.lock ();
435         assert (job);
436
437         job->sub (_("Computing image digest"));
438         _picture_asset->compute_digest (boost::bind (&Job::set_progress, job.get(), _1, false));
439
440         job->sub (_("Computing audio digest"));
441         _sound_asset->compute_digest (boost::bind (&Job::set_progress, job.get(), _1, false));
442
443         libdcp::XMLMetadata meta = Config::instance()->dcp_metadata ();
444         meta.set_issue_date_now ();
445         dcp.write_xml (_film->interop (), meta, _film->is_signed() ? make_signer () : shared_ptr<const libdcp::Signer> ());
446
447         _film->log()->log (
448                 String::compose (N_("Wrote %1 FULL, %2 FAKE, %3 REPEAT; %4 pushed to disk"), _full_written, _fake_written, _repeat_written, _pushed_to_disk)
449                 );
450 }
451
452 /** Tell the writer that frame `f' should be a repeat of the frame before it */
453 void
454 Writer::repeat (int f, Eyes e)
455 {
456         boost::mutex::scoped_lock lock (_mutex);
457
458         while (_queued_full_in_memory > _maximum_frames_in_memory) {
459                 _full_condition.wait (lock);
460         }
461         
462         QueueItem qi;
463         qi.type = QueueItem::REPEAT;
464         qi.frame = f;
465         if (_film->three_d() && e == EYES_BOTH) {
466                 qi.eyes = EYES_LEFT;
467                 _queue.push_back (qi);
468                 qi.eyes = EYES_RIGHT;
469                 _queue.push_back (qi);
470         } else {
471                 qi.eyes = e;
472                 _queue.push_back (qi);
473         }
474
475         _empty_condition.notify_all ();
476 }
477
478 bool
479 Writer::check_existing_picture_mxf_frame (FILE* mxf, int f, Eyes eyes)
480 {
481         /* Read the frame info as written */
482         FILE* ifi = fopen_boost (_film->info_path (f, eyes), "r");
483         if (!ifi) {
484                 _film->log()->log (String::compose ("Existing frame %1 has no info file", f));
485                 return false;
486         }
487         
488         libdcp::FrameInfo info (ifi);
489         fclose (ifi);
490         if (info.size == 0) {
491                 _film->log()->log (String::compose ("Existing frame %1 has no info file", f));
492                 return false;
493         }
494         
495         /* Read the data from the MXF and hash it */
496         dcpomatic_fseek (mxf, info.offset, SEEK_SET);
497         EncodedData data (info.size);
498         size_t const read = fread (data.data(), 1, data.size(), mxf);
499         if (read != static_cast<size_t> (data.size ())) {
500                 _film->log()->log (String::compose ("Existing frame %1 is incomplete", f));
501                 return false;
502         }
503         
504         string const existing_hash = md5_digest (data.data(), data.size());
505         if (existing_hash != info.hash) {
506                 _film->log()->log (String::compose ("Existing frame %1 failed hash check", f));
507                 return false;
508         }
509
510         return true;
511 }
512
513 void
514 Writer::check_existing_picture_mxf ()
515 {
516         /* Try to open the existing MXF */
517         boost::filesystem::path p;
518         p /= _film->internal_video_mxf_dir ();
519         p /= _film->internal_video_mxf_filename ();
520         FILE* mxf = fopen_boost (p, "rb");
521         if (!mxf) {
522                 _film->log()->log (String::compose ("Could not open existing MXF at %1 (errno=%2)", p.string(), errno));
523                 return;
524         }
525
526         int N = 0;
527         for (boost::filesystem::directory_iterator i (_film->info_dir ()); i != boost::filesystem::directory_iterator (); ++i) {
528                 ++N;
529         }
530
531         while (1) {
532
533                 shared_ptr<Job> job = _job.lock ();
534                 assert (job);
535
536                 job->set_progress (float (_first_nonexistant_frame) / N);
537
538                 if (_film->three_d ()) {
539                         if (!check_existing_picture_mxf_frame (mxf, _first_nonexistant_frame, EYES_LEFT)) {
540                                 break;
541                         }
542                         if (!check_existing_picture_mxf_frame (mxf, _first_nonexistant_frame, EYES_RIGHT)) {
543                                 break;
544                         }
545                 } else {
546                         if (!check_existing_picture_mxf_frame (mxf, _first_nonexistant_frame, EYES_BOTH)) {
547                                 break;
548                         }
549                 }
550
551                 _film->log()->log (String::compose ("Have existing frame %1", _first_nonexistant_frame));
552                 ++_first_nonexistant_frame;
553         }
554
555         fclose (mxf);
556 }
557
558 /** @param frame Frame index.
559  *  @return true if we can fake-write this frame.
560  */
561 bool
562 Writer::can_fake_write (int frame) const
563 {
564         /* We have to do a proper write of the first frame so that we can set up the JPEG2000
565            parameters in the MXF writer.
566         */
567         return (frame != 0 && frame < _first_nonexistant_frame);
568 }
569
570 bool
571 operator< (QueueItem const & a, QueueItem const & b)
572 {
573         if (a.frame != b.frame) {
574                 return a.frame < b.frame;
575         }
576
577         return static_cast<int> (a.eyes) < static_cast<int> (b.eyes);
578 }
579
580 bool
581 operator== (QueueItem const & a, QueueItem const & b)
582 {
583         return a.frame == b.frame && a.eyes == b.eyes;
584 }