2 * Copyright (C) 2010 Paul Davis
3 * Copyright (C) 2017-2019 Robin Gareus <robin@gareus.org>
4 * incl. some work from Torben Hohn
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * as published by the Free Software Foundation; either version 2
9 * of the License, or (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
24 #include "pbd/compose.h"
25 #include "pbd/debug_rt_alloc.h"
26 #include "pbd/pthread_utils.h"
28 #include "ardour/audioengine.h"
29 #include "ardour/debug.h"
30 #include "ardour/graph.h"
31 #include "ardour/process_thread.h"
32 #include "ardour/route.h"
33 #include "ardour/session.h"
34 #include "ardour/types.h"
38 using namespace ARDOUR;
// Graph instance consulted by the realtime-allocation debug hook below.
// NOTE(review): the assignment of this pointer and the hook's function header
// are not visible in this excerpt.
43 static Graph* graph = 0;
// Body of the allocation hook installed as pbd_alloc_allowed by the Graph
// constructor: allocation is reported as allowed only when the calling thread
// is NOT a process thread according to Graph::in_process_thread().
// NOTE(review): `graph` is dereferenced unconditionally — it must be set
// before the hook can be invoked.
50 return !graph->in_process_thread ();
// Read an atomic gint and hand it back as guint; used for the counters this
// file reads as unsigned (_n_workers, _idle_thread_cnt, _trigger_queue_size).
55 #define g_atomic_uint_get(x) static_cast<guint> (g_atomic_int_get (x))
// Graph constructor.
//
// Initialises the three semaphores that pass control between the engine's
// process callback and the graph threads:
//   _execution_sem      - idle worker threads sleep on it in run_one() and are
//                         woken when more queued work is available,
//   _callback_start_sem - wakes the main graph thread for a new cycle,
//   _callback_done_sem  - tells the waiting process callback the cycle is done.
// It then zeroes the atomic counters, pre-reserves the trigger queue and
// subscribes to AudioEngine Running (-> reset_thread_list) and
// Stopped/Halted (-> engine_stopped).
57 Graph::Graph (Session& session)
58 : SessionHandleRef (session)
59 , _execution_sem ("graph_execution", 0)
60 , _callback_start_sem ("graph_start", 0)
61 , _callback_done_sem ("graph_done", 0)
67 g_atomic_int_set (&_terminal_refcnt, 0);
68 g_atomic_int_set (&_terminate, 0);
69 g_atomic_int_set (&_n_workers, 0);
70 g_atomic_int_set (&_idle_thread_cnt, 0);
71 g_atomic_int_set (&_trigger_queue_size, 0);
73 /* pre-allocate memory */
74 _trigger_queue.reserve (1024);
// Keep the set of graph process threads in step with the engine state.
76 ARDOUR::AudioEngine::instance ()->Running.connect_same_thread (engine_connections, boost::bind (&Graph::reset_thread_list, this));
77 ARDOUR::AudioEngine::instance ()->Stopped.connect_same_thread (engine_connections, boost::bind (&Graph::engine_stopped, this));
78 ARDOUR::AudioEngine::instance ()->Halted.connect_same_thread (engine_connections, boost::bind (&Graph::engine_stopped, this));
// Install the realtime-allocation debug hook (alloc_allowed).
84 pbd_alloc_allowed = &::alloc_allowed;
// Handler for the AudioEngine Stopped and Halted signals.
// Logs the engine's current process-thread count to stderr and, when any
// process threads exist, releases them.
// NOTE(review): the body of the `if` is not visible in this excerpt
// (presumably drop_threads ()) — confirm.
89 Graph::engine_stopped ()
92 cerr << "Graph::engine_stopped. n_thread: " << AudioEngine::instance ()->process_thread_count () << endl;
94 if (AudioEngine::instance ()->process_thread_count () != 0) {
99 /** Set up threads for running the graph */
// Creates one main graph thread (main_thread) plus num_threads - 1 helper
// threads (helper_thread) via AudioEngine::create_process_thread. It then waits
// until every helper has incremented _n_workers. Nothing is done when the
// engine already runs num_threads process threads.
// Throws failed_constructor if a thread cannot be created. Thread (re)creation
// happens while holding the engine's process lock.
101 Graph::reset_thread_list ()
103 uint32_t num_threads = how_many_dsp_threads ();
104 guint n_workers = g_atomic_uint_get (&_n_workers);
106 /* For now, we shouldn't be using the graph code if we only have 1 DSP thread */
107 assert (num_threads > 1);
108 assert (AudioEngine::instance ()->process_thread_count () == n_workers);
110 /* don't bother doing anything here if we already have the right
114 if (AudioEngine::instance ()->process_thread_count () == num_threads) {
118 Glib::Threads::Mutex::Lock lm (_session.engine ().process_lock ());
124 /* Allow threads to run */
125 g_atomic_int_set (&_terminate, 0);
127 if (AudioEngine::instance ()->create_process_thread (boost::bind (&Graph::main_thread, this)) != 0) {
128 throw failed_constructor ();
// The main thread created above counts as one DSP thread, so helpers start at 1.
131 for (uint32_t i = 1; i < num_threads; ++i) {
132 if (AudioEngine::instance ()->create_process_thread (boost::bind (&Graph::helper_thread, this))) {
133 throw failed_constructor ();
// helper_thread() increments _n_workers when it starts (the main thread does
// not), so wait until all num_threads - 1 helpers have registered.
137 while (g_atomic_uint_get (&_n_workers) + 1 != num_threads) {
// Drops the graph's references to the nodes of both chains and clears both
// initial-trigger lists. It then empties the trigger queue, resetting its size
// counter first.
// NOTE(review): lines between the header and the clears are not visible in
// this excerpt — confirm whether the process threads are stopped beforehand.
143 Graph::session_going_away ()
147 // now drop all references on the nodes.
148 _nodes_rt[0].clear ();
149 _nodes_rt[1].clear ();
150 _init_trigger_list[0].clear ();
151 _init_trigger_list[1].clear ();
152 g_atomic_int_set (&_trigger_queue_size, 0);
153 _trigger_queue.clear ();
// Terminate and join all graph process threads, then reset shared state.
//
// While holding _swap_mutex, it performs these steps in order:
//   - sets _terminate;
//   - wakes each idle helper (one _execution_sem signal per idle thread);
//   - wakes the main graph thread (_callback_start_sem);
//   - joins all threads via AudioEngine::join_process_threads;
//   - zeroes the worker/idle counters;
//   - signals _callback_done_sem, so a process callback blocked in
//     process_routes()/routes_no_roll() is released.
// Finally the semaphores are reset to discard stale counts.
157 Graph::drop_threads ()
159 Glib::Threads::Mutex::Lock ls (_swap_mutex);
161 /* Flag threads to terminate */
162 g_atomic_int_set (&_terminate, 1);
164 /* Wake-up sleeping threads */
165 guint tc = g_atomic_uint_get (&_idle_thread_cnt);
// All helpers are expected to be idle here (idle count == worker count).
166 assert (tc == g_atomic_uint_get (&_n_workers));
167 for (guint i = 0; i < tc; ++i) {
168 _execution_sem.signal ();
171 /* and the main thread */
172 _callback_start_sem.signal ();
174 /* join process threads */
175 AudioEngine::instance ()->join_process_threads ();
177 g_atomic_int_set (&_n_workers, 0);
178 g_atomic_int_set (&_idle_thread_cnt, 0);
180 /* signal main process thread if it's waiting for an already terminated thread */
181 _callback_done_sem.signal ();
184 * This is somewhat ugly, yet if a thread is killed (e.g jackd terminates
185 * abnormally), some semaphores are still unlocked.
// NOTE(review): the two reset variants below (one logging the previous counts,
// one silent) are presumably selected by a debug/release conditional that is
// not visible in this excerpt — confirm.
188 int d1 = _execution_sem.reset ();
189 int d2 = _callback_start_sem.reset ();
190 int d3 = _callback_done_sem.reset ();
191 cerr << "Graph::drop_threads() sema-counts: " << d1 << ", " << d2 << ", " << d3 << endl;
193 _execution_sem.reset ();
194 _callback_start_sem.reset ();
195 _callback_done_sem.reset ();
199 /* special case route removal -- called from Session::remove_routes */
// Clears the activation sets, node list and initial-trigger list of the setup
// (inactive) chain, so that chain no longer holds route references.
// If the setup chain is still the pending chain, a rechain has not yet been
// adopted by the process side. In that case it waits on _cleanup_cond, which
// is signalled after the chains are swapped.
201 Graph::clear_other_chain ()
203 Glib::Threads::Mutex::Lock ls (_swap_mutex);
206 if (_setup_chain != _pending_chain) {
207 for (node_list_t::iterator ni = _nodes_rt[_setup_chain].begin (); ni != _nodes_rt[_setup_chain].end (); ++ni) {
208 (*ni)->_activation_set[_setup_chain].clear ();
211 _nodes_rt[_setup_chain].clear ();
212 _init_trigger_list[_setup_chain].clear ();
215 /* setup chain == pending chain - we have
216 * to wait till this is no longer true.
// Releases _swap_mutex while waiting for the chain swap to signal _cleanup_cond.
218 _cleanup_cond.wait (_swap_mutex);
// Per-cycle preparation (the function header is not visible in this excerpt).
// If _swap_mutex can be taken without blocking and a new chain is pending, the
// pending chain becomes current and the previous current chain becomes the
// setup chain. Waiters in clear_other_chain() are then woken.
// Afterwards it:
//   - records whether the current chain has any nodes,
//   - resets the terminal-node countdown,
//   - queues the chain's initial (input-end) nodes.
225 if (_swap_mutex.trylock ()) {
226 /* swap mutex acquired */
227 if (_current_chain != _pending_chain) {
229 _setup_chain = _current_chain;
230 _current_chain = _pending_chain;
231 /* ensure that all nodes can be queued */
232 _trigger_queue.reserve (_nodes_rt[_current_chain].size ());
233 assert (g_atomic_uint_get (&_trigger_queue_size) == 0);
234 _cleanup_cond.signal ();
236 _swap_mutex.unlock ();
// trylock keeps this path from blocking: if the mutex is busy, the
// previously current chain is simply used again for this cycle.
241 int chain = _current_chain;
243 node_list_t::iterator i;
// _graph_empty is cleared when the chain has at least one node.
// NOTE(review): its initialisation before this loop is not visible here.
244 for (i = _nodes_rt[chain].begin (); i != _nodes_rt[chain].end (); ++i) {
246 _graph_empty = false;
// A non-empty chain must have at least one terminal node, and vice versa.
249 assert (_graph_empty != (_n_terminal_nodes[chain] > 0));
// Countdown consumed by reached_terminal_node(); the cycle ends when it hits zero.
251 g_atomic_int_set (&_terminal_refcnt, _n_terminal_nodes[chain]);
253 /* Trigger the initial nodes for processing, which are the ones at the `input' end */
254 for (i = _init_trigger_list[chain].begin (); i != _init_trigger_list[chain].end (); i++) {
255 g_atomic_int_inc (&_trigger_queue_size);
256 _trigger_queue.push_back (i->get ());
// Queue node `n` for execution and count it in _trigger_queue_size.
// run_one() decrements the counter just before it runs a popped node.
261 Graph::trigger (GraphNode* n)
263 g_atomic_int_inc (&_trigger_queue_size);
264 _trigger_queue.push_back (n);
// Terminal-node countdown and end-of-cycle handling.
// Each call decrements _terminal_refcnt. The call that brings it to zero
// finishes the cycle:
//   1. it signals _callback_done_sem, releasing the waiting process callback;
//   2. it waits until every helper thread is idle;
//   3. it blocks on _callback_start_sem until the next cycle starts.
// Unless _terminate is set, the next cycle is then prepared and processing
// continues in the calling thread.
// NOTE(review): the bodies of the _terminate check and the empty-graph branch
// are not visible in this excerpt.
267 /** Called when a node at the `output' end of the chain (ie one that has no-one to feed)
271 Graph::reached_terminal_node ()
273 if (g_atomic_int_dec_and_test (&_terminal_refcnt)) {
276 /* We have run all the nodes that are at the `output' end of
277 * the graph, so there is nothing more to do this time around.
279 assert (g_atomic_uint_get (&_trigger_queue_size) == 0);
282 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 cycle done.\n", pthread_name ()));
284 _callback_done_sem.signal ();
286 /* Ensure that all background threads are idle.
287 * When freewheeling there may be an immediate restart:
288 * If there are more threads than CPU cores, some worker-
289 * threads may only be "on the way" to become idle.
291 guint n_workers = g_atomic_uint_get (&_n_workers);
292 while (g_atomic_uint_get (&_idle_thread_cnt) != n_workers) {
296 /* Block until the next process callback */
297 _callback_start_sem.wait ();
299 if (g_atomic_int_get (&_terminate)) {
303 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 prepare new cycle.\n", pthread_name ()));
305 /* Prepare next cycle:
306 * - Reset terminal reference count
307 * - queue initial nodes
311 if (_graph_empty && !g_atomic_int_get (&_terminate)) {
314 /* .. continue in worker-thread */
// Build the setup chain from `routelist` and `edges`, then mark it pending.
// For every route it:
//   - resets the route's initial refcount and activation set for this chain;
//   - sets the activation set to the routes it feeds (edges.from), bumping
//     each fed route's initial refcount.
// Routes with no input go into the initial trigger list. Routes with no output
// are counted in _n_terminal_nodes. Runs under _swap_mutex; the process side
// adopts the pending chain on its next successful trylock of that mutex.
318 /** Rechain our stuff using a list of routes (which can be in any order) and
319 * a directed graph of their interconnections, which is guaranteed to be
323 Graph::rechain (boost::shared_ptr<RouteList> routelist, GraphEdges const& edges)
325 Glib::Threads::Mutex::Lock ls (_swap_mutex);
327 int chain = _setup_chain;
328 DEBUG_TRACE (DEBUG::Graph, string_compose ("============== setup %1\n", chain));
330 /* This will become the number of nodes that do not feed any other node;
331 * once we have processed this number of those nodes, we have finished.
333 _n_terminal_nodes[chain] = 0;
335 /* This will become a list of nodes that are not fed by another node, ie
336 * those at the `input' end.
338 _init_trigger_list[chain].clear ();
340 _nodes_rt[chain].clear ();
342 /* Clear things out, and make _nodes_rt[chain] a copy of routelist */
343 for (RouteList::iterator ri = routelist->begin (); ri != routelist->end (); ri++) {
344 (*ri)->_init_refcount[chain] = 0;
345 (*ri)->_activation_set[chain].clear ();
346 _nodes_rt[chain].push_back (*ri);
349 // now add refs for the connections.
351 for (node_list_t::iterator ni = _nodes_rt[chain].begin (); ni != _nodes_rt[chain].end (); ni++) {
// NOTE(review): r is dereferenced without a null check. Every entry was copied
// from routelist above, so the cast is expected to succeed.
352 boost::shared_ptr<Route> r = boost::dynamic_pointer_cast<Route> (*ni);
354 /* The routes that are directly fed by r */
355 set<GraphVertex> fed_from_r = edges.from (r);
357 /* Hence whether r has an output */
358 bool const has_output = !fed_from_r.empty ();
360 /* Set up r's activation set */
361 for (set<GraphVertex>::iterator i = fed_from_r.begin (); i != fed_from_r.end (); ++i) {
362 r->_activation_set[chain].insert (*i);
365 /* r has an input if there are some incoming edges to r in the graph */
366 bool const has_input = !edges.has_none_to (r);
368 /* Increment the refcount of any route that we directly feed */
369 for (node_set_t::iterator ai = r->_activation_set[chain].begin (); ai != r->_activation_set[chain].end (); ai++) {
370 (*ai)->_init_refcount[chain] += 1;
374 /* no input, so this node needs to be triggered initially to get things going */
375 _init_trigger_list[chain].push_back (*ni);
379 /* no output, so this is one of the nodes that we can count off to decide
382 _n_terminal_nodes[chain] += 1;
// Publish the new chain; it becomes current once the process side swaps chains.
386 _pending_chain = chain;
390 /** Called by both the main thread and all helpers. */
// Execute one queued graph node (the function header is not visible in this
// excerpt).
// If a node can be popped immediately, it wakes min(idle_cnt, work_avail - 1)
// idle helpers to share the remaining queue. Otherwise it marks this thread
// idle and sleeps on _execution_sem, then tries to pop work again after waking.
// _terminate is checked before taking work and again after every wake-up.
394 GraphNode* to_run = NULL;
396 if (g_atomic_int_get (&_terminate)) {
400 if (_trigger_queue.pop_front (to_run)) {
401 /* Wake up idle threads, but at most as many as there's
402 * work in the trigger queue that can be processed by
404 * This thread has not yet decreased _trigger_queue_size.
406 guint idle_cnt = g_atomic_uint_get (&_idle_thread_cnt);
407 guint work_avail = g_atomic_uint_get (&_trigger_queue_size);
408 guint wakeup = std::min (idle_cnt + 1, work_avail);
// NOTE(review): the trace reports `wakeup`, but the loop below sends only
// wakeup - 1 signals (this thread handles the node it just popped).
410 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 signals %2 threads\n", pthread_name (), wakeup));
411 for (guint i = 1; i < wakeup; ++i) {
412 _execution_sem.signal ();
417 /* Wait for work, fall asleep */
418 g_atomic_int_inc (&_idle_thread_cnt);
// NOTE(review): _n_workers is read here without g_atomic_uint_get, unlike the
// other reads of that counter in this file.
419 assert (g_atomic_uint_get (&_idle_thread_cnt) <= _n_workers);
421 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 goes to sleep\n", pthread_name ()));
422 _execution_sem.wait ();
424 if (g_atomic_int_get (&_terminate)) {
428 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 is awake\n", pthread_name ()));
// Used only for its decrement; the "reached zero" result is ignored.
430 g_atomic_int_dec_and_test (&_idle_thread_cnt);
432 /* Try to find some work to do */
433 _trigger_queue.pop_front (to_run);
436 /* Process the graph-node */
// Used only for its decrement; the "reached zero" result is ignored.
437 g_atomic_int_dec_and_test (&_trigger_queue_size);
438 to_run->run (_current_chain);
440 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 has finished run_one()\n", pthread_name ()));
// Entry point of each helper (worker) graph thread. It:
//   - registers itself in _n_workers;
//   - if no per-thread SessionEvent pool exists, names the thread
//     "RT-<id>-<thread>", creates a 64-entry pool and notifies event loops;
//   - allocates its ProcessThread with RT-malloc checks suspended;
//   - loops until _terminate is set.
// NOTE(review): the loop body is not visible in this excerpt.
444 Graph::helper_thread ()
// NOTE(review): the increment and the read below are separate atomic operations,
// so two helpers starting concurrently can read the same id (duplicate thread /
// pool names). g_atomic_int_add returns the pre-add value and would yield a
// unique id.
446 g_atomic_int_inc (&_n_workers);
447 guint id = g_atomic_uint_get (&_n_workers);
449 /* This is needed for ARDOUR::Session requests called from rt-processors
450 * in particular Lua scripts may do cross-thread calls */
451 if (!SessionEvent::has_per_thread_pool ()) {
453 snprintf (name, 64, "RT-%u-%p", id, (void*)DEBUG_THREAD_SELF);
454 pthread_set_name (name);
455 SessionEvent::create_per_thread_pool (name, 64);
456 PBD::notify_event_loops_about_thread_creation (pthread_self (), name, 64);
459 suspend_rt_malloc_checks ();
460 ProcessThread* pt = new ProcessThread ();
461 resume_rt_malloc_checks ();
465 while (!g_atomic_int_get (&_terminate)) {
473 /** Here's the main graph thread */
// One-time setup, with RT-malloc checks suspended:
//   - allocates a ProcessThread;
//   - if no per-thread SessionEvent pool exists, names the thread
//     "RT-main-<thread>" and creates a 64-entry pool.
// It then waits on _callback_start_sem for the first process callback and
// bootstraps the trigger list. If the graph is empty, it reports the cycle
// done at once via _callback_done_sem.
// After setup it acts as an ordinary worker until _terminate is set. Unlike
// helper_thread(), it does not count itself in _n_workers.
475 Graph::main_thread ()
477 /* first time setup */
479 suspend_rt_malloc_checks ();
480 ProcessThread* pt = new ProcessThread ();
482 /* This is needed for ARDOUR::Session requests called from rt-processors
483 * in particular Lua scripts may do cross-thread calls */
484 if (!SessionEvent::has_per_thread_pool ()) {
486 snprintf (name, 64, "RT-main-%p", (void*)DEBUG_THREAD_SELF);
487 pthread_set_name (name);
488 SessionEvent::create_per_thread_pool (name, 64);
489 PBD::notify_event_loops_about_thread_creation (pthread_self (), name, 64);
491 resume_rt_malloc_checks ();
495 /* Wait for initial process callback */
497 _callback_start_sem.wait ();
499 DEBUG_TRACE (DEBUG::ProcessThreads, "main thread is awake\n");
501 if (g_atomic_int_get (&_terminate)) {
507 /* Bootstrap the trigger-list
508 * (later this is done by Graph::reached_terminal_node) */
// Nothing to run: release the waiting process callback immediately.
511 if (_graph_empty && !g_atomic_int_get (&_terminate)) {
512 _callback_done_sem.signal ();
513 DEBUG_TRACE (DEBUG::ProcessThreads, "main thread sees graph done, goes back to sleep\n");
517 /* After setup, the main-thread just becomes a normal worker */
518 while (!g_atomic_int_get (&_terminate)) {
// Debug dump (DEBUG::Graph trace) of a chain. It prints each node's name,
// initial refcount and the routes it triggers, then the initial trigger list
// and the terminal-node count.
// NOTE(review): the `chain` argument is overwritten with _pending_chain below,
// so the caller's value is ignored. Every node is assumed to be a Route: the
// dynamic_pointer_cast results are dereferenced without a null check.
527 Graph::dump (int chain)
530 node_list_t::iterator ni;
531 node_set_t::iterator ai;
533 chain = _pending_chain;
535 DEBUG_TRACE (DEBUG::Graph, "--------------------------------------------Graph dump:\n");
536 for (ni = _nodes_rt[chain].begin (); ni != _nodes_rt[chain].end (); ni++) {
537 boost::shared_ptr<Route> rp = boost::dynamic_pointer_cast<Route> (*ni);
538 DEBUG_TRACE (DEBUG::Graph, string_compose ("GraphNode: %1 refcount: %2\n", rp->name ().c_str (), (*ni)->_init_refcount[chain]));
539 for (ai = (*ni)->_activation_set[chain].begin (); ai != (*ni)->_activation_set[chain].end (); ai++) {
540 DEBUG_TRACE (DEBUG::Graph, string_compose (" triggers: %1\n", boost::dynamic_pointer_cast<Route> (*ai)->name ().c_str ()));
544 DEBUG_TRACE (DEBUG::Graph, "------------- trigger list:\n");
545 for (ni = _init_trigger_list[chain].begin (); ni != _init_trigger_list[chain].end (); ni++) {
546 DEBUG_TRACE (DEBUG::Graph, string_compose ("GraphNode: %1 refcount: %2\n", boost::dynamic_pointer_cast<Route> (*ni)->name ().c_str (), (*ni)->_init_refcount[chain]));
549 DEBUG_TRACE (DEBUG::Graph, string_compose ("final activation refcount: %1\n", _n_terminal_nodes[chain]));
// Run one rolling (non-silent) process cycle on the graph threads.
// It stores the cycle parameters in members that process_one_route() reads.
// It then wakes the main graph thread via _callback_start_sem and blocks on
// _callback_done_sem until the cycle completes.
// Outputs: need_butler is set from _process_need_butler, and _process_retval
// is returned.
// NOTE(review): the early exit when _terminate is set, and any reset of
// _process_retval before the cycle, are not visible in this excerpt.
554 Graph::process_routes (pframes_t nframes, samplepos_t start_sample, samplepos_t end_sample, bool& need_butler)
556 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("graph execution from %1 to %2 = %3\n", start_sample, end_sample, nframes));
558 if (g_atomic_int_get (&_terminate)) {
562 _process_nframes = nframes;
563 _process_start_sample = start_sample;
564 _process_end_sample = end_sample;
566 _process_noroll = false;
568 _process_need_butler = false;
570 DEBUG_TRACE (DEBUG::ProcessThreads, "wake graph for non-silent process\n");
571 _callback_start_sem.signal ();
572 _callback_done_sem.wait ();
573 DEBUG_TRACE (DEBUG::ProcessThreads, "graph execution complete\n");
575 need_butler = _process_need_butler;
577 return _process_retval;
// Run one no-roll process cycle on the graph threads.
// It stores the cycle parameters (including non_rt_pending) for
// process_one_route() and selects Route::no_roll via _process_noroll.
// It then wakes the main graph thread and blocks until the cycle completes,
// returning _process_retval.
// NOTE(review): the early exit when _terminate is set is not visible in this
// excerpt.
581 Graph::routes_no_roll (pframes_t nframes, samplepos_t start_sample, samplepos_t end_sample, bool non_rt_pending)
583 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("no-roll graph execution from %1 to %2 = %3\n", start_sample, end_sample, nframes));
585 if (g_atomic_int_get (&_terminate)) {
589 _process_nframes = nframes;
590 _process_start_sample = start_sample;
591 _process_end_sample = end_sample;
592 _process_non_rt_pending = non_rt_pending;
594 _process_noroll = true;
596 _process_need_butler = false;
598 DEBUG_TRACE (DEBUG::ProcessThreads, "wake graph for no-roll process\n");
599 _callback_start_sem.signal ();
600 _callback_done_sem.wait ();
601 DEBUG_TRACE (DEBUG::ProcessThreads, "graph execution complete\n");
603 return _process_retval;
// Process a single route for the current cycle. It calls Route::no_roll or
// Route::roll (chosen by _process_noroll) with the parameters stored by
// process_routes()/routes_no_roll(). The outcome is published through
// _process_retval and _process_need_butler.
// NOTE(review): the conditions guarding those two stores are not visible in
// this excerpt.
606 Graph::process_one_route (Route* route)
608 bool need_butler = false;
613 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 runs route %2\n", pthread_name (), route->name ()));
615 if (_process_noroll) {
616 retval = route->no_roll (_process_nframes, _process_start_sample, _process_end_sample, _process_non_rt_pending);
618 retval = route->roll (_process_nframes, _process_start_sample, _process_end_sample, need_butler);
622 _process_retval = retval;
626 _process_need_butler = true;
// Whether the calling thread is a process thread; delegates to
// AudioEngine::in_process_thread(). Also consulted by the RT-allocation
// debug hook (alloc_allowed).
631 Graph::in_process_thread () const
633 return AudioEngine::instance ()->in_process_thread ();