Add writer class to pull some stuff out of Encoder.
authorCarl Hetherington <cth@carlh.net>
Fri, 18 Jan 2013 00:40:49 +0000 (00:40 +0000)
committerCarl Hetherington <cth@carlh.net>
Fri, 18 Jan 2013 00:40:49 +0000 (00:40 +0000)
NOTES [new file with mode: 0644]
src/lib/encoder.cc
src/lib/encoder.h
src/lib/writer.cc [new file with mode: 0644]
src/lib/writer.h [new file with mode: 0644]
src/lib/wscript

diff --git a/NOTES b/NOTES
new file mode 100644 (file)
index 0000000..5be074c
--- /dev/null
+++ b/NOTES
@@ -0,0 +1,6 @@
+
+Encoder to write audio mxf; and sort out creation of those mxfs
+DCP job still to write XML; presumably.
+Write hashes of frames after successful write.
+Make check of hashes optional
+
index b37a3c098d9a245381b05ac13a61ec7439509c75..bad420f353531e3d011abb6e45d7900f485dfb39 100644 (file)
@@ -37,6 +37,7 @@
 #include "server.h"
 #include "format.h"
 #include "cross.h"
+#include "writer.h"
 
 using std::pair;
 using std::string;
@@ -48,7 +49,6 @@ using std::make_pair;
 using namespace boost;
 
 int const Encoder::_history_size = 25;
-unsigned int const Encoder::_maximum_frames_in_memory = 8;
 
 /** @param f Film that we are encoding.
  *  @param o Options.
@@ -65,9 +65,6 @@ Encoder::Encoder (shared_ptr<const Film> f)
 #endif
        , _have_a_real_frame (false)
        , _terminate_encoder (false)
-       , _writer_thread (0)
-       , _finish_writer (false)
-       , _last_written_frame (-1)
 {
        if (_film->audio_stream()) {
                /* Create sound output files with .tmp suffixes; we will rename
@@ -92,7 +89,9 @@ Encoder::~Encoder ()
 {
        close_sound_files ();
        terminate_worker_threads ();
-       finish_writer_thread ();
+       if (_writer) {
+               _writer->finish ();
+       }
 }
 
 void
@@ -139,18 +138,7 @@ Encoder::process_begin ()
                }
        }
 
-       /* XXX! */
-       _picture_asset.reset (
-               new libdcp::MonoPictureAsset (
-                       _film->dir (_film->dcp_name()),
-                       String::compose ("video_%1.mxf", 0),
-                       DCPFrameRate (_film->frames_per_second()).frames_per_second,
-                       _film->format()->dcp_size()
-                       )
-               );
-       
-       _picture_asset_writer = _picture_asset->start_write ();
-       _writer_thread = new boost::thread (boost::bind (&Encoder::writer_thread, this));
+       _writer.reset (new Writer (_film));
 }
 
 
@@ -222,20 +210,15 @@ Encoder::process_end ()
        for (list<shared_ptr<DCPVideoFrame> >::iterator i = _encode_queue.begin(); i != _encode_queue.end(); ++i) {
                _film->log()->log (String::compose ("Encode left-over frame %1", (*i)->frame ()));
                try {
-                       shared_ptr<EncodedData> e = (*i)->encode_locally ();
-                       {
-                               boost::mutex::scoped_lock lock2 (_writer_mutex);
-                               _write_queue.push_back (make_pair (e, (*i)->frame ()));
-                               _writer_condition.notify_all ();
-                       }
+                       _writer->write ((*i)->encode_locally(), (*i)->frame ());
                        frame_done ();
                } catch (std::exception& e) {
                        _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
                }
        }
 
