Fix deadlock when removing routes (initialization issues)
[ardour.git] / libs / ardour / graph.cc
1 /*
2  * Copyright (C) 2010 Paul Davis
3  * Copyright (C) 2017-2019 Robin Gareus <robin@gareus.org>
4  * incl. some work from Torben Hohn
5  *
6  * This program is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version 2
9  * of the License, or (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
19  */
20
21 #include <cmath>
22 #include <stdio.h>
23
24 #include "pbd/compose.h"
25 #include "pbd/debug_rt_alloc.h"
26 #include "pbd/pthread_utils.h"
27
28 #include "ardour/audioengine.h"
29 #include "ardour/debug.h"
30 #include "ardour/graph.h"
31 #include "ardour/process_thread.h"
32 #include "ardour/route.h"
33 #include "ardour/session.h"
34 #include "ardour/types.h"
35
36 #include "pbd/i18n.h"
37
38 using namespace ARDOUR;
39 using namespace PBD;
40 using namespace std;
41
42 #ifdef DEBUG_RT_ALLOC
43 static Graph* graph = 0;
44
45 extern "C" {
46
47 int
48 alloc_allowed ()
49 {
50         return !graph->in_process_thread ();
51 }
52 }
53 #endif
54
55 #define g_atomic_uint_get(x) static_cast<guint> (g_atomic_int_get (x))
56
57 Graph::Graph (Session& session)
58         : SessionHandleRef (session)
59         , _execution_sem ("graph_execution", 0)
60         , _callback_start_sem ("graph_start", 0)
61         , _callback_done_sem ("graph_done", 0)
62         , _graph_empty (true)
63         , _current_chain (0)
64         , _pending_chain (0)
65         , _setup_chain (1)
66 {
67         g_atomic_int_set (&_terminal_refcnt, 0);
68         g_atomic_int_set (&_terminate, 0);
69         g_atomic_int_set (&_n_workers, 0);
70         g_atomic_int_set (&_idle_thread_cnt, 0);
71         g_atomic_int_set (&_trigger_queue_size, 0);
72
73         /* pre-allocate memory */
74         _trigger_queue.reserve (1024);
75
76         ARDOUR::AudioEngine::instance ()->Running.connect_same_thread (engine_connections, boost::bind (&Graph::reset_thread_list, this));
77         ARDOUR::AudioEngine::instance ()->Stopped.connect_same_thread (engine_connections, boost::bind (&Graph::engine_stopped, this));
78         ARDOUR::AudioEngine::instance ()->Halted.connect_same_thread (engine_connections, boost::bind (&Graph::engine_stopped, this));
79
80         reset_thread_list ();
81
82 #ifdef DEBUG_RT_ALLOC
83         graph             = this;
84         pbd_alloc_allowed = &::alloc_allowed;
85 #endif
86 }
87
88 void
89 Graph::engine_stopped ()
90 {
91 #ifndef NDEBUG
92         cerr << "Graph::engine_stopped. n_thread: " << AudioEngine::instance ()->process_thread_count () << endl;
93 #endif
94         if (AudioEngine::instance ()->process_thread_count () != 0) {
95                 drop_threads ();
96         }
97 }
98
99 /** Set up threads for running the graph */
100 void
101 Graph::reset_thread_list ()
102 {
103         uint32_t num_threads = how_many_dsp_threads ();
104         guint    n_workers   = g_atomic_uint_get (&_n_workers);
105
106         /* For now, we shouldn't be using the graph code if we only have 1 DSP thread */
107         assert (num_threads > 1);
108         assert (AudioEngine::instance ()->process_thread_count () == n_workers);
109
110         /* don't bother doing anything here if we already have the right
111          * number of threads.
112          */
113
114         if (AudioEngine::instance ()->process_thread_count () == num_threads) {
115                 return;
116         }
117
118         Glib::Threads::Mutex::Lock lm (_session.engine ().process_lock ());
119
120         if (n_workers > 0) {
121                 drop_threads ();
122         }
123
124         /* Allow threads to run */
125         g_atomic_int_set (&_terminate, 0);
126
127         if (AudioEngine::instance ()->create_process_thread (boost::bind (&Graph::main_thread, this)) != 0) {
128                 throw failed_constructor ();
129         }
130
131         for (uint32_t i = 1; i < num_threads; ++i) {
132                 if (AudioEngine::instance ()->create_process_thread (boost::bind (&Graph::helper_thread, this))) {
133                         throw failed_constructor ();
134                 }
135         }
136
137         while (g_atomic_uint_get (&_n_workers) + 1 != num_threads) {
138                 sched_yield ();
139         }
140 }
141
142 void
143 Graph::session_going_away ()
144 {
145         drop_threads ();
146
147         // now drop all references on the nodes.
148         _nodes_rt[0].clear ();
149         _nodes_rt[1].clear ();
150         _init_trigger_list[0].clear ();
151         _init_trigger_list[1].clear ();
152         g_atomic_int_set (&_trigger_queue_size, 0);
153         _trigger_queue.clear ();
154 }
155
156 void
157 Graph::drop_threads ()
158 {
159         Glib::Threads::Mutex::Lock ls (_swap_mutex);
160
161         /* Flag threads to terminate */
162         g_atomic_int_set (&_terminate, 1);
163
164         /* Wake-up sleeping threads */
165         guint tc = g_atomic_uint_get (&_idle_thread_cnt);
166         assert (tc == g_atomic_uint_get (&_n_workers));
167         for (guint i = 0; i < tc; ++i) {
168                 _execution_sem.signal ();
169         }
170
171         /* and the main thread */
172         _callback_start_sem.signal ();
173
174         /* join process threads */
175         AudioEngine::instance ()->join_process_threads ();
176
177         g_atomic_int_set (&_n_workers, 0);
178         g_atomic_int_set (&_idle_thread_cnt, 0);
179
180         /* signal main process thread if it's waiting for an already terminated thread */
181         _callback_done_sem.signal ();
182
183         /* reset semaphores.
184          * This is somewhat ugly, yet if a thread is killed (e.g jackd terminates
185          * abnormally), some semaphores are still unlocked.
186          */
187 #ifndef NDEBUG
188         int d1 = _execution_sem.reset ();
189         int d2 = _callback_start_sem.reset ();
190         int d3 = _callback_done_sem.reset ();
191         cerr << "Graph::drop_threads() sema-counts: " << d1 << ", " << d2 << ", " << d3 << endl;
192 #else
193         _execution_sem.reset ();
194         _callback_start_sem.reset ();
195         _callback_done_sem.reset ();
196 #endif
197 }
198
199 /* special case route removal -- called from Session::remove_routes */
200 void
201 Graph::clear_other_chain ()
202 {
203         Glib::Threads::Mutex::Lock ls (_swap_mutex);
204
205         while (1) {
206                 if (_setup_chain != _pending_chain) {
207                         for (node_list_t::iterator ni = _nodes_rt[_setup_chain].begin (); ni != _nodes_rt[_setup_chain].end (); ++ni) {
208                                 (*ni)->_activation_set[_setup_chain].clear ();
209                         }
210
211                         _nodes_rt[_setup_chain].clear ();
212                         _init_trigger_list[_setup_chain].clear ();
213                         break;
214                 }
215                 /* setup chain == pending chain - we have
216                  * to wait till this is no longer true.
217                  */
218                 _cleanup_cond.wait (_swap_mutex);
219         }
220 }
221
222 void
223 Graph::prep ()
224 {
225         if (_swap_mutex.trylock ()) {
226                 /* swap mutex acquired */
227                 if (_current_chain != _pending_chain) {
228                         /* use new chain */
229                         _setup_chain   = _current_chain;
230                         _current_chain = _pending_chain;
231                         /* ensure that all nodes can be queued */
232                         _trigger_queue.reserve (_nodes_rt[_current_chain].size ());
233                         assert (g_atomic_uint_get (&_trigger_queue_size) == 0);
234                         _cleanup_cond.signal ();
235                 }
236                 _swap_mutex.unlock ();
237         }
238
239         _graph_empty = true;
240
241         int chain = _current_chain;
242
243         node_list_t::iterator i;
244         for (i = _nodes_rt[chain].begin (); i != _nodes_rt[chain].end (); ++i) {
245                 (*i)->prep (chain);
246                 _graph_empty = false;
247         }
248
249         assert (_graph_empty != (_n_terminal_nodes[chain] > 0));
250
251         g_atomic_int_set (&_terminal_refcnt, _n_terminal_nodes[chain]);
252
253         /* Trigger the initial nodes for processing, which are the ones at the `input' end */
254         for (i = _init_trigger_list[chain].begin (); i != _init_trigger_list[chain].end (); i++) {
255                 g_atomic_int_inc (&_trigger_queue_size);
256                 _trigger_queue.push_back (i->get ());
257         }
258 }
259
260 void
261 Graph::trigger (GraphNode* n)
262 {
263         g_atomic_int_inc (&_trigger_queue_size);
264         _trigger_queue.push_back (n);
265 }
266
267 /** Called when a node at the `output' end of the chain (ie one that has no-one to feed)
268  *  is finished.
269  */
270 void
271 Graph::reached_terminal_node ()
272 {
273         if (g_atomic_int_dec_and_test (&_terminal_refcnt)) {
274         again:
275
276                 /* We have run all the nodes that are at the `output' end of
277                  * the graph, so there is nothing more to do this time around.
278                  */
279                 assert (g_atomic_uint_get (&_trigger_queue_size) == 0);
280
281                 /* Notify caller */
282                 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 cycle done.\n", pthread_name ()));
283
284                 _callback_done_sem.signal ();
285
286                 /* Ensure that all background threads are idle.
287                  * When freewheeling there may be an immediate restart:
288                  * If there are more threads than CPU cores, some worker-
289                  * threads may only be "on the way" to become idle.
290                  */
291                 guint n_workers = g_atomic_uint_get (&_n_workers);
292                 while (g_atomic_uint_get (&_idle_thread_cnt) != n_workers) {
293                         sched_yield ();
294                 }
295
296                 /* Block until the a process callback */
297                 _callback_start_sem.wait ();
298
299                 if (g_atomic_int_get (&_terminate)) {
300                         return;
301                 }
302
303                 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 prepare new cycle.\n", pthread_name ()));
304
305                 /* Prepare next cycle:
306                  *  - Reset terminal reference count
307                  *  - queue initial nodes
308                  */
309                 prep ();
310
311                 if (_graph_empty && !g_atomic_int_get (&_terminate)) {
312                         goto again;
313                 }
314                 /* .. continue in worker-thread */
315         }
316 }
317
318 /** Rechain our stuff using a list of routes (which can be in any order) and
319  *  a directed graph of their interconnections, which is guaranteed to be
320  *  acyclic.
321  */
322 void
323 Graph::rechain (boost::shared_ptr<RouteList> routelist, GraphEdges const& edges)
324 {
325         Glib::Threads::Mutex::Lock ls (_swap_mutex);
326
327         int chain = _setup_chain;
328         DEBUG_TRACE (DEBUG::Graph, string_compose ("============== setup %1\n", chain));
329
330         /* This will become the number of nodes that do not feed any other node;
331          * once we have processed this number of those nodes, we have finished.
332          */
333         _n_terminal_nodes[chain] = 0;
334
335         /* This will become a list of nodes that are not fed by another node, ie
336          * those at the `input' end.
337          */
338         _init_trigger_list[chain].clear ();
339
340         _nodes_rt[chain].clear ();
341
342         /* Clear things out, and make _nodes_rt[chain] a copy of routelist */
343         for (RouteList::iterator ri = routelist->begin (); ri != routelist->end (); ri++) {
344                 (*ri)->_init_refcount[chain] = 0;
345                 (*ri)->_activation_set[chain].clear ();
346                 _nodes_rt[chain].push_back (*ri);
347         }
348
349         // now add refs for the connections.
350
351         for (node_list_t::iterator ni = _nodes_rt[chain].begin (); ni != _nodes_rt[chain].end (); ni++) {
352                 boost::shared_ptr<Route> r = boost::dynamic_pointer_cast<Route> (*ni);
353
354                 /* The routes that are directly fed by r */
355                 set<GraphVertex> fed_from_r = edges.from (r);
356
357                 /* Hence whether r has an output */
358                 bool const has_output = !fed_from_r.empty ();
359
360                 /* Set up r's activation set */
361                 for (set<GraphVertex>::iterator i = fed_from_r.begin (); i != fed_from_r.end (); ++i) {
362                         r->_activation_set[chain].insert (*i);
363                 }
364
365                 /* r has an input if there are some incoming edges to r in the graph */
366                 bool const has_input = !edges.has_none_to (r);
367
368                 /* Increment the refcount of any route that we directly feed */
369                 for (node_set_t::iterator ai = r->_activation_set[chain].begin (); ai != r->_activation_set[chain].end (); ai++) {
370                         (*ai)->_init_refcount[chain] += 1;
371                 }
372
373                 if (!has_input) {
374                         /* no input, so this node needs to be triggered initially to get things going */
375                         _init_trigger_list[chain].push_back (*ni);
376                 }
377
378                 if (!has_output) {
379                         /* no output, so this is one of the nodes that we can count off to decide
380                          * if we've finished
381                          */
382                         _n_terminal_nodes[chain] += 1;
383                 }
384         }
385
386         _pending_chain = chain;
387         dump (chain);
388 }
389
390 /** Called by both the main thread and all helpers. */
391 void
392 Graph::run_one ()
393 {
394         GraphNode* to_run = NULL;
395
396         if (g_atomic_int_get (&_terminate)) {
397                 return;
398         }
399
400         if (_trigger_queue.pop_front (to_run)) {
401                 /* Wake up idle threads, but at most as many as there's
402                  * work in the trigger queue that can be processed by
403                  * other threads.
404                  * This thread as not yet decreased _trigger_queue_size.
405                  */
406                 guint idle_cnt   = g_atomic_uint_get (&_idle_thread_cnt);
407                 guint work_avail = g_atomic_uint_get (&_trigger_queue_size);
408                 guint wakeup     = std::min (idle_cnt + 1, work_avail);
409
410                 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 signals %2 threads\n", pthread_name (), wakeup));
411                 for (guint i = 1; i < wakeup; ++i) {
412                         _execution_sem.signal ();
413                 }
414         }
415
416         while (!to_run) {
417                 /* Wait for work, fall asleep */
418                 g_atomic_int_inc (&_idle_thread_cnt);
419                 assert (g_atomic_uint_get (&_idle_thread_cnt) <= _n_workers);
420
421                 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 goes to sleep\n", pthread_name ()));
422                 _execution_sem.wait ();
423
424                 if (g_atomic_int_get (&_terminate)) {
425                         return;
426                 }
427
428                 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 is awake\n", pthread_name ()));
429
430                 g_atomic_int_dec_and_test (&_idle_thread_cnt);
431
432                 /* Try to find some work to do */
433                 _trigger_queue.pop_front (to_run);
434         }
435
436         /* Process the graph-node */
437         g_atomic_int_dec_and_test (&_trigger_queue_size);
438         to_run->run (_current_chain);
439
440         DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 has finished run_one()\n", pthread_name ()));
441 }
442
443 void
444 Graph::helper_thread ()
445 {
446         g_atomic_int_inc (&_n_workers);
447         guint id = g_atomic_uint_get (&_n_workers);
448
449         /* This is needed for ARDOUR::Session requests called from rt-processors
450          * in particular Lua scripts may do cross-thread calls */
451         if (!SessionEvent::has_per_thread_pool ()) {
452                 char name[64];
453                 snprintf (name, 64, "RT-%u-%p", id, (void*)DEBUG_THREAD_SELF);
454                 pthread_set_name (name);
455                 SessionEvent::create_per_thread_pool (name, 64);
456                 PBD::notify_event_loops_about_thread_creation (pthread_self (), name, 64);
457         }
458
459         suspend_rt_malloc_checks ();
460         ProcessThread* pt = new ProcessThread ();
461         resume_rt_malloc_checks ();
462
463         pt->get_buffers ();
464
465         while (!g_atomic_int_get (&_terminate)) {
466                 run_one ();
467         }
468
469         pt->drop_buffers ();
470         delete pt;
471 }
472
473 /** Here's the main graph thread */
474 void
475 Graph::main_thread ()
476 {
477         /* first time setup */
478
479         suspend_rt_malloc_checks ();
480         ProcessThread* pt = new ProcessThread ();
481
482         /* This is needed for ARDOUR::Session requests called from rt-processors
483          * in particular Lua scripts may do cross-thread calls */
484         if (!SessionEvent::has_per_thread_pool ()) {
485                 char name[64];
486                 snprintf (name, 64, "RT-main-%p", (void*)DEBUG_THREAD_SELF);
487                 pthread_set_name (name);
488                 SessionEvent::create_per_thread_pool (name, 64);
489                 PBD::notify_event_loops_about_thread_creation (pthread_self (), name, 64);
490         }
491         resume_rt_malloc_checks ();
492
493         pt->get_buffers ();
494
495         /* Wait for initial process callback */
496 again:
497         _callback_start_sem.wait ();
498
499         DEBUG_TRACE (DEBUG::ProcessThreads, "main thread is awake\n");
500
501         if (g_atomic_int_get (&_terminate)) {
502                 pt->drop_buffers ();
503                 delete (pt);
504                 return;
505         }
506
507         /* Bootstrap the trigger-list
508          * (later this is done by Graph_reached_terminal_node) */
509         prep ();
510
511         if (_graph_empty && !g_atomic_int_get (&_terminate)) {
512                 _callback_done_sem.signal ();
513                 DEBUG_TRACE (DEBUG::ProcessThreads, "main thread sees graph done, goes back to sleep\n");
514                 goto again;
515         }
516
517         /* After setup, the main-thread just becomes a normal worker */
518         while (!g_atomic_int_get (&_terminate)) {
519                 run_one ();
520         }
521
522         pt->drop_buffers ();
523         delete (pt);
524 }
525
526 void
527 Graph::dump (int chain)
528 {
529 #ifndef NDEBUG
530         node_list_t::iterator ni;
531         node_set_t::iterator  ai;
532
533         chain = _pending_chain;
534
535         DEBUG_TRACE (DEBUG::Graph, "--------------------------------------------Graph dump:\n");
536         for (ni = _nodes_rt[chain].begin (); ni != _nodes_rt[chain].end (); ni++) {
537                 boost::shared_ptr<Route> rp = boost::dynamic_pointer_cast<Route> (*ni);
538                 DEBUG_TRACE (DEBUG::Graph, string_compose ("GraphNode: %1  refcount: %2\n", rp->name ().c_str (), (*ni)->_init_refcount[chain]));
539                 for (ai = (*ni)->_activation_set[chain].begin (); ai != (*ni)->_activation_set[chain].end (); ai++) {
540                         DEBUG_TRACE (DEBUG::Graph, string_compose ("  triggers: %1\n", boost::dynamic_pointer_cast<Route> (*ai)->name ().c_str ()));
541                 }
542         }
543
544         DEBUG_TRACE (DEBUG::Graph, "------------- trigger list:\n");
545         for (ni = _init_trigger_list[chain].begin (); ni != _init_trigger_list[chain].end (); ni++) {
546                 DEBUG_TRACE (DEBUG::Graph, string_compose ("GraphNode: %1  refcount: %2\n", boost::dynamic_pointer_cast<Route> (*ni)->name ().c_str (), (*ni)->_init_refcount[chain]));
547         }
548
549         DEBUG_TRACE (DEBUG::Graph, string_compose ("final activation refcount: %1\n", _n_terminal_nodes[chain]));
550 #endif
551 }
552
553 int
554 Graph::process_routes (pframes_t nframes, samplepos_t start_sample, samplepos_t end_sample, bool& need_butler)
555 {
556         DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("graph execution from %1 to %2 = %3\n", start_sample, end_sample, nframes));
557
558         if (g_atomic_int_get (&_terminate)) {
559                 return 0;
560         }
561
562         _process_nframes      = nframes;
563         _process_start_sample = start_sample;
564         _process_end_sample   = end_sample;
565
566         _process_noroll      = false;
567         _process_retval      = 0;
568         _process_need_butler = false;
569
570         DEBUG_TRACE (DEBUG::ProcessThreads, "wake graph for non-silent process\n");
571         _callback_start_sem.signal ();
572         _callback_done_sem.wait ();
573         DEBUG_TRACE (DEBUG::ProcessThreads, "graph execution complete\n");
574
575         need_butler = _process_need_butler;
576
577         return _process_retval;
578 }
579
580 int
581 Graph::routes_no_roll (pframes_t nframes, samplepos_t start_sample, samplepos_t end_sample, bool non_rt_pending)
582 {
583         DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("no-roll graph execution from %1 to %2 = %3\n", start_sample, end_sample, nframes));
584
585         if (g_atomic_int_get (&_terminate)) {
586                 return 0;
587         }
588
589         _process_nframes        = nframes;
590         _process_start_sample   = start_sample;
591         _process_end_sample     = end_sample;
592         _process_non_rt_pending = non_rt_pending;
593
594         _process_noroll      = true;
595         _process_retval      = 0;
596         _process_need_butler = false;
597
598         DEBUG_TRACE (DEBUG::ProcessThreads, "wake graph for no-roll process\n");
599         _callback_start_sem.signal ();
600         _callback_done_sem.wait ();
601         DEBUG_TRACE (DEBUG::ProcessThreads, "graph execution complete\n");
602
603         return _process_retval;
604 }
605 void
606 Graph::process_one_route (Route* route)
607 {
608         bool need_butler = false;
609         int  retval;
610
611         assert (route);
612
613         DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 runs route %2\n", pthread_name (), route->name ()));
614
615         if (_process_noroll) {
616                 retval = route->no_roll (_process_nframes, _process_start_sample, _process_end_sample, _process_non_rt_pending);
617         } else {
618                 retval = route->roll (_process_nframes, _process_start_sample, _process_end_sample, need_butler);
619         }
620
621         if (retval) {
622                 _process_retval = retval;
623         }
624
625         if (need_butler) {
626                 _process_need_butler = true;
627         }
628 }
629
630 bool
631 Graph::in_process_thread () const
632 {
633         return AudioEngine::instance ()->in_process_thread ();
634 }