Simplify writer a bit and fix it when frames are arriving quickly.
authorCarl Hetherington <cth@carlh.net>
Thu, 7 Feb 2013 20:33:46 +0000 (20:33 +0000)
committerCarl Hetherington <cth@carlh.net>
Thu, 7 Feb 2013 20:33:46 +0000 (20:33 +0000)
src/lib/writer.cc
src/lib/writer.h

index 94072e6e8acd3cef1d2c15676ce7a73e7948ebae..91a692ba0d2e8140f9a9301fb3ed58896efe8b6f 100644 (file)
@@ -33,16 +33,18 @@ using std::make_pair;
 using std::pair;
 using std::string;
 using std::ifstream;
+using std::list;
 using std::cout;
 using boost::shared_ptr;
 
-unsigned int const Writer::_maximum_frames_in_memory = 8;
+int const Writer::_maximum_frames_in_memory = 8;
 
 Writer::Writer (shared_ptr<Film> f)
        : _film (f)
        , _first_nonexistant_frame (0)
        , _thread (0)
        , _finish (false)
+       , _queued_full_in_memory (0)
        , _last_written_frame (-1)
 {
        /* Remove any old DCP */
@@ -93,6 +95,7 @@ Writer::write (shared_ptr<const EncodedData> encoded, int frame)
        qi.encoded = encoded;
        qi.frame = frame;
        _queue.push_back (qi);
+       ++_queued_full_in_memory;
 
        _condition.notify_all ();
 }
@@ -129,21 +132,22 @@ Writer::thread ()
                boost::mutex::scoped_lock lock (_mutex);
 
                while (1) {
+                       
+                       _queue.sort ();
+                       
                        if (_finish ||
-                           _queue.size() > _maximum_frames_in_memory ||
+                           _queued_full_in_memory > _maximum_frames_in_memory ||
                            (!_queue.empty() && _queue.front().frame == (_last_written_frame + 1))) {
-                                   
-                                   break;
-                           }
-
-                           TIMING ("writer sleeps with a queue of %1; %2 pending", _queue.size(), _pending.size());
-                           _condition.wait (lock);
-                           TIMING ("writer wakes with a queue of %1", _queue.size());
-
-                           _queue.sort ();
+                               
+                               break;
+                       }
+                       
+                       TIMING ("writer sleeps with a queue of %1", _queue.size());
+                       _condition.wait (lock);
+                       TIMING ("writer wakes with a queue of %1", _queue.size());
                }
 
-               if (_finish && _queue.empty() && _pending.empty()) {
+               if (_finish && _queue.empty()) {
                        return;
                }
 
@@ -151,12 +155,18 @@ Writer::thread ()
                while (!_queue.empty() && _queue.front().frame == (_last_written_frame + 1)) {
                        QueueItem qi = _queue.front ();
                        _queue.pop_front ();
+                       if (qi.type == QueueItem::FULL && qi.encoded) {
+                               --_queued_full_in_memory;
+                       }
 
                        lock.unlock ();
                        switch (qi.type) {
                        case QueueItem::FULL:
                        {
                                _film->log()->log (String::compose ("Writer FULL-writes %1 to MXF", qi.frame));
+                               if (!qi.encoded) {
+                                       qi.encoded.reset (new EncodedData (_film->j2c_path (qi.frame, false)));
+                               }
                                libdcp::FrameInfo const fin = _picture_asset_writer->write (qi.encoded->data(), qi.encoded->size());
                                qi.encoded->write_info (_film, qi.frame, fin);
                                _last_written = qi.encoded;
@@ -180,41 +190,26 @@ Writer::thread ()
                        ++_last_written_frame;
                }
 
-               while (_queue.size() > _maximum_frames_in_memory) {
+               while (_queued_full_in_memory > _maximum_frames_in_memory) {
                        /* Too many frames in memory which can't yet be written to the stream.
-                          Put some in our pending list (and write FULL queue items' data to disk)
+                          Write some FULL frames to disk.
                        */
 
-                       QueueItem qi = _queue.back ();
-                       _queue.pop_back ();
-
-                       if (qi.type == QueueItem::FULL) {
-                               lock.unlock ();
-                               _film->log()->log (String::compose ("Writer full (awaiting %1); pushes %2 to disk", _last_written_frame + 1, qi.frame));
-                               qi.encoded->write (_film, qi.frame);
-                               lock.lock ();
-                               qi.encoded.reset ();
+                       /* Find one */
+                       list<QueueItem>::reverse_iterator i = _queue.rbegin ();
+                       while (i != _queue.rend() && (i->type != QueueItem::FULL || !i->encoded)) {
+                               ++i;
                        }
 
-                       _pending.push_back (qi);
-               }
+                       assert (i != _queue.rend());
+                       QueueItem qi = *i;
 
-               while (_queue.size() < _maximum_frames_in_memory && !_pending.empty()) {
-                       /* We have some space in memory.  Fetch some frames back off disk. */
-
-                       _pending.sort ();
-                       QueueItem qi = _pending.front ();
-
-                       if (qi.type == QueueItem::FULL) {
-                               lock.unlock ();
-                               _film->log()->log (String::compose ("Writer pulls %1 back from disk", qi.frame));
-                               shared_ptr<const EncodedData> encoded;
-                               qi.encoded.reset (new EncodedData (_film->j2c_path (qi.frame, false)));
-                               lock.lock ();
-                       }
-
-                       _queue.push_back (qi);
-                       _pending.remove (qi);
+                       lock.unlock ();
+                       _film->log()->log (String::compose ("Writer full (awaiting %1); pushes %2 to disk", _last_written_frame + 1, qi.frame));
+                       qi.encoded->write (_film, qi.frame);
+                       lock.lock ();
+                       qi.encoded.reset ();
+                       --_queued_full_in_memory;
                }
        }
 
index 57609825d06bde2d1b8f85ea23d0a416ca1a3415..68e422a502731c6d2791dbfaffc5a857ab6d7351 100644 (file)
@@ -77,12 +77,12 @@ private:
        boost::thread* _thread;
        bool _finish;
        std::list<QueueItem> _queue;
+       int _queued_full_in_memory;
        mutable boost::mutex _mutex;
        boost::condition _condition;
        boost::shared_ptr<const EncodedData> _last_written;
-       std::list<QueueItem> _pending;
        int _last_written_frame;
-       static const unsigned int _maximum_frames_in_memory;
+       static const int _maximum_frames_in_memory;
 
        boost::shared_ptr<libdcp::MonoPictureAsset> _picture_asset;
        boost::shared_ptr<libdcp::MonoPictureAssetWriter> _picture_asset_writer;