-       finish_writer_thread ();
-       _picture_asset_writer->finalize ();
+       _writer->finish ();
+       _writer.reset ();
 }      
 
 /** @return an estimate of the current number of frames we are encoding per second,
@@ -328,12 +311,8 @@ Encoder::process_video (shared_ptr<Image> image, bool same, boost::shared_ptr<Su
        }
 
        if (same && _have_a_real_frame) {
-               /* Use the last frame that we encoded.  We do this by putting a null encoded
-                  frame straight onto the writer's queue.  It will know to duplicate the previous frame
-                  in this case.
-               */
-               boost::mutex::scoped_lock lock2 (_writer_mutex);
-               _write_queue.push_back (make_pair (shared_ptr<EncodedData> (), _video_frames_out));
+               /* Use the last frame that we encoded. */
+               _writer->repeat (_video_frames_out);
                frame_done ();
        } else {
                /* Queue this new frame for encoding */
@@ -357,8 +336,7 @@ Encoder::process_video (shared_ptr<Image> image, bool same, boost::shared_ptr<Su
        ++_video_frames_out;
 
        if (dfr.repeat) {
-               boost::mutex::scoped_lock lock2 (_writer_mutex);
-               _write_queue.push_back (make_pair (shared_ptr<EncodedData> (), _video_frames_out));
+               _writer->repeat (_video_frames_out);
                ++_video_frames_out;
                frame_done ();
        }
@@ -447,23 +425,6 @@ Encoder::terminate_worker_threads ()
        }
 }
 
-void
-Encoder::finish_writer_thread ()
-{
-       if (!_writer_thread) {
-               return;
-       }
-       
-       boost::mutex::scoped_lock lock (_writer_mutex);
-       _finish_writer = true;
-       _writer_condition.notify_all ();
-       lock.unlock ();
-
-       _writer_thread->join ();
-       delete _writer_thread;
-       _writer_thread = 0;
-}
-
 void
 Encoder::encoder_thread (ServerDescription* server)
 {
@@ -528,9 +489,7 @@ Encoder::encoder_thread (ServerDescription* server)
                }
 
                if (encoded) {
-                       boost::mutex::scoped_lock lock2 (_writer_mutex);
-                       _write_queue.push_back (make_pair (encoded, vf->frame ()));
-                       _writer_condition.notify_all ();
+                       _writer->write (encoded, vf->frame ());
                        frame_done ();
                } else {
                        lock.lock ();
@@ -549,110 +508,3 @@ Encoder::encoder_thread (ServerDescription* server)
                _worker_condition.notify_all ();
        }
 }
