Merge branch 'master' into cairocanvas
[ardour.git] / libs / ardour / graph.cc
1 /*
2   Copyright (C) 2010 Paul Davis
3   Author: Torben Hohn
4
5   This program is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published by
7   the Free Software Foundation; either version 2 of the License, or
8   (at your option) any later version.
9
10   This program is distributed in the hope that it will be useful,
11   but WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13   GNU General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with this program; if not, write to the Free Software
17   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18
19 */
20 #include <stdio.h>
21 #include <cmath>
22
23 #include "pbd/compose.h"
24 #include "pbd/debug_rt_alloc.h"
25
26 #include "ardour/debug.h"
27 #include "ardour/graph.h"
28 #include "ardour/types.h"
29 #include "ardour/session.h"
30 #include "ardour/route.h"
31 #include "ardour/process_thread.h"
32 #include "ardour/audioengine.h"
33
34 #include <jack/thread.h>
35
36 #include "i18n.h"
37
38 using namespace ARDOUR;
39 using namespace PBD;
40 using namespace std;
41
42 #ifdef DEBUG_RT_ALLOC
43 static Graph* graph = 0;
44
45 extern "C" {
46
47 int alloc_allowed ()
48 {
49         return !graph->in_process_thread ();
50 }
51
52 }
53 #endif
54
55 Graph::Graph (Session & session)
56         : SessionHandleRef (session)
57         , _quit_threads (false)
58         , _execution_sem ("graph_execution", 0)
59         , _callback_start_sem ("graph_start", 0)
60         , _callback_done_sem ("graph_done", 0)
61         , _cleanup_sem ("graph_cleanup", 0)
62 {
63         pthread_mutex_init( &_trigger_mutex, NULL);
64
65         /* XXX: rather hacky `fix' to stop _trigger_queue.push_back() allocating
66            memory in the RT thread.
67         */
68         _trigger_queue.reserve (8192);
69
70         _execution_tokens = 0;
71
72         _current_chain = 0;
73         _pending_chain = 0;
74         _setup_chain   = 1;
75         _quit_threads = false;
76         _graph_empty = true;
77
78         reset_thread_list ();
79
80 #ifdef DEBUG_RT_ALLOC
81         graph = this;
82         pbd_alloc_allowed = &::alloc_allowed;
83 #endif
84 }
85
86 /** Set up threads for running the graph */
87 void
88 Graph::reset_thread_list ()
89 {
90         uint32_t num_threads = how_many_dsp_threads ();
91
92         /* For now, we shouldn't be using the graph code if we only have 1 DSP thread */
93         assert (num_threads > 1);
94
95         /* don't bother doing anything here if we already have the right
96            number of threads.
97         */
98
99         if (AudioEngine::instance()->process_thread_count() == num_threads) {
100                 return;
101         }
102
103         Glib::Threads::Mutex::Lock lm (_session.engine().process_lock());
104
105         if (AudioEngine::instance()->process_thread_count() != 0) {
106                 drop_threads ();
107         }
108
109         if (AudioEngine::instance()->create_process_thread (boost::bind (&Graph::main_thread, this)) != 0) {
110                 throw failed_constructor ();
111         }
112
113         for (uint32_t i = 1; i < num_threads; ++i) {
114                 if (AudioEngine::instance()->create_process_thread (boost::bind (&Graph::helper_thread, this))) {
115                         throw failed_constructor ();
116                 }
117         }
118 }
119
120 void
121 Graph::session_going_away()
122 {
123         drop_threads ();
124
125         // now drop all references on the nodes.
126         _nodes_rt[0].clear();
127         _nodes_rt[1].clear();
128         _init_trigger_list[0].clear();
129         _init_trigger_list[1].clear();
130         _trigger_queue.clear();
131 }
132
133 void
134 Graph::drop_threads ()
135 {
136         _quit_threads = true;
137
138         uint32_t thread_count = AudioEngine::instance()->process_thread_count ();
139
140         for (unsigned int i=0; i < thread_count; i++) {
141                 _execution_sem.signal ();
142         }
143
144         _callback_start_sem.signal ();
145
146         AudioEngine::instance()->join_process_threads ();
147
148         _execution_tokens = 0;
149
150         _quit_threads = false;
151 }
152
153 void
154 Graph::clear_other_chain ()
155 {
156         Glib::Threads::Mutex::Lock ls (_swap_mutex);
157
158         while (1) {
159                 if (_setup_chain != _pending_chain) {
160
161                         for (node_list_t::iterator ni=_nodes_rt[_setup_chain].begin(); ni!=_nodes_rt[_setup_chain].end(); ni++) {
162                                 (*ni)->_activation_set[_setup_chain].clear();
163                         }
164
165                         _nodes_rt[_setup_chain].clear ();
166                         _init_trigger_list[_setup_chain].clear ();
167                         break;
168                 }
169                 /* setup chain == pending chain - we have
170                    to wait till this is no longer true.
171                 */
172                 _cleanup_cond.wait (_swap_mutex);
173         }
174 }
175
176 void
177 Graph::prep()
178 {
179         node_list_t::iterator i;
180         int chain;
181
182         if (_swap_mutex.trylock()) {
183                 // we got the swap mutex.
184                 if (_current_chain != _pending_chain)
185                 {
186                         // printf ("chain swap ! %d -> %d\n", _current_chain, _pending_chain);
187                         _setup_chain = _current_chain;
188                         _current_chain = _pending_chain;
189                         _cleanup_cond.signal ();
190                 }
191                 _swap_mutex.unlock ();
192         }
193
194         chain = _current_chain;
195
196         _graph_empty = true;
197         for (i=_nodes_rt[chain].begin(); i!=_nodes_rt[chain].end(); i++) {
198                 (*i)->prep( chain);
199                 _graph_empty = false;
200         }
201         _finished_refcount = _init_finished_refcount[chain];
202
203         /* Trigger the initial nodes for processing, which are the ones at the `input' end */
204         pthread_mutex_lock (&_trigger_mutex);
205         for (i=_init_trigger_list[chain].begin(); i!=_init_trigger_list[chain].end(); i++) {
206                 /* don't use ::trigger here, as we have already locked the mutex */
207                 _trigger_queue.push_back (i->get ());
208         }
209         pthread_mutex_unlock (&_trigger_mutex);
210 }
211
212 void
213 Graph::trigger (GraphNode* n)
214 {
215         pthread_mutex_lock (&_trigger_mutex);
216         _trigger_queue.push_back (n);
217         pthread_mutex_unlock (&_trigger_mutex);
218 }
219
220 /** Called when a node at the `output' end of the chain (ie one that has no-one to feed)
221  *  is finished.
222  */
223 void
224 Graph::dec_ref()
225 {
226         if (g_atomic_int_dec_and_test (const_cast<gint*> (&_finished_refcount))) {
227
228                 /* We have run all the nodes that are at the `output' end of
229                    the graph, so there is nothing more to do this time around.
230                 */
231
232                 restart_cycle ();
233         }
234 }
235
236 void
237 Graph::restart_cycle()
238 {
239         // we are through. wakeup our caller.
240
241   again:
242         _callback_done_sem.signal ();
243
244         /* Block until the a process callback triggers us */
245         _callback_start_sem.wait();
246
247         if (_quit_threads) {
248                 return;
249         }
250
251         prep ();
252
253         if (_graph_empty) {
254                 goto again;
255         }
256
257         // returning will restart the cycle.
258         // starting with waking up the others.
259 }
260
261 /** Rechain our stuff using a list of routes (which can be in any order) and
262  *  a directed graph of their interconnections, which is guaranteed to be
263  *  acyclic.
264  */
265
266 void
267 Graph::rechain (boost::shared_ptr<RouteList> routelist, GraphEdges const & edges)
268 {
269         Glib::Threads::Mutex::Lock ls (_swap_mutex);
270
271         int chain = _setup_chain;
272         DEBUG_TRACE (DEBUG::Graph, string_compose ("============== setup %1\n", chain));
273
274         /* This will become the number of nodes that do not feed any other node;
275            once we have processed this number of those nodes, we have finished.
276         */
277         _init_finished_refcount[chain] = 0;
278
279         /* This will become a list of nodes that are not fed by another node, ie
280            those at the `input' end.
281         */
282         _init_trigger_list[chain].clear();
283
284         _nodes_rt[chain].clear();
285
286         /* Clear things out, and make _nodes_rt[chain] a copy of routelist */
287         for (RouteList::iterator ri=routelist->begin(); ri!=routelist->end(); ri++) {
288                 (*ri)->_init_refcount[chain] = 0;
289                 (*ri)->_activation_set[chain].clear();
290                 _nodes_rt[chain].push_back (*ri);
291         }
292
293         // now add refs for the connections.
294
295         for (node_list_t::iterator ni = _nodes_rt[chain].begin(); ni != _nodes_rt[chain].end(); ni++) {
296
297                 boost::shared_ptr<Route> r = boost::dynamic_pointer_cast<Route> (*ni);
298
299                 /* The routes that are directly fed by r */
300                 set<GraphVertex> fed_from_r = edges.from (r);
301
302                 /* Hence whether r has an output */
303                 bool const has_output = !fed_from_r.empty ();
304
305                 /* Set up r's activation set */
306                 for (set<GraphVertex>::iterator i = fed_from_r.begin(); i != fed_from_r.end(); ++i) {
307                         r->_activation_set[chain].insert (*i);
308                 }
309
310                 /* r has an input if there are some incoming edges to r in the graph */
311                 bool const has_input = !edges.has_none_to (r);
312
313                 /* Increment the refcount of any route that we directly feed */
314                 for (node_set_t::iterator ai = r->_activation_set[chain].begin(); ai != r->_activation_set[chain].end(); ai++) {
315                         (*ai)->_init_refcount[chain] += 1;
316                 }
317
318                 if (!has_input) {
319                         /* no input, so this node needs to be triggered initially to get things going */
320                         _init_trigger_list[chain].push_back (*ni);
321                 }
322
323                 if (!has_output) {
324                         /* no output, so this is one of the nodes that we can count off to decide
325                            if we've finished
326                         */
327                         _init_finished_refcount[chain] += 1;
328                 }
329         }
330
331         _pending_chain = chain;
332         dump(chain);
333 }
334
335 /** Called by both the main thread and all helpers.
336  *  @return true to quit, false to carry on.
337  */
338 bool
339 Graph::run_one()
340 {
341         GraphNode* to_run;
342
343         pthread_mutex_lock (&_trigger_mutex);
344         if (_trigger_queue.size()) {
345                 to_run = _trigger_queue.back();
346                 _trigger_queue.pop_back();
347         } else {
348                 to_run = 0;
349         }
350
351         /* the number of threads that are asleep */
352         int et = _execution_tokens;
353         /* the number of nodes that need to be run */
354         int ts = _trigger_queue.size();
355
356         /* hence how many threads to wake up */
357         int wakeup = min (et, ts);
358         /* update the number of threads that will still be sleeping */
359         _execution_tokens -= wakeup;
360
361         DEBUG_TRACE(DEBUG::ProcessThreads, string_compose ("%1 signals %2\n", pthread_self(), wakeup));
362
363         for (int i = 0; i < wakeup; i++) {
364                 _execution_sem.signal ();
365         }
366
367         while (to_run == 0) {
368                 _execution_tokens += 1;
369                 pthread_mutex_unlock (&_trigger_mutex);
370                 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 goes to sleep\n", pthread_self()));
371                 _execution_sem.wait ();
372                 if (_quit_threads) {
373                         return true;
374                 }
375                 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 is awake\n", pthread_self()));
376                 pthread_mutex_lock (&_trigger_mutex);
377                 if (_trigger_queue.size()) {
378                         to_run = _trigger_queue.back();
379                         _trigger_queue.pop_back();
380                 }
381         }
382         pthread_mutex_unlock (&_trigger_mutex);
383
384         to_run->process();
385         to_run->finish (_current_chain);
386
387         DEBUG_TRACE(DEBUG::ProcessThreads, string_compose ("%1 has finished run_one()\n", pthread_self()));
388
389         return false;
390 }
391
392 void
393 Graph::helper_thread()
394 {
395         suspend_rt_malloc_checks ();
396         ProcessThread* pt = new ProcessThread ();
397         resume_rt_malloc_checks ();
398
399         pt->get_buffers();
400
401         while(1) {
402                 if (run_one()) {
403                         break;
404                 }
405         }
406
407         pt->drop_buffers();
408 }
409
410 /** Here's the main graph thread */
411 void
412 Graph::main_thread()
413 {
414         suspend_rt_malloc_checks ();
415         ProcessThread* pt = new ProcessThread ();
416         resume_rt_malloc_checks ();
417
418         pt->get_buffers();
419
420   again:
421         _callback_start_sem.wait ();
422         
423         DEBUG_TRACE(DEBUG::ProcessThreads, "main thread is awake\n");
424
425         if (_quit_threads) {
426                 return;
427         }
428
429         prep ();
430
431         if (_graph_empty && !_quit_threads) {
432                 _callback_done_sem.signal ();
433                 DEBUG_TRACE(DEBUG::ProcessThreads, "main thread sees graph done, goes back to sleep\n");
434                 goto again;
435         }
436
437         /* This loop will run forever */
438         while (1) {
439                 DEBUG_TRACE(DEBUG::ProcessThreads, "main thread runs one graph node\n");
440                 if (run_one()) {
441                         break;
442                 }
443         }
444
445         pt->drop_buffers();
446 }
447
448 void
449 Graph::dump (int chain)
450 {
451 #ifndef NDEBUG
452         node_list_t::iterator ni;
453         node_set_t::iterator ai;
454
455         chain = _pending_chain;
456
457         DEBUG_TRACE (DEBUG::Graph, "--------------------------------------------Graph dump:\n");
458         for (ni=_nodes_rt[chain].begin(); ni!=_nodes_rt[chain].end(); ni++) {
459                 boost::shared_ptr<Route> rp = boost::dynamic_pointer_cast<Route>( *ni);
460                 DEBUG_TRACE (DEBUG::Graph, string_compose ("GraphNode: %1  refcount: %2\n", rp->name().c_str(), (*ni)->_init_refcount[chain]));
461                 for (ai=(*ni)->_activation_set[chain].begin(); ai!=(*ni)->_activation_set[chain].end(); ai++) {
462                         DEBUG_TRACE (DEBUG::Graph, string_compose ("  triggers: %1\n", boost::dynamic_pointer_cast<Route>(*ai)->name().c_str()));
463                 }
464         }
465
466         DEBUG_TRACE (DEBUG::Graph, "------------- trigger list:\n");
467         for (ni=_init_trigger_list[chain].begin(); ni!=_init_trigger_list[chain].end(); ni++) {
468                 DEBUG_TRACE (DEBUG::Graph, string_compose ("GraphNode: %1  refcount: %2\n", boost::dynamic_pointer_cast<Route>(*ni)->name().c_str(), (*ni)->_init_refcount[chain]));
469         }
470
471         DEBUG_TRACE (DEBUG::Graph, string_compose ("final activation refcount: %1\n", _init_finished_refcount[chain]));
472 #endif
473 }
474
475 int
476 Graph::silent_process_routes (pframes_t nframes, framepos_t start_frame, framepos_t end_frame, bool& need_butler)
477 {
478         _process_nframes = nframes;
479         _process_start_frame = start_frame;
480         _process_end_frame = end_frame;
481
482         _process_silent = true;
483         _process_noroll = false;
484         _process_retval = 0;
485         _process_need_butler = false;
486
487         if (!_graph_empty) {
488                 DEBUG_TRACE(DEBUG::ProcessThreads, "wake graph for silent process\n");
489                 _callback_start_sem.signal ();
490                 _callback_done_sem.wait ();
491         }
492
493         need_butler = _process_need_butler;
494
495         return _process_retval;
496 }
497
498 int
499 Graph::process_routes (pframes_t nframes, framepos_t start_frame, framepos_t end_frame, int declick, bool& need_butler)
500 {
501         DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("graph execution from %1 to %2 = %3\n", start_frame, end_frame, nframes));
502
503         _process_nframes = nframes;
504         _process_start_frame = start_frame;
505         _process_end_frame = end_frame;
506         _process_declick = declick;
507
508         _process_silent = false;
509         _process_noroll = false;
510         _process_retval = 0;
511         _process_need_butler = false;
512
513         DEBUG_TRACE(DEBUG::ProcessThreads, "wake graph for non-silent process\n");
514         _callback_start_sem.signal ();
515         _callback_done_sem.wait ();
516
517         DEBUG_TRACE (DEBUG::ProcessThreads, "graph execution complete\n");
518
519         need_butler = _process_need_butler;
520
521         return _process_retval;
522 }
523
524 int
525 Graph::routes_no_roll (pframes_t nframes, framepos_t start_frame, framepos_t end_frame,
526                        bool non_rt_pending, int declick)
527 {
528         DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("no-roll graph execution from %1 to %2 = %3\n", start_frame, end_frame, nframes));
529
530         _process_nframes = nframes;
531         _process_start_frame = start_frame;
532         _process_end_frame = end_frame;
533         _process_declick = declick;
534         _process_non_rt_pending = non_rt_pending;
535
536         _process_silent = false;
537         _process_noroll = true;
538         _process_retval = 0;
539         _process_need_butler = false;
540
541         DEBUG_TRACE(DEBUG::ProcessThreads, "wake graph for no-roll process\n");
542         _callback_start_sem.signal ();
543         _callback_done_sem.wait ();
544
545         return _process_retval;
546 }
547 void
548 Graph::process_one_route (Route* route)
549 {
550         bool need_butler = false;
551         int retval;
552
553         assert (route);
554
555         DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 runs route %2\n", pthread_self(), route->name()));
556
557         if (_process_silent) {
558                 retval = route->silent_roll (_process_nframes, _process_start_frame, _process_end_frame, need_butler);
559         } else if (_process_noroll) {
560                 route->set_pending_declick (_process_declick);
561                 retval = route->no_roll (_process_nframes, _process_start_frame, _process_end_frame, _process_non_rt_pending);
562         } else {
563                 route->set_pending_declick (_process_declick);
564                 retval = route->roll (_process_nframes, _process_start_frame, _process_end_frame, _process_declick, need_butler);
565         }
566
567         if (retval) {
568                 _process_retval = retval;
569         }
570
571         if (need_butler) {
572                 _process_need_butler = true;
573         }
574 }
575
576 bool
577 Graph::in_process_thread () const
578 {
579         return AudioEngine::instance()->in_process_thread ();
580 }