2 * Copyright (C) 2010-2012 Carl Hetherington <carl@carlh.net>
3 * Copyright (C) 2010-2017 Paul Davis <paul@linuxaudiosystems.com>
4 * Copyright (C) 2013 John Emmas <john@creativepost.co.uk>
5 * Copyright (C) 2013 Tim Mayberry <mojofunk@gmail.com>
6 * Copyright (C) 2015-2019 Robin Gareus <robin@gareus.org>
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation; either version 2 of the License, or
11 * (at your option) any later version.
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License along
19 * with this program; if not, write to the Free Software Foundation, Inc.,
20 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
26 #include "pbd/compose.h"
27 #include "pbd/debug_rt_alloc.h"
28 #include "pbd/pthread_utils.h"
30 #include "ardour/audioengine.h"
31 #include "ardour/debug.h"
32 #include "ardour/graph.h"
33 #include "ardour/process_thread.h"
34 #include "ardour/route.h"
35 #include "ardour/session.h"
36 #include "ardour/types.h"
40 using namespace ARDOUR;
// File-scope pointer to a Graph, initially null. The return statement below
// dereferences it, so it has to be assigned before that statement runs.
45 static Graph* graph = 0;
// Evaluates to true when graph->in_process_thread () is false, i.e. the
// calling thread is not reported as a process thread.
// NOTE(review): the header of the enclosing function is not visible in this
// excerpt; the constructor stores the address of ::alloc_allowed, which is
// presumably this function. Confirm against the full file.
52 return !graph->in_process_thread ();
// Read an atomic int via g_atomic_int_get () and cast the result to guint.
57 #define g_atomic_uint_get(x) static_cast<guint> (g_atomic_int_get (x))
// Construct the graph for one session. Creates the three named semaphores
// (second argument 0, presumably the initial count), zeroes all atomic
// counters and both per-chain terminal-node counts, reserves trigger-queue
// storage and subscribes to the AudioEngine Running/Stopped/Halted signals.
// session: forwarded to the SessionHandleRef base.
59 Graph::Graph (Session& session)
60 : SessionHandleRef (session)
61 , _execution_sem ("graph_execution", 0)
62 , _callback_start_sem ("graph_start", 0)
63 , _callback_done_sem ("graph_done", 0)
// Atomic state: terminal refcount, terminate flag, worker count, idle-thread
// count and trigger-queue size all start at zero.
69 g_atomic_int_set (&_terminal_refcnt, 0);
70 g_atomic_int_set (&_terminate, 0);
71 g_atomic_int_set (&_n_workers, 0);
72 g_atomic_int_set (&_idle_thread_cnt, 0);
73 g_atomic_int_set (&_trigger_queue_size, 0);
// One terminal-node count per chain (index 0 and 1).
75 _n_terminal_nodes[0] = 0;
76 _n_terminal_nodes[1] = 0;
78 /* pre-allocate memory */
79 _trigger_queue.reserve (1024);
// Engine Running calls reset_thread_list (); engine Stopped and Halted both
// call engine_stopped (). All three connections are kept in engine_connections.
81 ARDOUR::AudioEngine::instance ()->Running.connect_same_thread (engine_connections, boost::bind (&Graph::reset_thread_list, this));
82 ARDOUR::AudioEngine::instance ()->Stopped.connect_same_thread (engine_connections, boost::bind (&Graph::engine_stopped, this));
83 ARDOUR::AudioEngine::instance ()->Halted.connect_same_thread (engine_connections, boost::bind (&Graph::engine_stopped, this));
// Store the address of the global-scope alloc_allowed in pbd_alloc_allowed.
// NOTE(review): any preprocessor guard around this line is not visible here.
89 pbd_alloc_allowed = &::alloc_allowed;
// Handler bound to the AudioEngine Stopped and Halted signals by the
// constructor.
94 Graph::engine_stopped ()
// Diagnostic output: print the current engine process-thread count to stderr.
97 cerr << "Graph::engine_stopped. n_thread: " << AudioEngine::instance ()->process_thread_count () << endl;
// Only act when the engine still reports process threads.
// NOTE(review): the body of this branch is not visible in this excerpt.
99 if (AudioEngine::instance ()->process_thread_count () != 0) {
104 /** Set up threads for running the graph */
// Bound to the AudioEngine Running signal by the constructor. Creates one
// process thread running main_thread () and num_threads - 1 process threads
// running helper_thread (), then waits for all helpers to register.
// Throws failed_constructor when a process thread cannot be created.
106 Graph::reset_thread_list ()
// Desired thread count, and the number of helpers registered right now.
// NOTE(review): no use of n_workers is visible in this excerpt.
108 uint32_t num_threads = how_many_dsp_threads ();
109 guint n_workers = g_atomic_uint_get (&_n_workers);
111 /* For now, we shouldn't be using the graph code if we only have 1 DSP thread */
112 assert (num_threads > 1);
114 /* don't bother doing anything here if we already have the right
118 if (AudioEngine::instance ()->process_thread_count () == num_threads) {
// Take the engine process lock through a scoped Lock object.
122 Glib::Threads::Mutex::Lock lm (_session.engine ().process_lock ());
128 /* Allow threads to run */
129 g_atomic_int_set (&_terminate, 0);
// First thread runs main_thread (); a non-zero return value from
// create_process_thread () is treated as failure.
131 if (AudioEngine::instance ()->create_process_thread (boost::bind (&Graph::main_thread, this)) != 0) {
132 throw failed_constructor ();
// Remaining num_threads - 1 threads run helper_thread ().
135 for (uint32_t i = 1; i < num_threads; ++i) {
136 if (AudioEngine::instance ()->create_process_thread (boost::bind (&Graph::helper_thread, this))) {
137 throw failed_constructor ();
// Loop until the helper count plus the main thread equals num_threads; each
// helper increments _n_workers at the top of helper_thread ().
// NOTE(review): the loop body is not visible in this excerpt.
141 while (g_atomic_uint_get (&_n_workers) + 1 != num_threads) {
// Empty every container the graph keeps for the session: both node lists,
// both initial-trigger lists and the trigger queue.
147 Graph::session_going_away ()
151 // now drop all references on the nodes.
152 _nodes_rt[0].clear ();
153 _nodes_rt[1].clear ();
154 _init_trigger_list[0].clear ();
155 _init_trigger_list[1].clear ();
// Zero the size counter, then empty the queue it tracks.
156 g_atomic_int_set (&_trigger_queue_size, 0);
157 _trigger_queue.clear ();
// Ask all graph threads to terminate, wake them, join them through the
// AudioEngine, then reset the worker counters and the three semaphores.
161 Graph::drop_threads ()
// Hold _swap_mutex through a scoped Lock object.
163 Glib::Threads::Mutex::Lock ls (_swap_mutex);
165 /* Flag threads to terminate */
166 g_atomic_int_set (&_terminate, 1);
168 /* Wake-up sleeping threads */
169 guint tc = g_atomic_uint_get (&_idle_thread_cnt);
// Every registered worker is expected to be idle at this point; idle workers
// block on _execution_sem in run_one (), so signal it once per idle thread.
170 assert (tc == g_atomic_uint_get (&_n_workers));
171 for (guint i = 0; i < tc; ++i) {
172 _execution_sem.signal ();
175 /* and the main thread */
176 _callback_start_sem.signal ();
178 /* join process threads */
179 AudioEngine::instance ()->join_process_threads ();
// No workers remain after the join.
181 g_atomic_int_set (&_n_workers, 0);
182 g_atomic_int_set (&_idle_thread_cnt, 0);
184 /* signal main process thread if it's waiting for an already terminated thread */
185 _callback_done_sem.signal ();
188 * This is somewhat ugly, yet if a thread is killed (e.g jackd terminates
189 * abnormally), some semaphores are still unlocked.
// Variant with diagnostics: keep the value returned by each reset () and
// print all three to stderr.
192 int d1 = _execution_sem.reset ();
193 int d2 = _callback_start_sem.reset ();
194 int d3 = _callback_done_sem.reset ();
195 cerr << "Graph::drop_threads() sema-counts: " << d1 << ", " << d2 << ", " << d3 << endl;
// Variant without diagnostics.
// NOTE(review): the lines that select between the two variants are not
// visible in this excerpt.
197 _execution_sem.reset ();
198 _callback_start_sem.reset ();
199 _callback_done_sem.reset ();
203 /* special case route removal -- called from Session::remove_routes */
// Clears the node list, activation sets and initial-trigger list of the setup
// chain. When the setup chain is still the pending chain it waits on
// _cleanup_cond, which is signalled after the chain swap elsewhere in this
// file.
205 Graph::clear_other_chain ()
// Hold _swap_mutex; the same mutex is handed to _cleanup_cond.wait () below.
207 Glib::Threads::Mutex::Lock ls (_swap_mutex);
210 if (_setup_chain != _pending_chain) {
// Drop the per-node activation sets for this chain before dropping the nodes.
211 for (node_list_t::iterator ni = _nodes_rt[_setup_chain].begin (); ni != _nodes_rt[_setup_chain].end (); ++ni) {
212 (*ni)->_activation_set[_setup_chain].clear ();
215 _nodes_rt[_setup_chain].clear ();
216 _init_trigger_list[_setup_chain].clear ();
219 /* setup chain == pending chain - we have
220 * to wait till this is no longer true.
// NOTE(review): the surrounding loop or branch structure is not visible in
// this excerpt.
222 _cleanup_cond.wait (_swap_mutex);
// NOTE(review): the signature line of this function is not visible in this
// excerpt. The comments about preparing the next cycle in
// reached_terminal_node () and about bootstrapping the trigger list in
// main_thread () appear to refer to it.
// Swaps in a pending chain when _swap_mutex can be taken without blocking,
// then sets the terminal refcount and queues the initial nodes of the current
// chain.
229 if (_swap_mutex.trylock ()) {
230 /* swap mutex acquired */
231 if (_current_chain != _pending_chain) {
// The previously current chain becomes the setup chain, which is the one
// rechain () rebuilds.
233 _setup_chain = _current_chain;
234 _current_chain = _pending_chain;
235 /* ensure that all nodes can be queued */
236 _trigger_queue.reserve (_nodes_rt[_current_chain].size ());
237 assert (g_atomic_uint_get (&_trigger_queue_size) == 0);
// Wake a clear_other_chain () caller waiting on this condition.
238 _cleanup_cond.signal ();
240 _swap_mutex.unlock ();
// Local copy of the chain index used for the remainder of the setup.
245 int chain = _current_chain;
247 node_list_t::iterator i;
// NOTE(review): part of this loop body is not visible; the visible statement
// clears the empty-graph flag.
248 for (i = _nodes_rt[chain].begin (); i != _nodes_rt[chain].end (); ++i) {
250 _graph_empty = false;
// The graph is empty exactly when the chain has no terminal nodes.
253 assert (_graph_empty != (_n_terminal_nodes[chain] > 0));
// Counted down in reached_terminal_node ().
255 g_atomic_int_set (&_terminal_refcnt, _n_terminal_nodes[chain]);
257 /* Trigger the initial nodes for processing, which are the ones at the `input' end */
258 for (i = _init_trigger_list[chain].begin (); i != _init_trigger_list[chain].end (); i++) {
259 g_atomic_int_inc (&_trigger_queue_size);
260 _trigger_queue.push_back (i->get ());
// Queue node n for execution: increment the atomic queue-size counter, then
// push the node onto the trigger queue.
265 Graph::trigger (GraphNode* n)
267 g_atomic_int_inc (&_trigger_queue_size);
268 _trigger_queue.push_back (n);
271 /** Called when a node at the `output' end of the chain (ie one that has no-one to feed)
// Decrements the terminal refcount. The thread that brings it to zero signals
// _callback_done_sem, waits for all workers to become idle, then blocks on
// _callback_start_sem until the next cycle is requested.
275 Graph::reached_terminal_node ()
// True only for the call that drops the refcount to zero.
277 if (g_atomic_int_dec_and_test (&_terminal_refcnt)) {
280 /* We have run all the nodes that are at the `output' end of
281 * the graph, so there is nothing more to do this time around.
283 assert (g_atomic_uint_get (&_trigger_queue_size) == 0);
286 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 cycle done.\n", pthread_name ()));
// Releases the caller blocked in process_routes () or routes_no_roll ().
288 _callback_done_sem.signal ();
290 /* Ensure that all background threads are idle.
291 * When freewheeling there may be an immediate restart:
292 * If there are more threads than CPU cores, some worker-
293 * threads may only be "on the way" to become idle.
// NOTE(review): the body of this wait loop is not visible in this excerpt.
295 guint n_workers = g_atomic_uint_get (&_n_workers);
296 while (g_atomic_uint_get (&_idle_thread_cnt) != n_workers) {
300 /* Block until the next process callback */
301 _callback_start_sem.wait ();
// drop_threads () also signals this semaphore after setting _terminate.
303 if (g_atomic_int_get (&_terminate)) {
307 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 prepare new cycle.\n", pthread_name ()));
309 /* Prepare next cycle:
310 * - Reset terminal reference count
311 * - queue initial nodes
// Empty graph while not terminating.
// NOTE(review): the body of this branch is not visible in this excerpt.
315 if (_graph_empty && !g_atomic_int_get (&_terminate)) {
318 /* .. continue in worker-thread */
322 /** Rechain our stuff using a list of routes (which can be in any order) and
323 * a directed graph of their interconnections, which is guaranteed to be
// routelist: routes that become the nodes of the setup chain.
// edges: queried with from () for the routes fed by a route and with
// has_none_to () for whether a route has any incoming edge.
// Rebuilds the setup chain under _swap_mutex and finally marks it as the
// pending chain.
327 Graph::rechain (boost::shared_ptr<RouteList> routelist, GraphEdges const& edges)
329 Glib::Threads::Mutex::Lock ls (_swap_mutex);
// All work below targets the setup chain, not the chain currently running.
331 int chain = _setup_chain;
332 DEBUG_TRACE (DEBUG::Graph, string_compose ("============== setup %1\n", chain));
334 /* This will become the number of nodes that do not feed any other node;
335 * once we have processed this number of those nodes, we have finished.
337 _n_terminal_nodes[chain] = 0;
339 /* This will become a list of nodes that are not fed by another node, ie
340 * those at the `input' end.
342 _init_trigger_list[chain].clear ();
344 _nodes_rt[chain].clear ();
346 /* Clear things out, and make _nodes_rt[chain] a copy of routelist */
347 for (RouteList::iterator ri = routelist->begin (); ri != routelist->end (); ri++) {
348 (*ri)->_init_refcount[chain] = 0;
349 (*ri)->_activation_set[chain].clear ();
350 _nodes_rt[chain].push_back (*ri);
353 // now add refs for the connections.
355 for (node_list_t::iterator ni = _nodes_rt[chain].begin (); ni != _nodes_rt[chain].end (); ni++) {
// Every node was pushed from routelist above; the cast result is used without
// a null check.
356 boost::shared_ptr<Route> r = boost::dynamic_pointer_cast<Route> (*ni);
358 /* The routes that are directly fed by r */
359 set<GraphVertex> fed_from_r = edges.from (r);
361 /* Hence whether r has an output */
362 bool const has_output = !fed_from_r.empty ();
364 /* Set up r's activation set */
365 for (set<GraphVertex>::iterator i = fed_from_r.begin (); i != fed_from_r.end (); ++i) {
366 r->_activation_set[chain].insert (*i);
369 /* r has an input if there are some incoming edges to r in the graph */
370 bool const has_input = !edges.has_none_to (r);
372 /* Increment the refcount of any route that we directly feed */
373 for (node_set_t::iterator ai = r->_activation_set[chain].begin (); ai != r->_activation_set[chain].end (); ai++) {
374 (*ai)->_init_refcount[chain] += 1;
// NOTE(review): the condition lines guarding the next two statements are not
// visible in this excerpt; the existing comments indicate they test has_input
// and has_output.
378 /* no input, so this node needs to be triggered initially to get things going */
379 _init_trigger_list[chain].push_back (*ni);
383 /* no output, so this is one of the nodes that we can count off to decide
386 _n_terminal_nodes[chain] += 1;
// Publish the rebuilt chain; the cycle-preparation code swaps it in when
// _current_chain differs from _pending_chain.
390 _pending_chain = chain;
394 /** Called by both the main thread and all helpers. */
// NOTE(review): the signature line is not visible in this excerpt; the trace
// message at the end names this routine run_one.
// Takes one node from the trigger queue, sleeping on _execution_sem when the
// queue is empty, and runs it for the current chain.
398 GraphNode* to_run = NULL;
400 if (g_atomic_int_get (&_terminate)) {
404 if (_trigger_queue.pop_front (to_run)) {
405 /* Wake up idle threads, but at most as many as there's
406 * work in the trigger queue that can be processed by
408 * This thread has not yet decreased _trigger_queue_size.
// The loop below starts at 1, so the number of signals is wakeup - 1: the
// node just popped is handled by this thread.
410 guint idle_cnt = g_atomic_uint_get (&_idle_thread_cnt);
411 guint work_avail = g_atomic_uint_get (&_trigger_queue_size);
412 guint wakeup = std::min (idle_cnt + 1, work_avail);
414 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 signals %2 threads\n", pthread_name (), wakeup));
415 for (guint i = 1; i < wakeup; ++i) {
416 _execution_sem.signal ();
421 /* Wait for work, fall asleep */
422 g_atomic_int_inc (&_idle_thread_cnt);
// NOTE(review): _n_workers is read directly here, while other reads in this
// file go through g_atomic_uint_get (). Confirm this is intended.
423 assert (g_atomic_uint_get (&_idle_thread_cnt) <= _n_workers);
425 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 goes to sleep\n", pthread_name ()));
426 _execution_sem.wait ();
// drop_threads () signals _execution_sem after setting _terminate.
428 if (g_atomic_int_get (&_terminate)) {
432 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 is awake\n", pthread_name ()));
// Used as a plain atomic decrement; the returned flag is ignored.
434 g_atomic_int_dec_and_test (&_idle_thread_cnt);
436 /* Try to find some work to do */
437 _trigger_queue.pop_front (to_run);
440 /* Process the graph-node */
// NOTE(review): to_run is dereferenced below; any null check between the pop
// and this point is not visible in this excerpt.
441 g_atomic_int_dec_and_test (&_trigger_queue_size);
442 to_run->run (_current_chain);
444 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 has finished run_one()\n", pthread_name ()));
// Entry point of the helper process threads created in reset_thread_list ().
448 Graph::helper_thread ()
// Register this helper; reset_thread_list () waits on this counter.
450 g_atomic_int_inc (&_n_workers);
// NOTE(review): the increment and the read are two separate atomic
// operations, so helpers starting at the same time could read the same value.
// In the visible code the id is only used to build the thread name.
451 guint id = g_atomic_uint_get (&_n_workers);
453 /* This is needed for ARDOUR::Session requests called from rt-processors
454 * in particular Lua scripts may do cross-thread calls */
455 if (!SessionEvent::has_per_thread_pool ()) {
// Name has the form RT-<id>-<thread pointer>; it is used for the pthread
// name, the per-thread event pool and the event-loop notification.
// NOTE(review): the declaration of name is not visible in this excerpt.
457 snprintf (name, 64, "RT-%u-%p", id, (void*)DEBUG_THREAD_SELF);
458 pthread_set_name (name);
459 SessionEvent::create_per_thread_pool (name, 64);
460 PBD::notify_event_loops_about_thread_creation (pthread_self (), name, 64);
// Allocate the per-thread ProcessThread with the RT malloc checks suspended.
// NOTE(review): the matching release of pt is not visible in this excerpt.
463 suspend_rt_malloc_checks ();
464 ProcessThread* pt = new ProcessThread ();
465 resume_rt_malloc_checks ();
// Keep working until _terminate is set.
// NOTE(review): the loop body is not visible in this excerpt.
469 while (!g_atomic_int_get (&_terminate)) {
477 /** Here's the main graph thread */
// Entry point of the first process thread created in reset_thread_list ().
// Performs per-thread setup, waits for the first process callback, then
// loops as a worker until _terminate is set.
479 Graph::main_thread ()
481 /* first time setup */
// Allocation below happens with the RT malloc checks suspended.
483 suspend_rt_malloc_checks ();
484 ProcessThread* pt = new ProcessThread ();
486 /* This is needed for ARDOUR::Session requests called from rt-processors
487 * in particular Lua scripts may do cross-thread calls */
488 if (!SessionEvent::has_per_thread_pool ()) {
// Name has the form RT-main-<thread pointer>.
// NOTE(review): the declaration of name is not visible in this excerpt.
490 snprintf (name, 64, "RT-main-%p", (void*)DEBUG_THREAD_SELF);
491 pthread_set_name (name);
492 SessionEvent::create_per_thread_pool (name, 64);
493 PBD::notify_event_loops_about_thread_creation (pthread_self (), name, 64);
495 resume_rt_malloc_checks ();
499 /* Wait for initial process callback */
// Signalled by process_routes (), routes_no_roll () and drop_threads ().
501 _callback_start_sem.wait ();
503 DEBUG_TRACE (DEBUG::ProcessThreads, "main thread is awake\n");
505 if (g_atomic_int_get (&_terminate)) {
511 /* Bootstrap the trigger-list
512 * (later this is done by Graph_reached_terminal_node) */
// Nothing to run: report the cycle as done straight away.
// NOTE(review): the rest of this branch is not visible in this excerpt.
515 if (_graph_empty && !g_atomic_int_get (&_terminate)) {
516 _callback_done_sem.signal ();
517 DEBUG_TRACE (DEBUG::ProcessThreads, "main thread sees graph done, goes back to sleep\n");
521 /* After setup, the main-thread just becomes a normal worker */
// NOTE(review): the loop body is not visible in this excerpt.
522 while (!g_atomic_int_get (&_terminate)) {
// Emit a DEBUG::Graph trace of a chain: every node with its initial refcount
// and the nodes it triggers, then the initial-trigger list, then the
// terminal-node count.
// chain: overwritten with _pending_chain before use in the visible code, so
// the value passed by the caller has no visible effect.
531 Graph::dump (int chain) const
534 node_list_t::const_iterator ni;
535 node_set_t::const_iterator ai;
537 chain = _pending_chain;
539 DEBUG_TRACE (DEBUG::Graph, "--------------------------------------------Graph dump:\n");
540 for (ni = _nodes_rt[chain].begin (); ni != _nodes_rt[chain].end (); ni++) {
// The cast result is dereferenced without a null check; rechain () only
// stores nodes taken from a RouteList.
541 boost::shared_ptr<Route> rp = boost::dynamic_pointer_cast<Route> (*ni);
542 DEBUG_TRACE (DEBUG::Graph, string_compose ("GraphNode: %1 refcount: %2\n", rp->name ().c_str (), (*ni)->_init_refcount[chain]));
// Nodes activated by this node.
543 for (ai = (*ni)->_activation_set[chain].begin (); ai != (*ni)->_activation_set[chain].end (); ai++) {
544 DEBUG_TRACE (DEBUG::Graph, string_compose (" triggers: %1\n", boost::dynamic_pointer_cast<Route> (*ai)->name ().c_str ()));
548 DEBUG_TRACE (DEBUG::Graph, "------------- trigger list:\n");
549 for (ni = _init_trigger_list[chain].begin (); ni != _init_trigger_list[chain].end (); ni++) {
550 DEBUG_TRACE (DEBUG::Graph, string_compose ("GraphNode: %1 refcount: %2\n", boost::dynamic_pointer_cast<Route> (*ni)->name ().c_str (), (*ni)->_init_refcount[chain]));
553 DEBUG_TRACE (DEBUG::Graph, string_compose ("final activation refcount: %1\n", _n_terminal_nodes[chain]));
// Write a textual node/edge description of the current chain to a file.
// The emitted text looks like Graphviz DOT; presumably meant for that tool.
// file_name: path handed to g_file_set_contents ().
// NOTE(review): the return statements are not visible in this excerpt.
558 Graph::plot (std::string const& file_name) const
// Hold _swap_mutex while reading the current chain.
560 Glib::Threads::Mutex::Lock ls (_swap_mutex);
561 int chain = _current_chain;
563 node_list_t::const_iterator ni;
564 node_set_t::const_iterator ai;
568 ss << " node [shape = ellipse];\n";
570 for (ni = _nodes_rt[chain].begin (); ni != _nodes_rt[chain].end (); ni++) {
571 boost::shared_ptr<Route> sr = boost::dynamic_pointer_cast<Route> (*ni);
// Node label: route name followed by its initial refcount in parentheses.
572 std::string sn = string_compose ("%1 (%2)", sr->name (), (*ni)->_init_refcount[chain]);
// Fill colour by role: gold1 when the node has refcount 0 and activates
// nothing, lightskyblue1 when only the refcount is 0, aquamarine2 when only
// the activation set is empty. Other nodes get no fill line.
573 if ((*ni)->_init_refcount[chain] == 0 && (*ni)->_activation_set[chain].size() == 0) {
574 ss << " \"" << sn << "\"[style=filled,fillcolor=gold1];\n";
575 } else if ((*ni)->_init_refcount[chain] == 0) {
576 ss << " \"" << sn << "\"[style=filled,fillcolor=lightskyblue1];\n";
577 } else if ((*ni)->_activation_set[chain].size() == 0) {
578 ss << " \"" << sn << "\"[style=filled,fillcolor=aquamarine2];\n";
// One edge per activated node.
580 for (ai = (*ni)->_activation_set[chain].begin (); ai != (*ni)->_activation_set[chain].end (); ai++) {
581 boost::shared_ptr<Route> dr = boost::dynamic_pointer_cast<Route> (*ai);
582 std::string dn = string_compose ("%1 (%2)", dr->name (), (*ai)->_init_refcount[chain]);
// sends_only is filled in by the call below; its return value is ignored.
583 bool sends_only = false;
584 sr->direct_feeds_according_to_reality (dr, &sends_only);
// NOTE(review): the conditions around the two edge-style lines are not
// visible; presumably the dashed style applies when sends_only was set.
586 ss << " edge [style=dashed];\n";
588 ss << " \"" << sn << "\" -> \"" << dn << "\"\n";
590 ss << " edge [style=solid];\n";
// Write the accumulated text; on failure report err->message through the
// error stream.
// NOTE(review): the declaration and cleanup of err are not visible here.
597 if (!g_file_set_contents (file_name.c_str(), ss.str().c_str(), -1, &err)) {
599 error << string_compose (_("Could not graph to file (%1)"), err->message) << endmsg;
// Run one rolling process cycle through the graph threads.
// nframes, start_sample, end_sample: stored in the _process_* members that
// process_one_route () passes on to Route::roll ().
// need_butler: output; receives _process_need_butler after the cycle.
// Returns _process_retval.
608 Graph::process_routes (pframes_t nframes, samplepos_t start_sample, samplepos_t end_sample, bool& need_butler)
610 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("graph execution from %1 to %2 = %3\n", start_sample, end_sample, nframes));
// NOTE(review): the body of this early-out branch is not visible here.
612 if (g_atomic_int_get (&_terminate)) {
616 _process_nframes = nframes;
617 _process_start_sample = start_sample;
618 _process_end_sample = end_sample;
// Selects the roll () path in process_one_route ().
620 _process_noroll = false;
622 _process_need_butler = false;
624 DEBUG_TRACE (DEBUG::ProcessThreads, "wake graph for non-silent process\n");
// Start the cycle, then block until reached_terminal_node () or
// main_thread () signals completion.
625 _callback_start_sem.signal ();
626 _callback_done_sem.wait ();
627 DEBUG_TRACE (DEBUG::ProcessThreads, "graph execution complete\n");
629 need_butler = _process_need_butler;
631 return _process_retval;
// Run one no-roll process cycle through the graph threads.
// nframes, start_sample, end_sample, non_rt_pending: stored in the _process_*
// members that process_one_route () passes on to Route::no_roll ().
// Returns _process_retval.
635 Graph::routes_no_roll (pframes_t nframes, samplepos_t start_sample, samplepos_t end_sample, bool non_rt_pending)
637 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("no-roll graph execution from %1 to %2 = %3\n", start_sample, end_sample, nframes));
// NOTE(review): the body of this early-out branch is not visible here.
639 if (g_atomic_int_get (&_terminate)) {
643 _process_nframes = nframes;
644 _process_start_sample = start_sample;
645 _process_end_sample = end_sample;
646 _process_non_rt_pending = non_rt_pending;
// Selects the no_roll () path in process_one_route ().
648 _process_noroll = true;
650 _process_need_butler = false;
652 DEBUG_TRACE (DEBUG::ProcessThreads, "wake graph for no-roll process\n");
// Start the cycle, then block until completion is signalled.
653 _callback_start_sem.signal ();
654 _callback_done_sem.wait ();
655 DEBUG_TRACE (DEBUG::ProcessThreads, "graph execution complete\n");
657 return _process_retval;
// Process a single route for the current cycle, using the parameters stored
// by process_routes () or routes_no_roll ().
// route: dereferenced directly; must not be null.
660 Graph::process_one_route (Route* route)
// Filled in by Route::roll () on the rolling path.
662 bool need_butler = false;
667 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 runs route %2\n", pthread_name (), route->name ()));
// _process_noroll picks between no_roll () and roll ().
// NOTE(review): the declaration of retval and the else line are not visible
// in this excerpt.
669 if (_process_noroll) {
670 retval = route->no_roll (_process_nframes, _process_start_sample, _process_end_sample, _process_non_rt_pending);
672 retval = route->roll (_process_nframes, _process_start_sample, _process_end_sample, need_butler);
// NOTE(review): the conditions guarding the next two assignments are not
// visible; presumably they test retval and need_butler.
676 _process_retval = retval;
680 _process_need_butler = true;
// Forwards to AudioEngine::in_process_thread () on the engine singleton and
// returns its result.
685 Graph::in_process_thread () const
687 return AudioEngine::instance ()->in_process_thread ();