X-Git-Url: https://main.carlh.net/gitweb/?a=blobdiff_plain;f=libs%2Fardour%2Fworker.cc;h=024ec8b6add2ceed20baa07e43ebbb5fb501d125;hb=8139becb1898187729b0ea57f145302d4975bf3a;hp=67c72ffeba0b4b290865d70022148a948e92e832;hpb=a8f0c3255f6f645913f57748d34db640bc7adcc2;p=ardour.git diff --git a/libs/ardour/worker.cc b/libs/ardour/worker.cc index 67c72ffeba..024ec8b6ad 100644 --- a/libs/ardour/worker.cc +++ b/libs/ardour/worker.cc @@ -1,5 +1,5 @@ /* - Copyright (C) 2012 Paul Davis + Copyright (C) 2012-2016 Paul Davis Author: David Robillard This program is free software; you can redistribute it and/or modify @@ -18,50 +18,98 @@ */ #include +#include #include "ardour/worker.h" #include "pbd/error.h" +#include "pbd/compose.h" + +#include namespace ARDOUR { -Worker::Worker(Workee* workee, uint32_t ring_size) +Worker::Worker(Workee* workee, uint32_t ring_size, bool threaded) : _workee(workee) - , _requests(new RingBuffer(ring_size)) - , _responses(new RingBuffer(ring_size)) + , _requests(threaded ? new PBD::RingBuffer(ring_size) : NULL) + , _responses(new PBD::RingBuffer(ring_size)) , _response((uint8_t*)malloc(ring_size)) - , _sem(0) + , _sem(string_compose ("worker_semaphore%1", this).c_str(), 0) + , _thread(NULL) , _exit(false) - , _thread (Glib::Threads::Thread::create(sigc::mem_fun(*this, &Worker::run))) -{} + , _synchronous(!threaded) +{ + if (threaded) { + _thread = Glib::Threads::Thread::create( + sigc::mem_fun(*this, &Worker::run)); + } +} Worker::~Worker() { _exit = true; - _sem.post(); - _thread->join(); + _sem.signal(); + if (_thread) { + _thread->join(); + } + delete _responses; + delete _requests; + free (_response); } bool Worker::schedule(uint32_t size, const void* data) { + if (_synchronous || !_requests) { + _workee->work(*this, size, data); + return true; + } + if (_requests->write_space() < size + sizeof(size)) { + return false; + } if (_requests->write((const uint8_t*)&size, sizeof(size)) != sizeof(size)) { return false; } if (_requests->write((const uint8_t*)data, size) != size) { - return false; // FIXME: corruption + return false; } - _sem.post(); + _sem.signal(); return true; } bool Worker::respond(uint32_t size, const void* data) { + if (_responses->write_space() < size + sizeof(size)) { + return false; + } if (_responses->write((const uint8_t*)&size, sizeof(size)) != sizeof(size)) { return false; } if (_responses->write((const uint8_t*)data, size) != size) { - return false; // FIXME: corruption + return false; + } + return true; +} + +bool +Worker::verify_message_completeness(PBD::RingBuffer* rb) +{ + uint32_t read_space = rb->read_space(); + uint32_t size; + PBD::RingBuffer::rw_vector vec; + rb->get_read_vector (&vec); + if (vec.len[0] + vec.len[1] < sizeof(size)) { + return false; + } + if (vec.len[0] >= sizeof(size)) { + memcpy (&size, vec.buf[0], sizeof (size)); + } else { + memcpy (&size, vec.buf[0], vec.len[0]); + memcpy (&size + vec.len[0], vec.buf[1], sizeof(size) - vec.len[0]); + } + if (read_space < size+sizeof(size)) { + /* message from writer is yet incomplete. respond next cycle */ + return false; } return true; } @@ -71,7 +119,12 @@ Worker::emit_responses() { uint32_t read_space = _responses->read_space(); uint32_t size = 0; - while (read_space > sizeof(size)) { + while (read_space >= sizeof(size)) { + if (!verify_message_completeness(_responses)) { + /* message from writer is yet incomplete. respond next cycle */ + return; + } + /* read and send response */ _responses->read((uint8_t*)&size, sizeof(size)); _responses->read(_response, size); _workee->work_response(size, _response); @@ -87,10 +140,22 @@ Worker::run() while (true) { _sem.wait(); if (_exit) { + free(buf); return; } - uint32_t size = 0; + uint32_t size = _requests->read_space(); + if (size < sizeof(size)) { + PBD::error << "Worker: no work-data on ring buffer" << endmsg; + continue; + } + while (!verify_message_completeness(_requests)) { + Glib::usleep(2000); + if (_exit) { + if (buf) free(buf); + return; + } + } if (_requests->read((uint8_t*)&size, sizeof(size)) < sizeof(size)) { PBD::error << "Worker: Error reading size from request ring" << endmsg; @@ -99,7 +164,13 @@ Worker::run() if (size > buf_size) { buf = realloc(buf, size); - buf_size = size; + if (buf) { + buf_size = size; + } else { + PBD::error << "Worker: Error allocating memory" + << endmsg; + buf_size = 0; // TODO: This is probably fatal + } } if (_requests->read((uint8_t*)buf, size) < size) { @@ -108,7 +179,7 @@ Worker::run() continue; // TODO: This is probably fatal } - _workee->work(size, buf); + _workee->work(*this, size, buf); } }