7cddd522e1275b295317d3a2356ac1994d2e0259
[ardour.git] / libs / ardour / graph.cc
1 /*
2   Copyright (C) 2010 Paul Davis
3   Author: Torben Hohn
4
5   This program is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published by
7   the Free Software Foundation; either version 2 of the License, or
8   (at your option) any later version.
9
10   This program is distributed in the hope that it will be useful,
11   but WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13   GNU General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with this program; if not, write to the Free Software
17   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18
19 */
20 #include <stdio.h>
21 #include <cmath>
22
23 #include "pbd/compose.h"
24 #include "pbd/cpus.h"
25 #include "pbd/debug_rt_alloc.h"
26
27 #include "ardour/debug.h"
28 #include "ardour/graph.h"
29 #include "ardour/types.h"
30 #include "ardour/session.h"
31 #include "ardour/route.h"
32 #include "ardour/process_thread.h"
33 #include "ardour/audioengine.h"
34
35 #include <jack/thread.h>
36
37 #include "i18n.h"
38
39 using namespace ARDOUR;
40 using namespace PBD;
41 using namespace std;
42
43 #ifdef DEBUG_RT_ALLOC
44 static Graph* graph = 0;
45 extern "C" {
46
47 int alloc_allowed ()
48 {
49         return !graph->in_process_thread ();
50 }
51
52 }
53 #endif
54
55 Graph::Graph (Session & session) 
56         : SessionHandleRef (session) 
57         , _execution_sem ("graph_execution", 0)
58         , _callback_start_sem ("graph_start", 0)
59         , _callback_done_sem ("graph_done", 0)
60         , _cleanup_sem ("graph_cleanup", 0)
61 {
62         pthread_mutex_init( &_trigger_mutex, NULL);
63
64         /* XXX: rather hacky `fix' to stop _trigger_queue.push_back() allocating
65            memory in the RT thread.
66         */
67         _trigger_queue.reserve (8192);
68
69         _execution_tokens = 0;
70
71         _current_chain = 0;
72         _pending_chain = 0;
73         _setup_chain   = 1;
74         _quit_threads = false;
75         _graph_empty = true;
76
77         int num_cpu = hardware_concurrency();
78         int num_threads = num_cpu;
79         int pu = Config->get_processor_usage ();
80
81         if (pu < 0) {
82                 /* pu is negative: use "pu" less cores for DSP than appear to be available
83                  */
84
85                 if (-pu < num_threads) {
86                         num_threads += pu; 
87                 } else {
88                         num_threads = 1;
89                 }
90         } else {
91                 /* use "pu" cores, if available
92                  */
93
94                 if (pu <= num_threads) {
95                         num_threads = pu;
96                 } 
97         }
98
99         info << string_compose (_("Using %2 threads on %1 CPUs"), num_cpu, num_threads) << endmsg;
100
101         pthread_t a_thread;
102
103         if (AudioEngine::instance()->create_process_thread (boost::bind (&Graph::main_thread, this), &a_thread, 100000) == 0) {
104                 _thread_list.push_back (a_thread);
105         }
106
107         for (int i = 1; i < num_threads; ++i) {
108                 if (AudioEngine::instance()->create_process_thread (boost::bind (&Graph::helper_thread, this), &a_thread, 100000) == 0) {
109                         _thread_list.push_back (a_thread);
110                 }
111         }
112
113 #ifdef DEBUG_RT_ALLOC   
114         graph = this;
115         pbd_alloc_allowed = &::alloc_allowed;
116 #endif  
117 }
118
119 void
120 Graph::session_going_away()
121 {
122         _quit_threads = true;
123
124         for (unsigned int i=0; i<_thread_list.size(); i++) {
125                 _execution_sem.signal ();
126         }
127
128         _callback_start_sem.signal ();
129
130         for (list<pthread_t>::iterator i = _thread_list.begin(); i != _thread_list.end(); i++) {
131                 void* status;
132                 pthread_join (*i, &status);
133         }
134
135         // now drop all references on the nodes.
136         _nodes_rt[0].clear();
137         _nodes_rt[1].clear();
138         _init_trigger_list[0].clear();
139         _init_trigger_list[1].clear();
140         _trigger_queue.clear();
141 }
142
143 void
144 Graph::clear_other_chain ()
145 {
146         Glib::Mutex::Lock ls (_swap_mutex);
147
148         while (1) {
149                 if (_setup_chain != _pending_chain) {
150
151                         for (node_list_t::iterator ni=_nodes_rt[_setup_chain].begin(); ni!=_nodes_rt[_setup_chain].end(); ni++) {
152                                 (*ni)->_activation_set[_setup_chain].clear();
153                         }
154
155                         _nodes_rt[_setup_chain].clear ();
156                         _init_trigger_list[_setup_chain].clear ();
157                         break;
158                 }
159                 /* setup chain == pending chain - we have
160                    to wait till this is no longer true.
161                 */
162                 _cleanup_cond.wait (_swap_mutex);                
163         }
164 }
165
166 void
167 Graph::prep()
168 {
169         node_list_t::iterator i;
170         int chain;
171
172         if (_swap_mutex.trylock()) {
173                 // we got the swap mutex.
174                 if (_current_chain != _pending_chain)
175                 {
176                         // printf ("chain swap ! %d -> %d\n", _current_chain, _pending_chain);
177                         _setup_chain = _current_chain;
178                         _current_chain = _pending_chain;
179                         _cleanup_cond.signal ();
180                 }
181                 _swap_mutex.unlock ();
182         }
183
184         chain = _current_chain;
185
186         _graph_empty = true;
187         for (i=_nodes_rt[chain].begin(); i!=_nodes_rt[chain].end(); i++) {
188                 (*i)->prep( chain);
189                 _graph_empty = false;
190         }
191         _finished_refcount = _init_finished_refcount[chain];
192
193         for (i=_init_trigger_list[chain].begin(); i!=_init_trigger_list[chain].end(); i++) {
194                 this->trigger( i->get() );
195         }
196 }
197
198 void
199 Graph::trigger (GraphNode* n)
200 {
201         pthread_mutex_lock (&_trigger_mutex);
202         _trigger_queue.push_back( n);
203         pthread_mutex_unlock (&_trigger_mutex);
204 }
205
206 void
207 Graph::dec_ref()
208 {
209         if (g_atomic_int_dec_and_test (&_finished_refcount)) {
210
211                 // ok... this cycle is finished now.
212                 // we are the only thread alive.
213         
214                 this->restart_cycle();
215         }
216 }
217
218 void
219 Graph::restart_cycle()
220 {
221         //printf( "cycle_done chain: %d\n", _current_chain);
222
223         // we are through. wakeup our caller.
224   again:
225         _callback_done_sem.signal ();
226
227         // block until we are triggered.
228         _callback_start_sem.wait();
229         if (_quit_threads)
230                 return;
231
232         //printf( "cycle_start\n" );
233
234         this->prep();
235         if (_graph_empty)
236                 goto again;
237         //printf( "cycle_start chain: %d\n", _current_chain);
238
239         // returning will restart the cycle.
240         //  starting with waking up the others.
241 }
242
243 static bool
244 is_feedback (boost::shared_ptr<RouteList> routelist, Route* from, boost::shared_ptr<Route> to)
245 {
246         for (RouteList::iterator ri=routelist->begin(); ri!=routelist->end(); ri++) {
247                 if ((*ri).get() == from)
248                         return false;
249                 if ((*ri) == to)
250                         return true;
251         }
252         assert(0);
253         return false;
254 }
255
256 static bool
257 is_feedback (boost::shared_ptr<RouteList> routelist, boost::shared_ptr<Route> from, Route* to)
258 {
259         for (RouteList::iterator ri=routelist->begin(); ri!=routelist->end(); ri++) {
260                 if ((*ri).get() == to)
261                         return true;
262                 if ((*ri) == from)
263                         return false;
264         }
265         assert(0);
266         return false;
267 }
268
269 void
270 Graph::rechain (boost::shared_ptr<RouteList> routelist)
271 {
272         node_list_t::iterator ni;
273         Glib::Mutex::Lock ls (_swap_mutex);
274
275         int chain = _setup_chain;
276         DEBUG_TRACE (DEBUG::Graph, string_compose ("============== setup %1\n", chain));
277         // set all refcounts to 0;
278
279         _init_finished_refcount[chain] = 0;
280         _init_trigger_list[chain].clear();
281
282         _nodes_rt[chain].clear();
283
284         for (RouteList::iterator ri=routelist->begin(); ri!=routelist->end(); ri++) {
285                 node_ptr_t n = boost::dynamic_pointer_cast<GraphNode> (*ri);
286
287                 n->_init_refcount[chain] = 0;
288                 n->_activation_set[chain].clear();
289                 _nodes_rt[chain].push_back(n);
290         }
291
292         // now add refs for the connections.
293
294         for (ni=_nodes_rt[chain].begin(); ni!=_nodes_rt[chain].end(); ni++) {
295                 bool has_input  = false;
296                 bool has_output = false;
297
298                 boost::shared_ptr<Route> rp = boost::dynamic_pointer_cast<Route>( *ni);
299
300                 for (RouteList::iterator ri=routelist->begin(); ri!=routelist->end(); ri++) {
301                         if (rp->direct_feeds (*ri)) {
302                                 if (is_feedback (routelist, rp.get(), *ri)) {
303                                         continue; 
304                                 }
305
306                                 has_output = true;
307                                 (*ni)->_activation_set[chain].insert (boost::dynamic_pointer_cast<GraphNode> (*ri) );
308                         }
309                 }
310
311                 for (Route::FedBy::iterator fi=rp->fed_by().begin(); fi!=rp->fed_by().end(); fi++) {
312                         if (boost::shared_ptr<Route> r = fi->r.lock()) {
313                                 if (!is_feedback (routelist, r, rp.get())) {
314                                         has_input = true;
315                                 }
316                         }
317                 }
318
319                 for (node_set_t::iterator ai=(*ni)->_activation_set[chain].begin(); ai!=(*ni)->_activation_set[chain].end(); ai++) {
320                         (*ai)->_init_refcount[chain] += 1;
321                 }
322
323                 if (!has_input)
324                         _init_trigger_list[chain].push_back (*ni);
325
326                 if (!has_output)
327                         _init_finished_refcount[chain] += 1;
328         } 
329
330         _pending_chain = chain;
331         dump(chain);
332 }
333
334 bool
335 Graph::run_one()
336 {
337         GraphNode* to_run;
338
339         pthread_mutex_lock (&_trigger_mutex);
340         if (_trigger_queue.size()) {
341                 to_run = _trigger_queue.back();
342                 _trigger_queue.pop_back();
343         } else {
344                 to_run = 0;
345         }
346
347         int et = _execution_tokens;
348         int ts = _trigger_queue.size();
349
350         int wakeup = min (et, ts);
351         _execution_tokens -= wakeup;
352
353         for (int i=0; i<wakeup; i++ ) {
354                 _execution_sem.signal ();
355         }
356
357         while (to_run == 0) {
358                 _execution_tokens += 1;
359                 pthread_mutex_unlock (&_trigger_mutex);
360                 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 goes to sleep\n", pthread_self()));
361                 _execution_sem.wait ();
362                 if (_quit_threads)
363                         return true;
364                 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 is awake\n", pthread_self()));
365                 pthread_mutex_lock (&_trigger_mutex);
366                 if (_trigger_queue.size()) {
367                         to_run = _trigger_queue.back();
368                         _trigger_queue.pop_back();
369                 }
370         }
371         pthread_mutex_unlock (&_trigger_mutex);
372
373         to_run->process();
374         to_run->finish (_current_chain);
375
376         return false;
377 }
378
379 static void get_rt()
380 {
381         if (!jack_is_realtime (AudioEngine::instance()->jack())) {
382                 return;
383         }
384
385         int priority = jack_client_real_time_priority (AudioEngine::instance()->jack());
386
387         if (priority) {
388                 struct sched_param rtparam;
389         
390                 memset (&rtparam, 0, sizeof (rtparam));
391                 rtparam.sched_priority = priority;
392         
393                 pthread_setschedparam (pthread_self(), SCHED_FIFO, &rtparam);
394         }
395 }
396
397 void
398 Graph::helper_thread()
399 {
400         ProcessThread* pt = new ProcessThread ();
401
402         pt->get_buffers();
403         get_rt();
404
405         while(1) {
406                 if (run_one()) {
407                         break;
408                 }
409         }
410
411         pt->drop_buffers();
412 }
413
414 void
415 Graph::main_thread()
416 {
417         ProcessThread* pt = new ProcessThread ();
418
419         pt->get_buffers();
420         get_rt();
421
422   again:
423         _callback_start_sem.wait ();
424         DEBUG_TRACE(DEBUG::Graph, "main thread is awake\n");
425         this->prep();
426
427         if (_graph_empty && !_quit_threads) {
428                 _callback_done_sem.signal ();
429                 goto again;
430         }
431
432         while (1) {
433                 DEBUG_TRACE(DEBUG::Graph, "main thread runs one graph node\n");
434                 if (run_one()) {
435                         break;
436                 }
437         }
438
439         pt->drop_buffers();
440 }
441
442 void
443 Graph::dump (int chain)
444 {
445 #ifndef NDEBUG
446         node_list_t::iterator ni;
447         node_set_t::iterator ai;
448
449         chain = _pending_chain;
450
451         DEBUG_TRACE (DEBUG::Graph, "--------------------------------------------Graph dump:\n");
452         for (ni=_nodes_rt[chain].begin(); ni!=_nodes_rt[chain].end(); ni++) {
453                 boost::shared_ptr<Route> rp = boost::dynamic_pointer_cast<Route>( *ni);
454                 DEBUG_TRACE (DEBUG::Graph, string_compose ("GraphNode: %1  refcount: %2\n", rp->name().c_str(), (*ni)->_init_refcount[chain]));
455                 for (ai=(*ni)->_activation_set[chain].begin(); ai!=(*ni)->_activation_set[chain].end(); ai++) {
456                         DEBUG_TRACE (DEBUG::Graph, string_compose ("  triggers: %1\n", boost::dynamic_pointer_cast<Route>(*ai)->name().c_str()));
457                 }
458         }
459
460         DEBUG_TRACE (DEBUG::Graph, "------------- trigger list:\n");
461         for (ni=_init_trigger_list[chain].begin(); ni!=_init_trigger_list[chain].end(); ni++) {
462                 DEBUG_TRACE (DEBUG::Graph, string_compose ("GraphNode: %1  refcount: %2\n", boost::dynamic_pointer_cast<Route>(*ni)->name().c_str(), (*ni)->_init_refcount[chain]));
463         }
464
465         DEBUG_TRACE (DEBUG::Graph, string_compose ("final activation refcount: %1\n", _init_finished_refcount[chain]));
466 #endif
467 }
468
469 int
470 Graph::silent_process_routes (pframes_t nframes, framepos_t start_frame, framepos_t end_frame,
471                               bool can_record, bool rec_monitors_input, bool& need_butler)
472 {
473         _process_nframes = nframes;
474         _process_start_frame = start_frame;
475         _process_end_frame = end_frame;
476         _process_can_record = can_record;
477         _process_rec_monitors_input = rec_monitors_input;
478
479         _process_silent = true;
480         _process_noroll = false;
481         _process_retval = 0;
482         _process_need_butler = false;
483
484         if (!_graph_empty) {
485                 DEBUG_TRACE(DEBUG::Graph, "wake graph for silent process\n");
486                 _callback_start_sem.signal ();
487                 _callback_done_sem.wait ();
488         }
489
490         need_butler = _process_need_butler;
491
492         return _process_retval;
493 }
494
495 int
496 Graph::process_routes (pframes_t nframes, framepos_t start_frame, framepos_t end_frame, int declick,
497                        bool can_record, bool rec_monitors_input, bool& need_butler)
498 {
499         DEBUG_TRACE (DEBUG::Graph, string_compose ("graph execution from %1 to %2 = %3\n", start_frame, end_frame, nframes));
500
501         _process_nframes = nframes;
502         _process_start_frame = start_frame;
503         _process_end_frame = end_frame;
504         _process_can_record = can_record;
505         _process_rec_monitors_input = rec_monitors_input;
506         _process_declick = declick;
507
508         _process_silent = false;
509         _process_noroll = false;
510         _process_retval = 0;
511         _process_need_butler = false;
512
513         DEBUG_TRACE(DEBUG::Graph, "wake graph for non-silent process\n");
514         _callback_start_sem.signal ();
515         _callback_done_sem.wait ();
516
517         DEBUG_TRACE (DEBUG::Graph, "graph execution complete\n");
518
519         need_butler = _process_need_butler;
520
521         return _process_retval;
522 }
523
524 int
525 Graph::routes_no_roll (pframes_t nframes, framepos_t start_frame, framepos_t end_frame, 
526                        bool non_rt_pending, bool can_record, int declick)
527 {
528         DEBUG_TRACE (DEBUG::Graph, string_compose ("no-roll graph execution from %1 to %2 = %3\n", start_frame, end_frame, nframes));
529
530         _process_nframes = nframes;
531         _process_start_frame = start_frame;
532         _process_end_frame = end_frame;
533         _process_can_record = can_record;
534         _process_declick = declick;
535         _process_non_rt_pending = non_rt_pending;
536
537         _process_silent = false;
538         _process_noroll = true;
539         _process_retval = 0;
540         _process_need_butler = false;
541
542         DEBUG_TRACE(DEBUG::Graph, "wake graph for no-roll process\n");
543         _callback_start_sem.signal ();
544         _callback_done_sem.wait ();
545
546         return _process_retval;
547 }
548 void
549 Graph::process_one_route (Route* route)
550 {
551         bool need_butler = false;
552         int retval;
553
554         assert (route);
555
556         DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 runs route %2\n", pthread_self(), route->name()));
557
558         if (_process_silent) {
559                 retval = route->silent_roll (_process_nframes, _process_start_frame, _process_end_frame, _process_can_record, _process_rec_monitors_input, need_butler);
560         } else if (_process_noroll) {
561                 route->set_pending_declick (_process_declick);
562                 retval = route->no_roll (_process_nframes, _process_start_frame, _process_end_frame, _process_non_rt_pending, _process_can_record, _process_declick);
563         } else {
564                 route->set_pending_declick (_process_declick);
565                 retval = route->roll (_process_nframes, _process_start_frame, _process_end_frame, _process_declick, _process_can_record, _process_rec_monitors_input, need_butler);
566         }
567
568         if (retval) {
569                 _process_retval = retval;
570         }
571     
572         if (need_butler) {
573                 _process_need_butler = true;
574         }
575 }
576
577 bool
578 Graph::in_process_thread () const
579 {
580         list<pthread_t>::const_iterator i = _thread_list.begin ();
581         while (i != _thread_list.end() && *i != pthread_self ()) {
582                 ++i;
583         }
584
585         return i != _thread_list.end ();
586 }