Prepare buffer for seeking
authorRobin Gareus <robin@gareus.org>
Wed, 6 Feb 2019 18:00:15 +0000 (19:00 +0100)
committerRobin Gareus <robin@gareus.org>
Wed, 6 Feb 2019 18:00:15 +0000 (19:00 +0100)
Keep track of safe reservation:
Data has been read (or was skipped) previously can be read again
up to the allocated "reservation" (which is never overwritten).

libs/pbd/pbd/playback_buffer.h

index d9f2232144a2a030d0d8f9d2980fcf6724af30f2..3ec7d57897cf6afbfd30e9781a301aae7fa9ca34 100644 (file)
@@ -34,7 +34,7 @@ class /*LIBPBD_API*/ PlaybackBuffer
 public:
        PlaybackBuffer (int32_t sz, guint res = 8191)
        : reservation (res)
-       , _writepos_lock ()
+       , _reservation_lock ()
        {
                sz += reservation;
 
@@ -46,24 +46,28 @@ public:
                buf = new T[size];
 
                read_idx = 0;
-               reset (0);
+               reset ();
        }
 
        virtual ~PlaybackBuffer () {
                delete [] buf;
        }
 
-       /* non-linear write needs to reset() the buffer and set the
-        * position that write() will commence at */
-       void reset (int64_t start = 0) {
+       /* init (mlock) */
+       T *buffer () { return buf; }
+       /* init (mlock) */
+       guint bufsize () const { return size; }
+
+       /* write-thread */
+       void reset () {
                /* writer, when seeking, may block */
                Glib::Threads::Mutex::Lock lm (_reset_lock);
-               SpinLock sl (_writepos_lock);
-               write_pos = start;
-
+               SpinLock sl (_reservation_lock);
                g_atomic_int_set (&write_idx, g_atomic_int_get (&read_idx));
+               g_atomic_int_set (&reserved, 0);
        }
 
+       /* write-thread */
        guint write_space () const {
                guint w, r;
 
@@ -92,6 +96,7 @@ public:
                return 0;
        }
 
+       /* read-thread */
        guint read_space () const {
                guint w, r;
 
@@ -105,37 +110,77 @@ public:
                }
        }
 
+       /* read-thead */
        guint read  (T *dest, guint cnt, bool commit = true);
+
+       /* write-thead */
        guint write (T const * src, guint cnt);
+       /* write-thead */
        guint write_zero (guint cnt);
 
-       T *buffer () { return buf; }
-       guint bufsize () const { return size; }
+       /* read-thead */
+       void read_flush ()
+       {
+               SpinLock sl (_reservation_lock);
+               g_atomic_int_set (&read_idx, g_atomic_int_get (&write_idx));
+               g_atomic_int_set (&reserved, 0);
+       }
+
+       /* read-thead */
+       guint decrement_read_ptr (guint cnt)
+       {
+               SpinLock sl (_reservation_lock);
+               guint r = g_atomic_int_get (&read_idx);
+               guint res = g_atomic_int_get (&reserved);
+
+               cnt = std::min (cnt, res);
 
-       guint get_write_idx () const { return g_atomic_int_get (&write_idx); }
-       guint get_read_idx () const { return g_atomic_int_get (&read_idx); }
+               r = (r + size - cnt) & size_mask;
+               res -= cnt;
 
-       void read_flush () { g_atomic_int_set (&read_idx, g_atomic_int_get (&write_idx)); }
+               g_atomic_int_set (&read_idx, r);
+               g_atomic_int_set (&reserved, res);
+
+               return cnt;
+       }
 
-       void increment_read_ptr (guint cnt) {
+       /* read-thead */
+       guint increment_read_ptr (guint cnt)
+       {
                cnt = std::min (cnt, read_space ());
+
+               SpinLock sl (_reservation_lock);
                g_atomic_int_set (&read_idx, (g_atomic_int_get (&read_idx) + cnt) & size_mask);
+               g_atomic_int_set (&reserved, std::min (reservation, g_atomic_int_get (&reserved) + cnt));
+
+               return cnt;
+       }
+
+       /* read-thead */
+       bool can_seek (int cnt) {
+               if (cnt > 0) {
+                       return read_space() >= cnt;
+               }
+               else if (cnt < 0) {
+                       return g_atomic_int_get (&reserved) >= cnt;
+               }
+               else {
+                       return true;
+               }
        }
 
-protected:
+private:
        T *buf;
        guint reservation;
        guint size;
        guint size_mask;
 
-       int64_t write_pos; // samplepos_t
-
-       mutable gint write_idx; // corresponds to (write_pos)
+       mutable gint write_idx;
        mutable gint read_idx;
+       mutable gint reserved;
 
-private:
-       /* spinlock will be used to update write_pos and write_idx in sync */
-       mutable spinlock_t   _writepos_lock;
+       /* spinlock will be used to update write_idx and reserved in sync */
+       spinlock_t _reservation_lock;
        /* reset_lock is used to prevent concurrent reading and reset (seek, transport reversal etc). */
        Glib::Threads::Mutex _reset_lock;
 };
@@ -170,11 +215,7 @@ PlaybackBuffer<T>::write (T const *src, guint cnt)
                w = n2;
        }
 
-       {
-               SpinLock sl (_writepos_lock);
-               write_pos += to_write;
-               g_atomic_int_set (&write_idx, w);
-       }
+       g_atomic_int_set (&write_idx, w);
        return to_write;
 }
 
@@ -208,11 +249,7 @@ PlaybackBuffer<T>::write_zero (guint cnt)
                w = n2;
        }
 
-       {
-               SpinLock sl (_writepos_lock);
-               write_pos += to_write;
-               g_atomic_int_set (&write_idx, w);
-       }
+       g_atomic_int_set (&write_idx, w);
        return to_write;
 }
 
@@ -251,10 +288,11 @@ PlaybackBuffer<T>::read (T *dest, guint cnt, bool commit)
        }
 
        if (commit) {
-               /* set read-pointer to position of last read's end */
+               SpinLock sl (_reservation_lock);
                g_atomic_int_set (&read_idx, r);
+               g_atomic_int_set (&reserved, std::min (reservation, g_atomic_int_get (&reserved) + to_read));
        }
-       return cnt;
+       return to_read;
 }
 
 } /* end namespace */