Add lock-free multiple producer multiple reader queue
[ardour.git] / libs / pbd / pbd / mpmc_queue.h
1 /*
2  * (C) 2017, 2019 Robin Gareus <robin@gareus.org>
3  *
4  * This program is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU General Public License
6  * as published by the Free Software Foundation; either version 2
7  * of the License, or (at your option) any later version.
8  *
9  * This program is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  * GNU General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License
15  * along with this program; if not, write to the Free Software
16  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
17  */
18
19 #ifndef _pbd_mpc_queue_h_
20 #define _pbd_mpc_queue_h_
21
22 #include <cassert>
23 #include <glib.h>
24 #include <stdint.h>
25
26 namespace PBD {
27
28 /** Lock free multiple producer, multiple consumer queue
29  *
30  * inspired by http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
31  * Kudos to Dmitry Vyukov
32  */
33 template <typename T>
34 class /*LIBPBD_API*/ MPMCQueue
35 {
36 public:
37         MPMCQueue (size_t buffer_size = 8)
38                 : _buffer (0)
39                 , _buffer_mask (0)
40         {
41                 reserve (buffer_size);
42         }
43
44         ~MPMCQueue ()
45         {
46                 delete[] _buffer;
47         }
48
49         static size_t
50         power_of_two_size (size_t sz)
51         {
52                 int32_t power_of_two;
53                 for (power_of_two = 1; 1U << power_of_two < sz; ++power_of_two) ;
54                 return 1U << power_of_two;
55         }
56
57         void
58         reserve (size_t buffer_size)
59         {
60                 buffer_size = power_of_two_size (buffer_size);
61                 assert ((buffer_size >= 2) && ((buffer_size & (buffer_size - 1)) == 0));
62                 if (_buffer_mask >= buffer_size - 1) {
63                         return;
64                 }
65                 delete[] _buffer;
66                 _buffer      = new cell_t[buffer_size];
67                 _buffer_mask = buffer_size - 1;
68                 clear ();
69         }
70
71         void
72         clear ()
73         {
74                 for (size_t i = 0; i <= _buffer_mask; ++i) {
75                         g_atomic_int_set (&_buffer[i]._sequence, i);
76                 }
77                 g_atomic_int_set (&_enqueue_pos, 0);
78                 g_atomic_int_set (&_dequeue_pos, 0);
79         }
80
81         bool
82         push_back (T const& data)
83         {
84                 cell_t* cell;
85                 guint   pos = g_atomic_int_get (&_enqueue_pos);
86                 for (;;) {
87                         cell         = &_buffer[pos & _buffer_mask];
88                         guint    seq = g_atomic_int_get (&cell->_sequence);
89                         intptr_t dif = (intptr_t)seq - (intptr_t)pos;
90                         if (dif == 0) {
91                                 if (g_atomic_int_compare_and_exchange (&_enqueue_pos, pos, pos + 1)) {
92                                         break;
93                                 }
94                         } else if (dif < 0) {
95                                 assert (0);
96                                 return false;
97                         } else {
98                                 pos = g_atomic_int_get (&_enqueue_pos);
99                         }
100                 }
101
102                 cell->_data = data;
103                 g_atomic_int_set (&cell->_sequence, pos + 1);
104                 return true;
105         }
106
107         bool
108         pop_front (T& data)
109         {
110                 cell_t* cell;
111                 guint   pos = g_atomic_int_get (&_dequeue_pos);
112                 for (;;) {
113                         cell         = &_buffer[pos & _buffer_mask];
114                         guint    seq = g_atomic_int_get (&cell->_sequence);
115                         intptr_t dif = (intptr_t)seq - (intptr_t) (pos + 1);
116                         if (dif == 0) {
117                                 if (g_atomic_int_compare_and_exchange (&_dequeue_pos, pos, pos + 1)) {
118                                         break;
119                                 }
120                         } else if (dif < 0) {
121                                 return false;
122                         } else {
123                                 pos = g_atomic_int_get (&_dequeue_pos);
124                         }
125                 }
126
127                 data = cell->_data;
128                 g_atomic_int_set (&cell->_sequence, pos + _buffer_mask + 1);
129                 return true;
130         }
131
132 private:
133         struct cell_t {
134                 volatile guint _sequence;
135                 T              _data;
136         };
137
138         cell_t* _buffer;
139         size_t  _buffer_mask;
140
141         volatile guint _enqueue_pos;
142         volatile guint _dequeue_pos;
143 };
144
145 } /* end namespace */
146
147 #endif