-
-void
-Encoder::link (string a, string b) const
-{
-#ifdef DVDOMATIC_POSIX                 
-       int const r = symlink (a.c_str(), b.c_str());
-       if (r) {
-               throw EncodeError (String::compose ("could not create symlink from %1 to %2", a, b));
-       }
-#endif
-       
-#ifdef DVDOMATIC_WINDOWS
-       boost::filesystem::copy_file (a, b);
-#endif                 
-}
-
-struct WriteQueueSorter
-{
-       bool operator() (pair<shared_ptr<EncodedData>, int> const & a, pair<shared_ptr<EncodedData>, int> const & b) {
-               return a.second < b.second;
-       }
-};
-
-void
-Encoder::writer_thread ()
-{
-       while (1)
-       {
-               boost::mutex::scoped_lock lock (_writer_mutex);
-
-               while (1) {
-                       if (_finish_writer ||
-                           _write_queue.size() > _maximum_frames_in_memory ||
-                           (!_write_queue.empty() && _write_queue.front().second == (_last_written_frame + 1))) {
-                                   
-                                   break;
-                           }
-
-                           TIMING ("writer sleeps with a queue of %1; %2 pending", _write_queue.size(), _pending.size());
-                           _writer_condition.wait (lock);
-                           TIMING ("writer wakes with a queue of %1", _write_queue.size());
-
-                           _write_queue.sort (WriteQueueSorter ());
-               }
-
-               if (_finish_writer && _write_queue.empty() && _pending.empty()) {
-                       return;
-               }
-
-               /* Write any frames that we can write; i.e. those that are in sequence */
-               while (!_write_queue.empty() && _write_queue.front().second == (_last_written_frame + 1)) {
-                       pair<boost::shared_ptr<EncodedData>, int> encoded = _write_queue.front ();
-                       _write_queue.pop_front ();
-
-                       lock.unlock ();
-                       _film->log()->log (String::compose ("Writer writes %1 to MXF", encoded.second));
-                       if (encoded.first) {
-                               _picture_asset_writer->write (encoded.first->data(), encoded.first->size());
-                               _last_written = encoded.first;
-                       } else {
-                               _picture_asset_writer->write (_last_written->data(), _last_written->size());
-                       }
-                       lock.lock ();
-
-                       ++_last_written_frame;
-               }
-
-               while (_write_queue.size() > _maximum_frames_in_memory) {
-                       /* Too many frames in memory which can't yet be written to the stream.
-                          Put some to disk.
-                       */
-
-                       pair<boost::shared_ptr<EncodedData>, int> encoded = _write_queue.back ();
-                       _write_queue.pop_back ();
-                       if (!encoded.first) {
-                               /* This is a `repeat-last' frame, so no need to write it to disk */
-                               continue;
-                       }
-
-                       lock.unlock ();
-                       _film->log()->log (String::compose ("Writer full (awaiting %1); pushes %2 to disk", _last_written_frame + 1, encoded.second));
-                       encoded.first->write (_film, encoded.second);
-                       lock.lock ();
-
-                       _pending.push_back (encoded.second);
-               }
-
-               while (_write_queue.size() < _maximum_frames_in_memory && !_pending.empty()) {
-                       /* We have some space in memory.  Fetch some frames back off disk. */
-
-                       _pending.sort ();
-                       int const fetch = _pending.front ();
-
-                       lock.unlock ();
-                       _film->log()->log (String::compose ("Writer pulls %1 back from disk", fetch));
-                       shared_ptr<EncodedData> encoded;
-                       if (boost::filesystem::exists (_film->frame_out_path (fetch, false))) {
-                               /* It's an actual frame (not a repeat-last); load it in */
-                               encoded.reset (new EncodedData (_film->frame_out_path (fetch, false)));
-                       }
-                       lock.lock ();
-
-                       _write_queue.push_back (make_pair (encoded, fetch));
-                       _pending.remove (fetch);
-               }
-       }
-}
index 3e2b5d9578f153faed59477f3d5d3df67464c1a7..a277aca51d85f387926485a50ba7593e2a5041e2 100644 (file)
@@ -51,11 +51,7 @@ class Film;
 class ServerDescription;
 class DCPVideoFrame;
 class EncodedData;
-
-namespace libdcp {
-       class MonoPictureAsset;
-       class MonoPictureAssetWriter;
-}
+class Writer;
 
 /** @class Encoder
  *  @brief Encoder to J2K and WAV for DCP.
@@ -100,7 +96,6 @@ private:
 
        void encoder_thread (ServerDescription *);
        void terminate_worker_threads ();
-       void link (std::string, std::string) const;
 
        /** Film that we are encoding */
        boost::shared_ptr<const Film> _film;
@@ -125,9 +120,6 @@ private:
        /** Number of audio frames written for the DCP so far */
        int64_t _audio_frames_out;
 
-       void writer_thread ();
-       void finish_writer_thread ();
-
 #if HAVE_SWRESAMPLE    
        SwrContext* _swr_context;
 #endif
@@ -141,18 +133,7 @@ private:
        mutable boost::mutex _worker_mutex;
        boost::condition _worker_condition;
 
-       boost::thread* _writer_thread;
-       bool _finish_writer;
-       std::list<std::pair<boost::shared_ptr<EncodedData>, int> > _write_queue;
-       mutable boost::mutex _writer_mutex;
-       boost::condition _writer_condition;
-       boost::shared_ptr<EncodedData> _last_written;
-       std::list<int> _pending;
-       int _last_written_frame;
-       static const unsigned int _maximum_frames_in_memory;
-
-       boost::shared_ptr<libdcp::MonoPictureAsset> _picture_asset;
-       boost::shared_ptr<libdcp::MonoPictureAssetWriter> _picture_asset_writer;
+       boost::shared_ptr<Writer> _writer;
 };
 
 #endif
