54e619aef89d73adfed11e49fab87392df529f43
[ardour.git] / libs / ardour / graph.cc
1 /*
2   Copyright (C) 2010 Paul Davis
3   Author: Torben Hohn
4
5   This program is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published by
7   the Free Software Foundation; either version 2 of the License, or
8   (at your option) any later version.
9
10   This program is distributed in the hope that it will be useful,
11   but WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13   GNU General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with this program; if not, write to the Free Software
17   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18
19 */
20
21 #include <stdio.h>
22 #include <cmath>
23
24 #include "pbd/compose.h"
25 #include "pbd/cpus.h"
26
27 #include "ardour/debug.h"
28 #include "ardour/graph.h"
29 #include "ardour/types.h"
30 #include "ardour/session.h"
31 #include "ardour/route.h"
32 #include "ardour/process_thread.h"
33 #include "ardour/audioengine.h"
34
35 #include <jack/thread.h>
36
37 #include "i18n.h"
38
39 using namespace ARDOUR;
40 using namespace PBD;
41
42
43 Graph::Graph (Session & session) 
44         : SessionHandleRef (session) 
45 {
46         pthread_mutex_init( &_trigger_mutex, NULL);
47         sem_init( &_execution_sem, 0, 0 );
48
49         sem_init( &_callback_start_sem, 0, 0 );
50         sem_init( &_callback_done_sem,  0, 0 );
51
52         _execution_tokens = 0;
53
54         pthread_mutex_init (&_swap_mutex, NULL);
55         _current_chain = 0;
56         _pending_chain = 0;
57         _setup_chain   = 1;
58         _quit_threads = false;
59         _graph_empty = true;
60
61         int num_cpu = hardware_concurrency();
62         int num_threads = num_cpu;
63         int pu = Config->get_processor_usage ();
64
65         if (pu < 0) {
66                 /* use "pu" less cores for DSP than appear to be available
67                  */
68
69                 if (pu < num_threads) {
70                         num_threads += pu; // pu is negative
71                 } else {
72                         num_threads = 1;
73                 }
74         } else {
75                 /* use "pu" cores, if available
76                  */
77
78                 if (pu <= num_threads) {
79                         num_threads = pu;
80                 } 
81         }
82
83         info << string_compose (_("Using %2 threads on %1 CPUs"), num_cpu, num_threads) << endmsg;
84
85         _thread_list.push_back (AudioEngine::instance()->create_process_thread (boost::bind (&Graph::main_thread, this), 100000));
86
87         for (int i = 1; i < num_threads; ++i) {
88                 _thread_list.push_back (AudioEngine::instance()->create_process_thread (boost::bind (&Graph::helper_thread, this), 100000));
89         }
90 }
91
92 void
93 Graph::session_going_away()
94 {
95         _quit_threads = true;
96
97         for (unsigned int i=0; i<_thread_list.size(); i++) {
98                 sem_post( &_execution_sem);
99         }
100
101         sem_post( &_callback_start_sem);
102
103         for (std::list<pthread_t>::iterator i = _thread_list.begin(); i != _thread_list.end(); i++) {
104                 void* status;
105                 pthread_join (*i, &status);
106         }
107
108         // now drop all references on the nodes.
109         _nodes_rt[0].clear();
110         _nodes_rt[1].clear();
111         _init_trigger_list[0].clear();
112         _init_trigger_list[1].clear();
113         _trigger_queue.clear();
114 }
115
116 void
117 Graph::prep()
118 {
119         node_list_t::iterator i;
120         int chain;
121
122         if (pthread_mutex_trylock (&_swap_mutex) == 0) {
123                 // we got the swap mutex.
124                 if (_current_chain != _pending_chain)
125                 {
126                         //printf ("chain swap ! %d -> %d\n", _current_chain, _pending_chain);
127                         _setup_chain = _current_chain;
128                         _current_chain = _pending_chain;
129                 }
130                 pthread_mutex_unlock (&_swap_mutex);
131         }
132
133         chain = _current_chain;
134
135         _graph_empty = true;
136         for (i=_nodes_rt[chain].begin(); i!=_nodes_rt[chain].end(); i++) {
137                 (*i)->prep( chain);
138                 _graph_empty = false;
139         }
140         _finished_refcount = _init_finished_refcount[chain];
141
142         for (i=_init_trigger_list[chain].begin(); i!=_init_trigger_list[chain].end(); i++) {
143                 this->trigger( i->get() );
144         }
145 }
146
147 void
148 Graph::trigger (GraphNode* n)
149 {
150         pthread_mutex_lock (&_trigger_mutex);
151         _trigger_queue.push_back( n);
152         pthread_mutex_unlock (&_trigger_mutex);
153 }
154
155 void
156 Graph::dec_ref()
157 {
158         if (g_atomic_int_dec_and_test (&_finished_refcount)) {
159
160                 // ok... this cycle is finished now.
161                 // we are the only thread alive.
162         
163                 this->restart_cycle();
164         }
165 }
166
167 void
168 Graph::restart_cycle()
169 {
170         //printf( "cycle_done chain: %d\n", _current_chain);
171
172         // we are through. wakeup our caller.
173   again:
174         sem_post( &_callback_done_sem);
175
176         // block until we are triggered.
177         sem_wait( &_callback_start_sem);
178         if (_quit_threads)
179                 return;
180
181         //printf( "cycle_start\n" );
182
183         this->prep();
184         if (_graph_empty)
185                 goto again;
186         //printf( "cycle_start chain: %d\n", _current_chain);
187
188         // returning will restart the cycle.
189         //  starting with waking up the others.
190 }
191
192 static bool
193 is_feedback (boost::shared_ptr<RouteList> routelist, Route* from, boost::shared_ptr<Route> to)
194 {
195         for (RouteList::iterator ri=routelist->begin(); ri!=routelist->end(); ri++) {
196                 if ((*ri).get() == from)
197                         return false;
198                 if ((*ri) == to)
199                         return true;
200         }
201         assert(0);
202         return false;
203 }
204
205 static bool
206 is_feedback (boost::shared_ptr<RouteList> routelist, boost::shared_ptr<Route> from, Route* to)
207 {
208         for (RouteList::iterator ri=routelist->begin(); ri!=routelist->end(); ri++) {
209                 if ((*ri).get() == to)
210                         return true;
211                 if ((*ri) == from)
212                         return false;
213         }
214         assert(0);
215         return false;
216 }
217
218 void
219 Graph::rechain (boost::shared_ptr<RouteList> routelist)
220 {
221         node_list_t::iterator ni;
222
223         pthread_mutex_lock (&_swap_mutex);
224         int chain = _setup_chain;
225         DEBUG_TRACE (DEBUG::Graph, string_compose ("============== setup %1\n", chain));
226         // set all refcounts to 0;
227
228         _init_finished_refcount[chain] = 0;
229         _init_trigger_list[chain].clear();
230
231         _nodes_rt[chain].clear();
232
233         for (RouteList::iterator ri=routelist->begin(); ri!=routelist->end(); ri++) {
234                 node_ptr_t n = boost::dynamic_pointer_cast<GraphNode> (*ri);
235
236                 n->_init_refcount[chain] = 0;
237                 n->_activation_set[chain].clear();
238                 _nodes_rt[chain].push_back(n);
239         }
240
241         // now add refs for the connections.
242
243         for (ni=_nodes_rt[chain].begin(); ni!=_nodes_rt[chain].end(); ni++) {
244                 bool has_input  = false;
245                 bool has_output = false;
246
247                 boost::shared_ptr<Route> rp = boost::dynamic_pointer_cast<Route>( *ni);
248
249                 for (RouteList::iterator ri=routelist->begin(); ri!=routelist->end(); ri++) {
250                         if (rp->direct_feeds (*ri)) {
251                                 if (is_feedback (routelist, rp.get(), *ri)) {
252                                         continue; 
253                                 }
254
255                                 has_output = true;
256                                 (*ni)->_activation_set[chain].insert (boost::dynamic_pointer_cast<GraphNode> (*ri) );
257                         }
258                 }
259
260                 for (Route::FedBy::iterator fi=rp->fed_by().begin(); fi!=rp->fed_by().end(); fi++) {
261                         if (boost::shared_ptr<Route> r = fi->r.lock()) {
262                                 if (!is_feedback (routelist, r, rp.get())) {
263                                         has_input = true;
264                                 }
265                         }
266                 }
267
268                 for (node_set_t::iterator ai=(*ni)->_activation_set[chain].begin(); ai!=(*ni)->_activation_set[chain].end(); ai++) {
269                         (*ai)->_init_refcount[chain] += 1;
270                 }
271
272                 if (!has_input)
273                         _init_trigger_list[chain].push_back (*ni);
274
275                 if (!has_output)
276                         _init_finished_refcount[chain] += 1;
277         } 
278
279         _pending_chain = chain;
280         dump(chain);
281         pthread_mutex_unlock (&_swap_mutex);
282 }
283
284
285 bool
286 Graph::run_one()
287 {
288         GraphNode* to_run;
289
290         pthread_mutex_lock (&_trigger_mutex);
291         if (_trigger_queue.size()) {
292                 to_run = _trigger_queue.back();
293                 _trigger_queue.pop_back();
294         } else {
295                 to_run = 0;
296         }
297
298         int wakeup = std::min ((int) _execution_tokens, (int) _trigger_queue.size());
299         _execution_tokens -= wakeup;
300
301         for (int i=0; i<wakeup; i++ ) {
302                 sem_post (&_execution_sem);
303         }
304
305         while (to_run == 0) {
306                 _execution_tokens += 1;
307                 pthread_mutex_unlock (&_trigger_mutex);
308                 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 goes to sleep\n", pthread_self()));
309                 sem_wait (&_execution_sem);
310                 if (_quit_threads)
311                         return true;
312                 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 is awake\n", pthread_self()));
313                 pthread_mutex_lock (&_trigger_mutex);
314                 if (_trigger_queue.size()) {
315                         to_run = _trigger_queue.back();
316                         _trigger_queue.pop_back();
317                 }
318         }
319         pthread_mutex_unlock (&_trigger_mutex);
320
321         to_run->process();
322         to_run->finish (_current_chain);
323
324         return false;
325 }
326
327 static void get_rt()
328 {
329         if (!jack_is_realtime (AudioEngine::instance()->jack())) {
330                 return;
331         }
332
333         int priority = jack_client_real_time_priority (AudioEngine::instance()->jack());
334
335         if (priority) {
336                 struct sched_param rtparam;
337         
338                 memset (&rtparam, 0, sizeof (rtparam));
339                 rtparam.sched_priority = priority;
340         
341                 pthread_setschedparam (pthread_self(), SCHED_FIFO, &rtparam);
342         }
343 }
344
345 void
346 Graph::helper_thread()
347 {
348         ProcessThread *pt = new ProcessThread;
349
350         pt->get_buffers();
351         get_rt();
352
353         while(1) {
354                 if (run_one()) {
355                         break;
356                 }
357         }
358
359         pt->drop_buffers();
360 }
361
362 void
363 Graph::main_thread()
364 {
365         ProcessThread *pt = new ProcessThread;
366
367         pt->get_buffers();
368         get_rt();
369
370   again:
371         sem_wait (&_callback_start_sem);
372
373         this->prep();
374
375         if (_graph_empty) {
376                 sem_post (&_callback_done_sem);
377                 goto again;
378         }
379
380         while (1) {
381                 if (run_one()) {
382                         break;
383                 }
384         }
385
386         pt->drop_buffers();
387 }
388
389 void
390 Graph::dump (int chain)
391 {
392 #ifndef NDEBUG
393         node_list_t::iterator ni;
394         node_set_t::iterator ai;
395
396         chain = _pending_chain;
397
398         DEBUG_TRACE (DEBUG::Graph, "--------------------------------------------Graph dump:\n");
399         for (ni=_nodes_rt[chain].begin(); ni!=_nodes_rt[chain].end(); ni++) {
400                 boost::shared_ptr<Route> rp = boost::dynamic_pointer_cast<Route>( *ni);
401                 DEBUG_TRACE (DEBUG::Graph, string_compose ("GraphNode: %1  refcount: %2\n", rp->name().c_str(), (*ni)->_init_refcount[chain]));
402                 for (ai=(*ni)->_activation_set[chain].begin(); ai!=(*ni)->_activation_set[chain].end(); ai++) {
403                         DEBUG_TRACE (DEBUG::Graph, string_compose ("  triggers: %1\n", boost::dynamic_pointer_cast<Route>(*ai)->name().c_str()));
404                 }
405         }
406
407         DEBUG_TRACE (DEBUG::Graph, "------------- trigger list:\n");
408         for (ni=_init_trigger_list[chain].begin(); ni!=_init_trigger_list[chain].end(); ni++) {
409                 DEBUG_TRACE (DEBUG::Graph, string_compose ("GraphNode: %1  refcount: %2\n", boost::dynamic_pointer_cast<Route>(*ni)->name().c_str(), (*ni)->_init_refcount[chain]));
410         }
411
412         DEBUG_TRACE (DEBUG::Graph, string_compose ("final activation refcount: %1\n", _init_finished_refcount[chain]));
413 #endif
414 }
415
416 int
417 Graph::silent_process_routes (nframes_t nframes, framepos_t start_frame, framepos_t end_frame,
418                               bool can_record, bool rec_monitors_input, bool& need_butler)
419 {
420         _process_nframes = nframes;
421         _process_start_frame = start_frame;
422         _process_end_frame = end_frame;
423         _process_can_record = can_record;
424         _process_rec_monitors_input = rec_monitors_input;
425
426         _process_silent = true;
427         _process_noroll = false;
428         _process_retval = 0;
429         _process_need_butler = false;
430
431         if (!_graph_empty) {
432                 sem_post (&_callback_start_sem);
433                 sem_wait (&_callback_done_sem);
434         }
435
436         need_butler = _process_need_butler;
437
438         return _process_retval;
439 }
440
441 int
442 Graph::process_routes (nframes_t nframes, framepos_t start_frame, framepos_t end_frame, int declick,
443                        bool can_record, bool rec_monitors_input, bool& need_butler)
444 {
445         _process_nframes = nframes;
446         _process_start_frame = start_frame;
447         _process_end_frame = end_frame;
448         _process_can_record = can_record;
449         _process_rec_monitors_input = rec_monitors_input;
450         _process_declick = declick;
451
452         _process_silent = false;
453         _process_noroll = false;
454         _process_retval = 0;
455         _process_need_butler = false;
456
457         sem_post (&_callback_start_sem);
458         sem_wait (&_callback_done_sem);
459
460         need_butler = _process_need_butler;
461
462         return _process_retval;
463 }
464
465 int
466 Graph::routes_no_roll (nframes_t nframes, framepos_t start_frame, framepos_t end_frame, 
467                        bool non_rt_pending, bool can_record, int declick)
468 {
469         _process_nframes = nframes;
470         _process_start_frame = start_frame;
471         _process_end_frame = end_frame;
472         _process_can_record = can_record;
473         _process_declick = declick;
474         _process_non_rt_pending = non_rt_pending;
475
476         _process_silent = false;
477         _process_noroll = true;
478         _process_retval = 0;
479         _process_need_butler = false;
480
481         sem_post (&_callback_start_sem);
482         sem_wait (&_callback_done_sem);
483
484         return _process_retval;
485 }
486 void
487 Graph::process_one_route (Route* route)
488 {
489         bool need_butler = false;
490         int retval;
491
492         assert (route);
493
494         DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 runs route %2\n", pthread_self(), route->name()));
495
496         if (_process_silent) {
497                 retval = route->silent_roll (_process_nframes, _process_start_frame, _process_end_frame, _process_can_record, _process_rec_monitors_input, need_butler);
498         } else if (_process_noroll) {
499                 route->set_pending_declick (_process_declick);
500                 retval = route->no_roll (_process_nframes, _process_start_frame, _process_end_frame, _process_non_rt_pending, _process_can_record, _process_declick);
501         } else {
502                 route->set_pending_declick (_process_declick);
503                 retval = route->roll (_process_nframes, _process_start_frame, _process_end_frame, _process_declick, _process_can_record, _process_rec_monitors_input, need_butler);
504         }
505
506         if (retval) {
507                 _process_retval = retval;
508         }
509     
510         if (need_butler) {
511                 _process_need_butler = true;
512         }
513 }
514
515
516