public:
PlaybackBuffer (int32_t sz, guint res = 8191)
: reservation (res)
- , _writepos_lock ()
+ , _reservation_lock ()
{
sz += reservation;
buf = new T[size];
read_idx = 0;
- reset (0);
+ reset ();
}
virtual ~PlaybackBuffer () {
delete [] buf;
}
- /* non-linear write needs to reset() the buffer and set the
- * position that write() will commence at */
- void reset (int64_t start = 0) {
+ /* init (mlock) */
+ T *buffer () { return buf; }
+ /* init (mlock) */
+ guint bufsize () const { return size; }
+
+ /* write-thread */
+ void reset () {
/* writer, when seeking, may block */
Glib::Threads::Mutex::Lock lm (_reset_lock);
- SpinLock sl (_writepos_lock);
- write_pos = start;
-
+ SpinLock sl (_reservation_lock);
g_atomic_int_set (&write_idx, g_atomic_int_get (&read_idx));
+ g_atomic_int_set (&reserved, 0);
}
+ /* write-thread */
guint write_space () const {
guint w, r;
return 0;
}
+ /* read-thread */
guint read_space () const {
guint w, r;
}
}
+ /* read-thead */
guint read (T *dest, guint cnt, bool commit = true);
+
+ /* write-thead */
guint write (T const * src, guint cnt);
+ /* write-thead */
guint write_zero (guint cnt);
- T *buffer () { return buf; }
- guint bufsize () const { return size; }
+ /* read-thead */
+ void read_flush ()
+ {
+ SpinLock sl (_reservation_lock);
+ g_atomic_int_set (&read_idx, g_atomic_int_get (&write_idx));
+ g_atomic_int_set (&reserved, 0);
+ }
+
+ /* read-thead */
+ guint decrement_read_ptr (guint cnt)
+ {
+ SpinLock sl (_reservation_lock);
+ guint r = g_atomic_int_get (&read_idx);
+ guint res = g_atomic_int_get (&reserved);
+
+ cnt = std::min (cnt, res);
- guint get_write_idx () const { return g_atomic_int_get (&write_idx); }
- guint get_read_idx () const { return g_atomic_int_get (&read_idx); }
+ r = (r + size - cnt) & size_mask;
+ res -= cnt;
- void read_flush () { g_atomic_int_set (&read_idx, g_atomic_int_get (&write_idx)); }
+ g_atomic_int_set (&read_idx, r);
+ g_atomic_int_set (&reserved, res);
+
+ return cnt;
+ }
- void increment_read_ptr (guint cnt) {
+ /* read-thead */
+ guint increment_read_ptr (guint cnt)
+ {
cnt = std::min (cnt, read_space ());
+
+ SpinLock sl (_reservation_lock);
g_atomic_int_set (&read_idx, (g_atomic_int_get (&read_idx) + cnt) & size_mask);
+ g_atomic_int_set (&reserved, std::min (reservation, g_atomic_int_get (&reserved) + cnt));
+
+ return cnt;
+ }
+
+ /* read-thead */
+ bool can_seek (int cnt) {
+ if (cnt > 0) {
+ return read_space() >= cnt;
+ }
+ else if (cnt < 0) {
+ return g_atomic_int_get (&reserved) >= cnt;
+ }
+ else {
+ return true;
+ }
}
-protected:
+private:
T *buf;
guint reservation;
guint size;
guint size_mask;
- int64_t write_pos; // samplepos_t
-
- mutable gint write_idx; // corresponds to (write_pos)
+ mutable gint write_idx;
mutable gint read_idx;
+ mutable gint reserved;
-private:
- /* spinlock will be used to update write_pos and write_idx in sync */
- mutable spinlock_t _writepos_lock;
+ /* spinlock will be used to update write_idx and reserved in sync */
+ spinlock_t _reservation_lock;
/* reset_lock is used to prevent concurrent reading and reset (seek, transport reversal etc). */
Glib::Threads::Mutex _reset_lock;
};
w = n2;
}
- {
- SpinLock sl (_writepos_lock);
- write_pos += to_write;
- g_atomic_int_set (&write_idx, w);
- }
+ g_atomic_int_set (&write_idx, w);
return to_write;
}
w = n2;
}
- {
- SpinLock sl (_writepos_lock);
- write_pos += to_write;
- g_atomic_int_set (&write_idx, w);
- }
+ g_atomic_int_set (&write_idx, w);
return to_write;
}
}
if (commit) {
- /* set read-pointer to position of last read's end */
+ SpinLock sl (_reservation_lock);
g_atomic_int_set (&read_idx, r);
+ g_atomic_int_set (&reserved, std::min (reservation, g_atomic_int_get (&reserved) + to_read));
}
- return cnt;
+ return to_read;
}
} /* end namespace */