diff --git a/src/lib/writer.cc b/src/lib/writer.cc
new file mode 100644 (file)
index 0000000..0381ee3
--- /dev/null
@@ -0,0 +1,180 @@
+/*
+    Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
+
+    This program is free software; you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation; either version 2 of the License, or
+    (at your option) any later version.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with this program; if not, write to the Free Software
+    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+
+*/
+
+#include <libdcp/picture_asset.h>
+#include "writer.h"
+#include "compose.hpp"
+#include "film.h"
+#include "format.h"
+#include "log.h"
+#include "dcp_video_frame.h"
+
+using std::make_pair;
+using std::pair;
+using boost::shared_ptr;
+
+unsigned int const Writer::_maximum_frames_in_memory = 8;
+
+Writer::Writer (shared_ptr<const Film> f)
+       : _film (f)
+       , _thread (0)
+       , _finish (false)
+       , _last_written_frame (-1)
+{
+       _picture_asset.reset (
+               new libdcp::MonoPictureAsset (
+                       _film->dir (_film->dcp_name()),
+                       String::compose ("video_%1.mxf", 0),
+                       DCPFrameRate (_film->frames_per_second()).frames_per_second,
+                       _film->format()->dcp_size()
+                       )
+               );
+       
+       _picture_asset_writer = _picture_asset->start_write ();
+
+       _thread = new boost::thread (boost::bind (&Writer::thread, this));
+}
+
+void
+Writer::write (shared_ptr<EncodedData> encoded, int frame)
+{
+       boost::mutex::scoped_lock lock (_mutex);
+       _queue.push_back (make_pair (encoded, frame));
+       _condition.notify_all ();
+}
+
+struct QueueSorter
+{
+       bool operator() (pair<shared_ptr<EncodedData>, int> const & a, pair<shared_ptr<EncodedData>, int> const & b) {
+               return a.second < b.second;
+       }
+};
+
+void
+Writer::thread ()
+{
+       while (1)
+       {
+               boost::mutex::scoped_lock lock (_mutex);
+
+               while (1) {
+                       if (_finish ||
+                           _queue.size() > _maximum_frames_in_memory ||
+                           (!_queue.empty() && _queue.front().second == (_last_written_frame + 1))) {
+                                   
+                                   break;
+                           }
+
+                           TIMING ("writer sleeps with a queue of %1; %2 pending", _queue.size(), _pending.size());
+                           _condition.wait (lock);
+                           TIMING ("writer wakes with a queue of %1", _queue.size());
+
+                           _queue.sort (QueueSorter ());
+               }
+
+               if (_finish && _queue.empty() && _pending.empty()) {
+                       return;
+               }
+
+               /* Write any frames that we can write; i.e. those that are in sequence */
+               while (!_queue.empty() && _queue.front().second == (_last_written_frame + 1)) {
+                       pair<boost::shared_ptr<EncodedData>, int> encoded = _queue.front ();
+                       _queue.pop_front ();
+
+                       lock.unlock ();
+                       _film->log()->log (String::compose ("Writer writes %1 to MXF", encoded.second));
+                       if (encoded.first) {
+                               _picture_asset_writer->write (encoded.first->data(), encoded.first->size());
+                               _last_written = encoded.first;
+                       } else {
+                               _picture_asset_writer->write (_last_written->data(), _last_written->size());
+                       }
+                       lock.lock ();
+
+                       ++_last_written_frame;
+               }
+
+               while (_queue.size() > _maximum_frames_in_memory) {
+                       /* Too many frames in memory which can't yet be written to the stream.
+                          Put some to disk.
+                       */
+
+                       pair<boost::shared_ptr<EncodedData>, int> encoded = _queue.back ();
+                       _queue.pop_back ();
+                       if (!encoded.first) {
+                               /* This is a `repeat-last' frame, so no need to write it to disk */
+                               continue;
+                       }
+
+                       lock.unlock ();
+                       _film->log()->log (String::compose ("Writer full (awaiting %1); pushes %2 to disk", _last_written_frame + 1, encoded.second));
+                       encoded.first->write (_film, encoded.second);
+                       lock.lock ();
+
+                       _pending.push_back (encoded.second);
+               }
+
+               while (_queue.size() < _maximum_frames_in_memory && !_pending.empty()) {
+                       /* We have some space in memory.  Fetch some frames back off disk. */
+
+                       _pending.sort ();
+                       int const fetch = _pending.front ();
+
+                       lock.unlock ();
+                       _film->log()->log (String::compose ("Writer pulls %1 back from disk", fetch));
+                       shared_ptr<EncodedData> encoded;
+                       if (boost::filesystem::exists (_film->frame_out_path (fetch, false))) {
+                               /* It's an actual frame (not a repeat-last); load it in */
+                               encoded.reset (new EncodedData (_film->frame_out_path (fetch, false)));
+                       }
+                       lock.lock ();
+
+                       _queue.push_back (make_pair (encoded, fetch));
+                       _pending.remove (fetch);
+               }
+       }
+
+}
+
+void
+Writer::finish ()
+{
+       if (!_thread) {
+               return;
+       }
+       
+       boost::mutex::scoped_lock lock (_mutex);
+       _finish = true;
+       _condition.notify_all ();
+       lock.unlock ();
+
+       _thread->join ();
+       delete _thread;
+       _thread = 0;
+
+       _picture_asset_writer->finalize ();
+}
+
+/** Tell the writer that frame `f' should be a repeat of the frame before it */
+void
+Writer::repeat (int f)
+{
+       boost::mutex::scoped_lock lock (_mutex);
+       _queue.push_back (make_pair (shared_ptr<EncodedData> (), f));
+}
diff --git a/src/lib/writer.h b/src/lib/writer.h
new file mode 100644 (file)
index 0000000..8156308
--- /dev/null
@@ -0,0 +1,60 @@
+/*
+    Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
+
+    This program is free software; you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation; either version 2 of the License, or
+    (at your option) any later version.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with this program; if not, write to the Free Software
+    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+
+*/
+
+#include <list>
+#include <boost/shared_ptr.hpp>
+#include <boost/thread.hpp>
+#include <boost/thread/condition.hpp>
+
+class Film;
+class EncodedData;
+
+namespace libdcp {
+       class MonoPictureAsset;
+       class MonoPictureAssetWriter;
+}
+
+class Writer
+{
+public:
+       Writer (boost::shared_ptr<const Film>);
+       
+       void write (boost::shared_ptr<EncodedData>, int);
+       void repeat (int f);
+       void finish ();
+
+private:
+
+       void thread ();
+
+       boost::shared_ptr<const Film> _film;
+
+       boost::thread* _thread;
+       bool _finish;
+       std::list<std::pair<boost::shared_ptr<EncodedData>, int> > _queue;
+       mutable boost::mutex _mutex;
+       boost::condition _condition;
+       boost::shared_ptr<EncodedData> _last_written;
+       std::list<int> _pending;
+       int _last_written_frame;
+       static const unsigned int _maximum_frames_in_memory;
+
+       boost::shared_ptr<libdcp::MonoPictureAsset> _picture_asset;
+       boost::shared_ptr<libdcp::MonoPictureAssetWriter> _picture_asset_writer;
+};
index b2b639f06d375928c9872e12b288136fa3f2da0f..6d72f6480f3e450786aef961e93896952924d6e6 100644 (file)
@@ -56,6 +56,7 @@ def build(bld):
                 version.cc
                  video_decoder.cc
                  video_source.cc
+                 writer.cc
                 """
 
     obj.target = 'dvdomatic'