add AudioBackendNativeThread to serve the same role as jack_native_thread_t
[ardour.git] / libs / ardour / graph.cc
1 /*
2   Copyright (C) 2010 Paul Davis
3   Author: Torben Hohn
4
5   This program is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published by
7   the Free Software Foundation; either version 2 of the License, or
8   (at your option) any later version.
9
10   This program is distributed in the hope that it will be useful,
11   but WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13   GNU General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with this program; if not, write to the Free Software
17   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18
19 */
20 #include <stdio.h>
21 #include <cmath>
22
23 #include "pbd/compose.h"
24 #include "pbd/debug_rt_alloc.h"
25
26 #include "ardour/debug.h"
27 #include "ardour/graph.h"
28 #include "ardour/types.h"
29 #include "ardour/session.h"
30 #include "ardour/route.h"
31 #include "ardour/process_thread.h"
32 #include "ardour/audioengine.h"
33
34 #include <jack/thread.h>
35
36 #include "i18n.h"
37
38 using namespace ARDOUR;
39 using namespace PBD;
40 using namespace std;
41
42 #ifdef DEBUG_RT_ALLOC
43 static Graph* graph = 0;
44
45 extern "C" {
46
47 int alloc_allowed ()
48 {
49         return !graph->in_process_thread ();
50 }
51
52 }
53 #endif
54
55 Graph::Graph (Session & session)
56         : SessionHandleRef (session)
57         , _quit_threads (false)
58         , _execution_sem ("graph_execution", 0)
59         , _callback_start_sem ("graph_start", 0)
60         , _callback_done_sem ("graph_done", 0)
61         , _cleanup_sem ("graph_cleanup", 0)
62 {
63         pthread_mutex_init( &_trigger_mutex, NULL);
64
65         /* XXX: rather hacky `fix' to stop _trigger_queue.push_back() allocating
66            memory in the RT thread.
67         */
68         _trigger_queue.reserve (8192);
69
70         _execution_tokens = 0;
71
72         _current_chain = 0;
73         _pending_chain = 0;
74         _setup_chain   = 1;
75         _quit_threads = false;
76         _graph_empty = true;
77
78         reset_thread_list ();
79
80 #ifdef DEBUG_RT_ALLOC
81         graph = this;
82         pbd_alloc_allowed = &::alloc_allowed;
83 #endif
84 }
85
86 /** Set up threads for running the graph */
87 void
88 Graph::reset_thread_list ()
89 {
90         uint32_t num_threads = how_many_dsp_threads ();
91
92         /* For now, we shouldn't be using the graph code if we only have 1 DSP thread */
93         assert (num_threads > 1);
94
95         /* don't bother doing anything here if we already have the right
96            number of threads.
97         */
98
99         if (_thread_list.size() == num_threads) {
100                 return;
101         }
102
103         Glib::Threads::Mutex::Lock lm (_session.engine().process_lock());
104         AudioBackendNativeThread a_thread;
105
106         if (!_thread_list.empty()) {
107                 drop_threads ();
108         }
109
110         if (AudioEngine::instance()->create_process_thread (boost::bind (&Graph::main_thread, this), &a_thread, 100000) != 0) {
111                 throw failed_constructor ();
112         }
113
114         _thread_list.push_back (a_thread);
115
116         for (uint32_t i = 1; i < num_threads; ++i) {
117                 if (AudioEngine::instance()->create_process_thread (boost::bind (&Graph::helper_thread, this), &a_thread, 100000) != 0) {
118                         throw failed_constructor ();
119                 }
120                 
121                 _thread_list.push_back (a_thread);
122         }
123 }
124
125 void
126 Graph::session_going_away()
127 {
128         drop_threads ();
129
130         // now drop all references on the nodes.
131         _nodes_rt[0].clear();
132         _nodes_rt[1].clear();
133         _init_trigger_list[0].clear();
134         _init_trigger_list[1].clear();
135         _trigger_queue.clear();
136 }
137
138 void
139 Graph::drop_threads ()
140 {
141         _quit_threads = true;
142
143         for (unsigned int i=0; i< _thread_list.size(); i++) {
144                 _execution_sem.signal ();
145         }
146
147         _callback_start_sem.signal ();
148
149         for (list<AudioBackendNativeThread>::iterator i = _thread_list.begin(); i != _thread_list.end(); ++i) {
150                 AudioEngine::instance()->wait_for_process_thread_exit (*i);
151         }
152
153         _thread_list.clear ();
154
155         _execution_tokens = 0;
156
157         _quit_threads = false;
158 }
159
160 void
161 Graph::clear_other_chain ()
162 {
163         Glib::Threads::Mutex::Lock ls (_swap_mutex);
164
165         while (1) {
166                 if (_setup_chain != _pending_chain) {
167
168                         for (node_list_t::iterator ni=_nodes_rt[_setup_chain].begin(); ni!=_nodes_rt[_setup_chain].end(); ni++) {
169                                 (*ni)->_activation_set[_setup_chain].clear();
170                         }
171
172                         _nodes_rt[_setup_chain].clear ();
173                         _init_trigger_list[_setup_chain].clear ();
174                         break;
175                 }
176                 /* setup chain == pending chain - we have
177                    to wait till this is no longer true.
178                 */
179                 _cleanup_cond.wait (_swap_mutex);
180         }
181 }
182
183 void
184 Graph::prep()
185 {
186         node_list_t::iterator i;
187         int chain;
188
189         if (_swap_mutex.trylock()) {
190                 // we got the swap mutex.
191                 if (_current_chain != _pending_chain)
192                 {
193                         // printf ("chain swap ! %d -> %d\n", _current_chain, _pending_chain);
194                         _setup_chain = _current_chain;
195                         _current_chain = _pending_chain;
196                         _cleanup_cond.signal ();
197                 }
198                 _swap_mutex.unlock ();
199         }
200
201         chain = _current_chain;
202
203         _graph_empty = true;
204         for (i=_nodes_rt[chain].begin(); i!=_nodes_rt[chain].end(); i++) {
205                 (*i)->prep( chain);
206                 _graph_empty = false;
207         }
208         _finished_refcount = _init_finished_refcount[chain];
209
210         /* Trigger the initial nodes for processing, which are the ones at the `input' end */
211         pthread_mutex_lock (&_trigger_mutex);
212         for (i=_init_trigger_list[chain].begin(); i!=_init_trigger_list[chain].end(); i++) {
213                 /* don't use ::trigger here, as we have already locked the mutex */
214                 _trigger_queue.push_back (i->get ());
215         }
216         pthread_mutex_unlock (&_trigger_mutex);
217 }
218
219 void
220 Graph::trigger (GraphNode* n)
221 {
222         pthread_mutex_lock (&_trigger_mutex);
223         _trigger_queue.push_back (n);
224         pthread_mutex_unlock (&_trigger_mutex);
225 }
226
227 /** Called when a node at the `output' end of the chain (ie one that has no-one to feed)
228  *  is finished.
229  */
230 void
231 Graph::dec_ref()
232 {
233         if (g_atomic_int_dec_and_test (&_finished_refcount)) {
234
235                 /* We have run all the nodes that are at the `output' end of
236                    the graph, so there is nothing more to do this time around.
237                 */
238
239                 restart_cycle ();
240         }
241 }
242
243 void
244 Graph::restart_cycle()
245 {
246         // we are through. wakeup our caller.
247
248   again:
249         _callback_done_sem.signal ();
250
251         /* Block until the a process callback triggers us */
252         _callback_start_sem.wait();
253
254         if (_quit_threads) {
255                 return;
256         }
257
258         prep ();
259
260         if (_graph_empty) {
261                 goto again;
262         }
263
264         // returning will restart the cycle.
265         // starting with waking up the others.
266 }
267
268 /** Rechain our stuff using a list of routes (which can be in any order) and
269  *  a directed graph of their interconnections, which is guaranteed to be
270  *  acyclic.
271  */
272
273 void
274 Graph::rechain (boost::shared_ptr<RouteList> routelist, GraphEdges const & edges)
275 {
276         Glib::Threads::Mutex::Lock ls (_swap_mutex);
277
278         int chain = _setup_chain;
279         DEBUG_TRACE (DEBUG::Graph, string_compose ("============== setup %1\n", chain));
280
281         /* This will become the number of nodes that do not feed any other node;
282            once we have processed this number of those nodes, we have finished.
283         */
284         _init_finished_refcount[chain] = 0;
285
286         /* This will become a list of nodes that are not fed by another node, ie
287            those at the `input' end.
288         */
289         _init_trigger_list[chain].clear();
290
291         _nodes_rt[chain].clear();
292
293         /* Clear things out, and make _nodes_rt[chain] a copy of routelist */
294         for (RouteList::iterator ri=routelist->begin(); ri!=routelist->end(); ri++) {
295                 (*ri)->_init_refcount[chain] = 0;
296                 (*ri)->_activation_set[chain].clear();
297                 _nodes_rt[chain].push_back (*ri);
298         }
299
300         // now add refs for the connections.
301
302         for (node_list_t::iterator ni = _nodes_rt[chain].begin(); ni != _nodes_rt[chain].end(); ni++) {
303
304                 boost::shared_ptr<Route> r = boost::dynamic_pointer_cast<Route> (*ni);
305
306                 /* The routes that are directly fed by r */
307                 set<GraphVertex> fed_from_r = edges.from (r);
308
309                 /* Hence whether r has an output */
310                 bool const has_output = !fed_from_r.empty ();
311
312                 /* Set up r's activation set */
313                 for (set<GraphVertex>::iterator i = fed_from_r.begin(); i != fed_from_r.end(); ++i) {
314                         r->_activation_set[chain].insert (*i);
315                 }
316
317                 /* r has an input if there are some incoming edges to r in the graph */
318                 bool const has_input = !edges.has_none_to (r);
319
320                 /* Increment the refcount of any route that we directly feed */
321                 for (node_set_t::iterator ai = r->_activation_set[chain].begin(); ai != r->_activation_set[chain].end(); ai++) {
322                         (*ai)->_init_refcount[chain] += 1;
323                 }
324
325                 if (!has_input) {
326                         /* no input, so this node needs to be triggered initially to get things going */
327                         _init_trigger_list[chain].push_back (*ni);
328                 }
329
330                 if (!has_output) {
331                         /* no output, so this is one of the nodes that we can count off to decide
332                            if we've finished
333                         */
334                         _init_finished_refcount[chain] += 1;
335                 }
336         }
337
338         _pending_chain = chain;
339         dump(chain);
340 }
341
342 /** Called by both the main thread and all helpers.
343  *  @return true to quit, false to carry on.
344  */
345 bool
346 Graph::run_one()
347 {
348         GraphNode* to_run;
349
350         pthread_mutex_lock (&_trigger_mutex);
351         if (_trigger_queue.size()) {
352                 to_run = _trigger_queue.back();
353                 _trigger_queue.pop_back();
354         } else {
355                 to_run = 0;
356         }
357
358         /* the number of threads that are asleep */
359         int et = _execution_tokens;
360         /* the number of nodes that need to be run */
361         int ts = _trigger_queue.size();
362
363         /* hence how many threads to wake up */
364         int wakeup = min (et, ts);
365         /* update the number of threads that will still be sleeping */
366         _execution_tokens -= wakeup;
367
368         DEBUG_TRACE(DEBUG::ProcessThreads, string_compose ("%1 signals %2\n", pthread_self(), wakeup));
369
370         for (int i = 0; i < wakeup; i++) {
371                 _execution_sem.signal ();
372         }
373
374         while (to_run == 0) {
375                 _execution_tokens += 1;
376                 pthread_mutex_unlock (&_trigger_mutex);
377                 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 goes to sleep\n", pthread_self()));
378                 _execution_sem.wait ();
379                 if (_quit_threads) {
380                         return true;
381                 }
382                 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 is awake\n", pthread_self()));
383                 pthread_mutex_lock (&_trigger_mutex);
384                 if (_trigger_queue.size()) {
385                         to_run = _trigger_queue.back();
386                         _trigger_queue.pop_back();
387                 }
388         }
389         pthread_mutex_unlock (&_trigger_mutex);
390
391         to_run->process();
392         to_run->finish (_current_chain);
393
394         DEBUG_TRACE(DEBUG::ProcessThreads, string_compose ("%1 has finished run_one()\n", pthread_self()));
395
396         return false;
397 }
398
399 void
400 Graph::helper_thread()
401 {
402         suspend_rt_malloc_checks ();
403         ProcessThread* pt = new ProcessThread ();
404         resume_rt_malloc_checks ();
405
406         pt->get_buffers();
407
408         while(1) {
409                 if (run_one()) {
410                         break;
411                 }
412         }
413
414         pt->drop_buffers();
415 }
416
417 /** Here's the main graph thread */
418 void
419 Graph::main_thread()
420 {
421         suspend_rt_malloc_checks ();
422         ProcessThread* pt = new ProcessThread ();
423         resume_rt_malloc_checks ();
424
425         pt->get_buffers();
426
427   again:
428         _callback_start_sem.wait ();
429         
430         DEBUG_TRACE(DEBUG::ProcessThreads, "main thread is awake\n");
431
432         if (_quit_threads) {
433                 return;
434         }
435
436         prep ();
437
438         if (_graph_empty && !_quit_threads) {
439                 _callback_done_sem.signal ();
440                 DEBUG_TRACE(DEBUG::ProcessThreads, "main thread sees graph done, goes back to sleep\n");
441                 goto again;
442         }
443
444         /* This loop will run forever */
445         while (1) {
446                 DEBUG_TRACE(DEBUG::ProcessThreads, "main thread runs one graph node\n");
447                 if (run_one()) {
448                         break;
449                 }
450         }
451
452         pt->drop_buffers();
453 }
454
455 void
456 Graph::dump (int chain)
457 {
458 #ifndef NDEBUG
459         node_list_t::iterator ni;
460         node_set_t::iterator ai;
461
462         chain = _pending_chain;
463
464         DEBUG_TRACE (DEBUG::Graph, "--------------------------------------------Graph dump:\n");
465         for (ni=_nodes_rt[chain].begin(); ni!=_nodes_rt[chain].end(); ni++) {
466                 boost::shared_ptr<Route> rp = boost::dynamic_pointer_cast<Route>( *ni);
467                 DEBUG_TRACE (DEBUG::Graph, string_compose ("GraphNode: %1  refcount: %2\n", rp->name().c_str(), (*ni)->_init_refcount[chain]));
468                 for (ai=(*ni)->_activation_set[chain].begin(); ai!=(*ni)->_activation_set[chain].end(); ai++) {
469                         DEBUG_TRACE (DEBUG::Graph, string_compose ("  triggers: %1\n", boost::dynamic_pointer_cast<Route>(*ai)->name().c_str()));
470                 }
471         }
472
473         DEBUG_TRACE (DEBUG::Graph, "------------- trigger list:\n");
474         for (ni=_init_trigger_list[chain].begin(); ni!=_init_trigger_list[chain].end(); ni++) {
475                 DEBUG_TRACE (DEBUG::Graph, string_compose ("GraphNode: %1  refcount: %2\n", boost::dynamic_pointer_cast<Route>(*ni)->name().c_str(), (*ni)->_init_refcount[chain]));
476         }
477
478         DEBUG_TRACE (DEBUG::Graph, string_compose ("final activation refcount: %1\n", _init_finished_refcount[chain]));
479 #endif
480 }
481
482 int
483 Graph::silent_process_routes (pframes_t nframes, framepos_t start_frame, framepos_t end_frame, bool& need_butler)
484 {
485         _process_nframes = nframes;
486         _process_start_frame = start_frame;
487         _process_end_frame = end_frame;
488
489         _process_silent = true;
490         _process_noroll = false;
491         _process_retval = 0;
492         _process_need_butler = false;
493
494         if (!_graph_empty) {
495                 DEBUG_TRACE(DEBUG::ProcessThreads, "wake graph for silent process\n");
496                 _callback_start_sem.signal ();
497                 _callback_done_sem.wait ();
498         }
499
500         need_butler = _process_need_butler;
501
502         return _process_retval;
503 }
504
505 int
506 Graph::process_routes (pframes_t nframes, framepos_t start_frame, framepos_t end_frame, int declick, bool& need_butler)
507 {
508         DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("graph execution from %1 to %2 = %3\n", start_frame, end_frame, nframes));
509
510         _process_nframes = nframes;
511         _process_start_frame = start_frame;
512         _process_end_frame = end_frame;
513         _process_declick = declick;
514
515         _process_silent = false;
516         _process_noroll = false;
517         _process_retval = 0;
518         _process_need_butler = false;
519
520         DEBUG_TRACE(DEBUG::ProcessThreads, "wake graph for non-silent process\n");
521         _callback_start_sem.signal ();
522         _callback_done_sem.wait ();
523
524         DEBUG_TRACE (DEBUG::ProcessThreads, "graph execution complete\n");
525
526         need_butler = _process_need_butler;
527
528         return _process_retval;
529 }
530
531 int
532 Graph::routes_no_roll (pframes_t nframes, framepos_t start_frame, framepos_t end_frame,
533                        bool non_rt_pending, int declick)
534 {
535         DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("no-roll graph execution from %1 to %2 = %3\n", start_frame, end_frame, nframes));
536
537         _process_nframes = nframes;
538         _process_start_frame = start_frame;
539         _process_end_frame = end_frame;
540         _process_declick = declick;
541         _process_non_rt_pending = non_rt_pending;
542
543         _process_silent = false;
544         _process_noroll = true;
545         _process_retval = 0;
546         _process_need_butler = false;
547
548         DEBUG_TRACE(DEBUG::ProcessThreads, "wake graph for no-roll process\n");
549         _callback_start_sem.signal ();
550         _callback_done_sem.wait ();
551
552         return _process_retval;
553 }
554 void
555 Graph::process_one_route (Route* route)
556 {
557         bool need_butler = false;
558         int retval;
559
560         assert (route);
561
562         DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 runs route %2\n", pthread_self(), route->name()));
563
564         if (_process_silent) {
565                 retval = route->silent_roll (_process_nframes, _process_start_frame, _process_end_frame, need_butler);
566         } else if (_process_noroll) {
567                 route->set_pending_declick (_process_declick);
568                 retval = route->no_roll (_process_nframes, _process_start_frame, _process_end_frame, _process_non_rt_pending);
569         } else {
570                 route->set_pending_declick (_process_declick);
571                 retval = route->roll (_process_nframes, _process_start_frame, _process_end_frame, _process_declick, need_butler);
572         }
573
574         if (retval) {
575                 _process_retval = retval;
576         }
577
578         if (need_butler) {
579                 _process_need_butler = true;
580         }
581 }
582
583 bool
584 Graph::in_process_thread () const
585 {
586         for (list<AudioBackendNativeThread>::const_iterator i = _thread_list.begin (); i != _thread_list.end(); ++i) {
587                 if (self_thread_equal (*i)) {
588                         return true;
589                 }
590         }
591         return false;
592 }