2 Copyright (C) 2023 Grok Image Compression Inc.
4 This file is part of DCP-o-matic.
6 DCP-o-matic is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 2 of the License, or
9 (at your option) any later version.
11 DCP-o-matic is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with DCP-o-matic. If not, see <http://www.gnu.org/licenses/>.
32 #include <condition_variable>
41 #pragma warning(disable : 4100)
46 #include <semaphore.h>
// Names of the message buffers and synchronization objects.
// grokToClientMessageBuf and clientToGrokMessageBuf are passed to shm_unlink() in this file;
// the *Synch names are presumably handed to MessengerInit by the caller -- confirm at the call site.
52 static std::string grokToClientMessageBuf = "Global\\grok_to_client_message";
53 static std::string grokSentSynch = "Global\\grok_sent";
54 static std::string clientReceiveReadySynch = "Global\\client_receive_ready";
55 static std::string clientToGrokMessageBuf = "Global\\client_to_grok_message";
56 static std::string clientSentSynch = "Global\\client_sent";
57 static std::string grokReceiveReadySynch = "Global\\grok_receive_ready";
// Names of the frame buffers opened by Messenger::initBuffers() through SharedMemoryManager::initShm().
58 static std::string grokUncompressedBuf = "Global\\grok_uncompressed_buf";
59 static std::string grokCompressedBuf = "Global\\grok_compressed_buf";
// Message tags. Messenger::send() writes comma-separated fields after a tag, and
// processorThread() compares the first comma-separated field of a received message against these.
60 static const std::string GRK_MSGR_BATCH_IMAGE = "GRK_MSGR_BATCH_IMAGE";
61 static const std::string GRK_MSGR_BATCH_COMPRESS_INIT = "GRK_MSGR_BATCH_COMPRESS_INIT";
62 static const std::string GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED = "GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED";
63 static const std::string GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED =
64 "GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED";
65 static const std::string GRK_MSGR_BATCH_SUBMIT_COMPRESSED = "GRK_MSGR_BATCH_SUBMIT_COMPRESSED";
// NOTE(review): "PROCESSSED" (three S) appears in both the identifier and the wire string.
// The peer presumably matches this exact text, so do not correct one side only.
66 static const std::string GRK_MSGR_BATCH_PROCESSSED_COMPRESSED =
67 "GRK_MSGR_BATCH_PROCESSSED_COMPRESSED";
68 static const std::string GRK_MSGR_BATCH_SHUTDOWN = "GRK_MSGR_BATCH_SHUTDOWN";
69 static const std::string GRK_MSGR_BATCH_FLUSH = "GRK_MSGR_BATCH_FLUSH";
// Length in bytes requested for each message buffer by outboundThread() and inboundThread().
70 static const size_t messageBufferLen = 256;
// Logging interface used throughout this file via getMessengerLogger().
// Each method takes a printf-style format string followed by its arguments.
71 struct IMessengerLogger
73 virtual ~IMessengerLogger(void) = default;
74 virtual void info(const char* fmt, ...) = 0;
75 virtual void warn(const char* fmt, ...) = 0;
76 virtual void error(const char* fmt, ...) = 0;
/**
 * Format a printf-style message into a std::string.
 *
 * @param format printf-style format string
 * @param args   lvalue arguments matching the conversions in @p format
 * @return the formatted text; output longer than the internal 512-byte
 *         scratch buffer (including the terminator) is truncated by snprintf
 */
template<typename... Args>
std::string log_message(char const* const format, Args&... args) noexcept
{
	constexpr size_t kCapacity = 512;
	char scratch[kCapacity];

	std::snprintf(scratch, kCapacity, format, args...);
	std::string formatted(scratch);
	return formatted;
}
// IMessengerLogger implementation that prints to the C standard streams:
// info() and warn() write to stdout, error() writes to stderr.
// Every message is prefixed with preamble_ and terminated with a newline.
// NOTE(review): preamble_ is concatenated into the printf format string, so a '%' inside the
// preamble would be interpreted as a conversion specification -- confirm preambles never contain '%'.
89 struct MessengerLogger : public IMessengerLogger
91 explicit MessengerLogger(const std::string &preamble) : preamble_(preamble) {}
92 virtual ~MessengerLogger() = default;
93 virtual void info(const char* fmt, ...) override
// Build "<preamble><fmt>\n" and hand it to vfprintf
// (args is presumably a va_list started on lines not shown in this excerpt).
96 std::string new_fmt = preamble_ + fmt + "\n";
98 vfprintf(stdout, new_fmt.c_str(), args);
101 virtual void warn(const char* fmt, ...) override
104 std::string new_fmt = preamble_ + fmt + "\n";
106 vfprintf(stdout, new_fmt.c_str(), args);
109 virtual void error(const char* fmt, ...) override
112 std::string new_fmt = preamble_ + fmt + "\n";
114 vfprintf(stderr, new_fmt.c_str(), args);
// Text placed in front of every logged message.
119 std::string preamble_;
// Logger used by the messenger code in this file; starts out as nullptr.
// NOTE(review): callers invoke getMessengerLogger()->error(...) without a null check, so a logger
// presumably has to be installed through setMessengerLogger() before any other code here runs -- confirm.
122 static IMessengerLogger* sLogger = nullptr;
// Silence GCC/Clang "unused function" warnings for the setter below.
123 #if defined(__GNUC__) || defined(__clang__)
124 #pragma GCC diagnostic push
125 #pragma GCC diagnostic ignored "-Wunused-function"
// Installs the logger (body not visible in this excerpt; ownership semantics unknown from here).
127 static void setMessengerLogger(IMessengerLogger* logger)
132 #if defined(__GNUC__) || defined(__clang__)
133 #pragma GCC diagnostic pop
// Accessor for the current logger (body not visible in this excerpt).
135 static IMessengerLogger* getMessengerLogger(void)
// Constructor: records the names of the outbound and inbound message buffers and of their
// "sent" / "receive ready" synchronization objects, the callback invoked by processing threads,
// and the number of processing threads. Both frame sizes start at 0.
141 MessengerInit(const std::string &outBuf, const std::string &outSent,
142 const std::string &outReceiveReady, const std::string &inBuf,
143 const std::string &inSent,
144 const std::string &inReceiveReady,
145 std::function<void(std::string)> processor,
146 size_t numProcessingThreads)
147 : outboundMessageBuf(outBuf), outboundSentSynch(outSent),
148 outboundReceiveReadySynch(outReceiveReady), inboundMessageBuf(inBuf),
149 inboundSentSynch(inSent), inboundReceiveReadySynch(inReceiveReady), processor_(processor),
150 numProcessingThreads_(numProcessingThreads),
151 uncompressedFrameSize_(0), compressedFrameSize_(0),
// The statement guarded by this check is not visible here; presumably it removes stale
// shared-memory objects on first launch -- confirm.
154 if(firstLaunch(true))
// Remove both message-buffer shared-memory objects by name; the return values are ignored.
// (The header of the enclosing function is not visible in this excerpt.)
160 shm_unlink(grokToClientMessageBuf.c_str());
161 shm_unlink(clientToGrokMessageBuf.c_str());
/**
 * Report whether the calling side is treated as the first process to launch.
 *
 * With the built-in debug switch off this simply returns @p isClient;
 * turning the switch on inverts the answer.
 *
 * @param isClient true when called from the client side
 */
static bool firstLaunch(bool isClient)
{
	const bool debugGrok = false;
	return isClient != debugGrok;
}
// Outbound channel: name of the message buffer and of its two synchronization objects.
169 std::string outboundMessageBuf;
170 std::string outboundSentSynch;
171 std::string outboundReceiveReadySynch;
// Inbound channel: name of the message buffer and of its two synchronization objects.
173 std::string inboundMessageBuf;
174 std::string inboundSentSynch;
175 std::string inboundReceiveReadySynch;
// Callback handed to every processing thread, and the number of such threads to start.
177 std::function<void(std::string)> processor_;
178 size_t numProcessingThreads_;
// Per-frame sizes in bytes; multiplied by numFrames_ when the frame buffers are sized.
180 size_t uncompressedFrameSize_;
181 size_t compressedFrameSize_;
185 /*************************** Synchronization *******************************/
// Handle type used for shared-memory file descriptors (see SharedMemoryManager).
192 typedef int grk_handle;
// Constructor: stores the names of the "sent" and "receive ready" semaphores.
195 Synch(const std::string &sentSemName, const std::string &receiveReadySemName)
196 : sentSemName_(sentSemName), receiveReadySemName_(receiveReadySemName)
198 // unlink semaphores in case of previous crash
199 if(MessengerInit::firstLaunch(true))
// Second first-launch check; the surrounding lines are not visible, presumably this one
// belongs to the destructor -- confirm against the full file.
206 if(MessengerInit::firstLaunch(true))
// Post (increment) the semaphore selected by dir: sentSem_ for SYNCH_SENT, otherwise receiveReadySem_.
// A failure is logged through the messenger logger (the check on rc is not visible in this excerpt).
209 void post(SynchDirection dir)
211 auto sem = (dir == SYNCH_SENT ? sentSem_ : receiveReadySem_);
212 int rc = sem_post(sem);
214 getMessengerLogger()->error("Error posting to semaphore: %s", strerror(errno));
// Block on the semaphore selected by dir: sentSem_ for SYNCH_SENT, otherwise receiveReadySem_.
// A failure is logged through the messenger logger (the check on rc is not visible in this excerpt).
216 void wait(SynchDirection dir)
218 auto sem = dir == SYNCH_SENT ? sentSem_ : receiveReadySem_;
219 int rc = sem_wait(sem);
221 getMessengerLogger()->error("Error waiting for semaphore: %s", strerror(errno));
// --- Open both named semaphores (function header not visible in this excerpt) ---
// sentSem_ is created with initial value 0, receiveReadySem_ with initial value 1.
// NOTE(review): POSIX sem_open() reports failure with SEM_FAILED, while the visible check below
// tests for a null pointer; confirm the two are equivalent on every target platform.
// NOTE(review): the logged text says "shared memory" although a semaphore is being opened.
225 sentSem_ = sem_open(sentSemName_.c_str(), O_CREAT, 0666, 0);
227 getMessengerLogger()->error("Error opening shared memory: %s", strerror(errno));
228 receiveReadySem_ = sem_open(receiveReadySemName_.c_str(), O_CREAT, 0666, 1);
229 if(!receiveReadySem_)
230 getMessengerLogger()->error("Error opening shared memory: %s", strerror(errno));
// --- Close both semaphores (function header not visible in this excerpt) ---
234 int rc = sem_close(sentSem_);
236 getMessengerLogger()->error("Error closing semaphore %s: %s", sentSemName_.c_str(),
238 rc = sem_close(receiveReadySem_);
240 getMessengerLogger()->error("Error closing semaphore %s: %s",
241 receiveReadySemName_.c_str(), strerror(errno));
// --- Remove both semaphore names (function header not visible in this excerpt) ---
// ENOENT (name does not exist) is not treated as an error.
245 int rc = sem_unlink(sentSemName_.c_str());
246 if(rc == -1 && errno != ENOENT)
247 getMessengerLogger()->error("Error unlinking semaphore %s: %s", sentSemName_.c_str(),
249 rc = sem_unlink(receiveReadySemName_.c_str());
250 if(rc == -1 && errno != ENOENT)
251 getMessengerLogger()->error("Error unlinking semaphore %s: %s",
252 receiveReadySemName_.c_str(), strerror(errno));
// Semaphore handle and the two semaphore names (the sentSem_ declaration is not visible here).
255 sem_t* receiveReadySem_;
258 std::string sentSemName_;
259 std::string receiveReadySemName_;
// Static helpers that create/map and unmap/remove POSIX shared-memory objects.
261 struct SharedMemoryManager
// Open (creating if necessary) the shared-memory object `name`, size it to `len` bytes and map it.
// On return *shm_fd holds the descriptor and *buffer the mapping.
// Returns true when *buffer is non-null.
263 static bool initShm(const std::string &name, size_t len, grk_handle* shm_fd, char** buffer)
265 *shm_fd = shm_open(name.c_str(), O_CREAT | O_RDWR, 0666);
268 getMessengerLogger()->error("Error opening shared memory: %s", strerror(errno));
271 int rc = ftruncate(*shm_fd, sizeof(char) * len);
// Failure path for ftruncate: each step logs its own error
// (the conditions guarding these statements are not visible in this excerpt).
274 getMessengerLogger()->error("Error truncating shared memory: %s", strerror(errno));
277 getMessengerLogger()->error("Error closing shared memory: %s", strerror(errno));
278 rc = shm_unlink(name.c_str());
279 // 2 == No such file or directory
281 getMessengerLogger()->error("Error unlinking shared memory: %s", strerror(errno));
// NOTE(review): the mapping requests PROT_WRITE only, although inboundThread() reads from a
// buffer obtained here; confirm read access is implied on all target platforms.
// NOTE(review): POSIX mmap() reports failure with MAP_FAILED, not a null pointer, while the
// return statement below compares against nullptr -- confirm the (not visible) failure check.
284 *buffer = static_cast<char*>(mmap(0, len, PROT_WRITE, MAP_SHARED, *shm_fd, 0));
287 getMessengerLogger()->error("Error mapping shared memory: %s", strerror(errno));
290 getMessengerLogger()->error("Error closing shared memory: %s", strerror(errno));
291 rc = shm_unlink(name.c_str());
292 // 2 == No such file or directory
294 getMessengerLogger()->error("Error unlinking shared memory: %s", strerror(errno));
297 return *buffer != nullptr;
// Unmap `len` bytes at *buffer and remove the shared-memory object `name`.
// NOTE(review): the guard below treats descriptor value 0 as "not open"; 0 is also the initial
// value used by callers, but it is a valid descriptor number in general.
299 static bool deinitShm(const std::string &name, size_t len, grk_handle &shm_fd, char** buffer)
301 if (!*buffer || !shm_fd)
304 int rc = munmap(*buffer, len);
307 getMessengerLogger()->error("Error unmapping shared memory %s: %s", name.c_str(), strerror(errno));
311 getMessengerLogger()->error("Error closing shared memory %s: %s", name.c_str(), strerror(errno));
312 rc = shm_unlink(name.c_str());
313 // 2 == No such file or directory
// NOTE(review): this is the only error in the struct written with fprintf instead of the messenger logger.
315 fprintf(stderr,"Error unlinking shared memory %s : %s\n", name.c_str(), strerror(errno));
// Bounded queue guarded by a mutex, with two condition variables:
// can_pop_ is signalled after a push, can_push_ after a pop.
// Once deactivated (active_ false) the internal push_/pop_ helpers refuse to operate.
321 template<typename Data>
322 class MessengerBlockingQueue
// max is the capacity; the default constructor uses UINT_MAX.
325 explicit MessengerBlockingQueue(size_t max) : active_(true), max_size_(max) {}
326 MessengerBlockingQueue() : MessengerBlockingQueue(UINT_MAX) {}
// Current number of queued items (function header not visible; no lock acquisition is visible around this read).
329 return queue_.size();
331 // deactivate and clear queue
335 std::lock_guard<std::mutex> lk(mutex_);
337 while(!queue_.empty())
341 // release all waiting threads
342 can_pop_.notify_all();
343 can_push_.notify_all();
// Another member that takes the lock (header and body not visible in this excerpt).
347 std::lock_guard<std::mutex> lk(mutex_);
// Non-blocking push; wakes one thread waiting in waitAndPop().
350 bool push(Data const& value)
354 std::unique_lock<std::mutex> lk(mutex_);
358 can_pop_.notify_one();
// Blocking push: waits until there is room or the queue has been deactivated.
362 bool waitAndPush(Data& value)
366 std::unique_lock<std::mutex> lk(mutex_);
369 // in case of spurious wakeup, loop until predicate in lambda
371 can_push_.wait(lk, [this] { return queue_.size() < max_size_ || !active_; });
375 can_pop_.notify_one();
// Non-blocking pop; wakes one thread waiting in waitAndPush().
379 bool pop(Data& value)
383 std::unique_lock<std::mutex> lk(mutex_);
387 can_push_.notify_one();
// Blocking pop: waits until an item is available or the queue has been deactivated.
391 bool waitAndPop(Data& value)
395 std::unique_lock<std::mutex> lk(mutex_);
398 // in case of spurious wakeup, loop until predicate in lambda
400 can_pop_.wait(lk, [this] { return !queue_.empty() || !active_; });
404 can_push_.notify_one();
// Internal push helper; the guard below rejects the push when the queue is full or inactive.
410 bool push_(Data const& value)
412 if(queue_.size() == max_size_ || !active_)
// Internal pop helper; the guard below rejects the pop when the queue is empty or inactive.
418 bool pop_(Data& value)
420 if(queue_.empty() || !active_)
422 value = queue_.front();
427 std::queue<Data> queue_;
428 mutable std::mutex mutex_;
429 std::condition_variable can_pop_;
430 std::condition_variable can_push_;
// Describes a frame source: either a file name or a frame pointer with its ids.
// Default construction yields an empty file name, zero ids and a null frame pointer.
436 BufferSrc(void) : BufferSrc("") {}
437 explicit BufferSrc(const std::string &file) : file_(file), clientFrameId_(0), frameId_(0), framePtr_(nullptr)
// In-memory source: no file name, explicit ids and frame pointer.
439 BufferSrc(size_t clientFrameId, size_t frameId, uint8_t* framePtr)
440 : file_(""), clientFrameId_(clientFrameId), frameId_(frameId), framePtr_(framePtr)
// True when a file name is set and no frame pointer is attached (function header not visible).
444 return !file_.empty() && framePtr_ == nullptr;
// Accessor returning clientFrameId_ (function header not visible).
448 return clientFrameId_;
451 size_t clientFrameId_;
// Forward declarations of the thread entry points that Messenger::startThreads() passes to std::thread.
457 static void outboundThread(Messenger* messenger, const std::string &sendBuf, Synch* synch);
458 static void inboundThread(Messenger* messenger, const std::string &receiveBuf, Synch* synch);
459 static void processorThread(Messenger* messenger, std::function<void(std::string)> processor);
// Constructor: keeps a copy of the configuration and marks the messenger as running,
// not yet initialized and not shut down. No synchronization objects or frame buffers
// exist yet; both descriptors start at 0.
463 explicit Messenger(MessengerInit init)
464 : running(true), initialized_(false), shutdown_(false), init_(init),
465 outboundSynch_(nullptr),
466 inboundSynch_(nullptr), uncompressed_buffer_(nullptr), compressed_buffer_(nullptr),
467 uncompressed_fd_(0), compressed_fd_(0)
// Destructor: deactivates both message queues, posts the semaphores that the outbound and
// inbound threads wait on (SYNCH_RECEIVE_READY and SYNCH_SENT respectively, presumably so
// those threads wake up and observe the stop request -- the joins are not visible here),
// then deletes both Synch objects.
469 virtual ~Messenger(void)
472 sendQueue.deactivate();
473 receiveQueue.deactivate();
475 if (outboundSynch_) {
476 outboundSynch_->post(SYNCH_RECEIVE_READY);
481 inboundSynch_->post(SYNCH_SENT);
485 for(auto& p : processors_)
488 delete outboundSynch_;
489 delete inboundSynch_;
// Creates the outbound and inbound Synch objects from the configured names and starts
// the outbound thread, the inbound thread and init_.numProcessingThreads_ processing threads.
493 void startThreads(void) {
495 new Synch(init_.outboundSentSynch, init_.outboundReceiveReadySynch);
496 outbound = std::thread(outboundThread, this, init_.outboundMessageBuf, outboundSynch_);
499 new Synch(init_.inboundSentSynch, init_.inboundReceiveReadySynch);
500 inbound = std::thread(inboundThread, this, init_.inboundMessageBuf, inboundSynch_);
502 for(size_t i = 0; i < init_.numProcessingThreads_; ++i)
503 processors_.push_back(std::thread(processorThread, this, init_.processor_));
// Maps the uncompressed and compressed frame buffers, each sized frameSize * numFrames_.
// A buffer whose frame size is 0 is skipped; because of the `rc &&` short-circuit the
// second mapping is not attempted once rc is false.
505 bool initBuffers(void)
508 if(init_.uncompressedFrameSize_)
510 rc = rc && SharedMemoryManager::initShm(grokUncompressedBuf,
511 init_.uncompressedFrameSize_ * init_.numFrames_,
512 &uncompressed_fd_, &uncompressed_buffer_);
514 if(init_.compressedFrameSize_)
516 rc = rc && SharedMemoryManager::initShm(grokCompressedBuf,
517 init_.compressedFrameSize_ * init_.numFrames_,
518 &compressed_fd_, &compressed_buffer_);
// Releases both frame buffers with the same sizes used to map them
// (function header not visible in this excerpt).
// Because of the `rc &&` short-circuit the compressed buffer is not released
// when releasing the uncompressed buffer reports failure.
526 bool rc = SharedMemoryManager::deinitShm(grokUncompressedBuf,
527 init_.uncompressedFrameSize_ * init_.numFrames_,
528 uncompressed_fd_, &uncompressed_buffer_);
529 rc = rc && SharedMemoryManager::deinitShm(grokCompressedBuf,
530 init_.compressedFrameSize_ * init_.numFrames_,
531 compressed_fd_, &compressed_buffer_);
// Queues a message for the outbound thread. Each argument is streamed into the message
// preceded by a comma; the statement that writes `str` itself is not visible in this excerpt.
535 template<typename... Args>
536 void send(const std::string& str, Args... args)
538 std::ostringstream oss;
// Pack-expansion trick: evaluates `oss << ',' << arg` once per argument, in order.
540 int dummy[] = {0, ((void)(oss << ',' << args), 0)...};
541 static_cast<void>(dummy);
543 sendQueue.push(oss.str());
// Fragment of the function that builds the grk_compress command line
// (its header and several parameters are not visible in this excerpt).
547 boost::filesystem::path const& dir,
551 uint32_t samplesPerPixel,
557 const std::string server,
559 const std::string license
563 std::unique_lock<std::mutex> lk(shutdownMutex_);
// A valid future means a command was already launched through launch().
564 if (async_result_.valid())
566 if(MessengerInit::firstLaunch(true))
// "<server>:<port>" is passed to grk_compress via -J, the license via -j.
570 auto fullServer = server + ":" + std::to_string(port);
572 "./grk_compress -batch_src %s,%d,%d,%d,%d,%d -out_fmt j2k -k 1 "
573 "-G %d -%s %d,%d -j %s -J %s",
574 GRK_MSGR_BATCH_IMAGE.c_str(), width, stride, height, samplesPerPixel, depth,
575 device, is4K ? "cinema4K" : "cinema2K", fps, bandwidth,
576 license.c_str(), fullServer.c_str());
// Records the frame geometry, registers one BufferSrc per uncompressed frame slot in
// availableBuffers_, and notifies threads blocked in waitForClientInit().
// NOTE(review): the slot pointers are derived from uncompressed_buffer_; the statement that maps
// the buffers is presumably on a line not visible here (between the assignments and the loop) -- confirm.
579 void initClient(size_t uncompressedFrameSize, size_t compressedFrameSize, size_t numFrames)
581 // client fills queue with pending uncompressed buffers
582 init_.uncompressedFrameSize_ = uncompressedFrameSize;
583 init_.compressedFrameSize_ = compressedFrameSize;
584 init_.numFrames_ = numFrames;
586 auto ptr = uncompressed_buffer_;
587 for(size_t i = 0; i < init_.numFrames_; ++i)
// Slot i starts i * uncompressedFrameSize_ bytes into the uncompressed buffer.
589 availableBuffers_.push(BufferSrc(0, i, (uint8_t*)ptr));
590 ptr += init_.uncompressedFrameSize_;
593 std::unique_lock<std::mutex> lk(shutdownMutex_);
595 clientInitializedCondition_.notify_all();
// Blocks until the client has been initialized or shutdown has been requested.
// Returns true only when initialization completed and no shutdown is in progress.
597 bool waitForClientInit(void)
602 std::unique_lock<std::mutex> lk(shutdownMutex_);
607 clientInitializedCondition_.wait(lk, [this]{return initialized_ || shutdown_;});
609 return initialized_ && !shutdown_;
/**
 * Size in bytes of one uncompressed frame stored as 16-bit samples.
 *
 * @param w               frame width in pixels
 * @param h               frame height in pixels
 * @param samplesPerPixel number of samples per pixel
 * @return sizeof(uint16_t) * w * h * samplesPerPixel
 */
static size_t uncompressedFrameSize(uint32_t w, uint32_t h, uint32_t samplesPerPixel)
{
	const size_t bytesPerSample = sizeof(uint16_t);
	return bytesPerSample * w * h * samplesPerPixel;
}
// Returns the compressed frame slot `frameId` to availableBuffers_.
// NOTE(review): this pushes a compressed-buffer pointer into the same queue that initClient()
// fills with uncompressed slots -- confirm that sharing one queue is intended.
615 void reclaimCompressed(size_t frameId)
617 availableBuffers_.push(BufferSrc(0, frameId, getCompressedFrame(frameId)));
// Returns the uncompressed frame slot `frameId` to availableBuffers_.
619 void reclaimUncompressed(size_t frameId)
621 availableBuffers_.push(BufferSrc(0, frameId, getUncompressedFrame(frameId)));
// Address of uncompressed frame slot `frameId`: base + frameId * uncompressedFrameSize_.
// An out-of-range id trips the assert in debug builds; the statement executed by the
// run-time guard is not visible in this excerpt.
623 uint8_t* getUncompressedFrame(size_t frameId)
625 assert(frameId < init_.numFrames_);
626 if(frameId >= init_.numFrames_)
629 return (uint8_t*)(uncompressed_buffer_ + frameId * init_.uncompressedFrameSize_);
// Address of compressed frame slot `frameId`: base + frameId * compressedFrameSize_.
// Same range checks as getUncompressedFrame().
631 uint8_t* getCompressedFrame(size_t frameId)
633 assert(frameId < init_.numFrames_);
634 if(frameId >= init_.numFrames_)
637 return (uint8_t*)(compressed_buffer_ + frameId * init_.compressedFrameSize_);
// Loop flag polled by the outbound, inbound and processing threads.
639 std::atomic_bool running;
// sendQueue feeds outboundThread(); receiveQueue is filled by inboundThread() and drained by processorThread().
642 MessengerBlockingQueue<std::string> sendQueue;
643 MessengerBlockingQueue<std::string> receiveQueue;
// Frame slots currently free for scheduling.
644 MessengerBlockingQueue<BufferSrc> availableBuffers_;
// Result of the command started by launch().
647 std::future<int> async_result_;
// shutdownMutex_ is the mutex used with both condition variables below.
648 std::mutex shutdownMutex_;
649 std::condition_variable shutdownCondition_;
652 std::condition_variable clientInitializedCondition_;
// Changes the working directory to `dir` and runs a command asynchronously via std::system,
// storing the future in async_result_.
// The lambda reads the member cmd_; the line that stores `cmd` into it is not visible in this excerpt.
// NOTE(review): the lambda captures `this`, so the Messenger must outlive the asynchronous call.
654 void launch(std::string const& cmd, boost::filesystem::path const& dir)
656 // Change the working directory
659 boost::system::error_code ec;
660 boost::filesystem::current_path(dir, ec);
662 getMessengerLogger()->error("Error: failed to change the working directory");
666 // Execute the command using std::async and std::system
668 async_result_ = std::async(std::launch::async, [this]() { return std::system(cmd_.c_str()); });
// Outbound thread and its Synch object (allocated in startThreads(), deleted in the destructor).
670 std::thread outbound;
671 Synch* outboundSynch_;
// Synch object of the inbound thread.
674 Synch* inboundSynch_;
// Threads running processorThread().
676 std::vector<std::thread> processors_;
// Mappings and descriptors of the two frame buffers, filled in by initBuffers().
677 char* uncompressed_buffer_;
678 char* compressed_buffer_;
680 grk_handle uncompressed_fd_;
681 grk_handle compressed_fd_;
684 static void outboundThread(Messenger* messenger, const std::string &sendBuf, Synch* synch)
686 grk_handle shm_fd = 0;
687 char* send_buffer = nullptr;
689 if(!SharedMemoryManager::initShm(sendBuf, messageBufferLen, &shm_fd, &send_buffer))
691 while(messenger->running)
693 synch->wait(SYNCH_RECEIVE_READY);
694 if(!messenger->running)
697 if(!messenger->sendQueue.waitAndPop(message))
699 if(!messenger->running)
701 memcpy(send_buffer, message.c_str(), message.size() + 1);
702 synch->post(SYNCH_SENT);
704 SharedMemoryManager::deinitShm(sendBuf, messageBufferLen, shm_fd, &send_buffer);
707 static void inboundThread(Messenger* messenger, const std::string &receiveBuf, Synch* synch)
709 grk_handle shm_fd = 0;
710 char* receive_buffer = nullptr;
712 if(!SharedMemoryManager::initShm(receiveBuf, messageBufferLen, &shm_fd, &receive_buffer))
714 while(messenger->running)
716 synch->wait(SYNCH_SENT);
717 if(!messenger->running)
719 auto message = std::string(receive_buffer);
720 synch->post(SYNCH_RECEIVE_READY);
721 messenger->receiveQueue.push(message);
723 SharedMemoryManager::deinitShm(receiveBuf, messageBufferLen, shm_fd, &receive_buffer);
// Splits a comma-separated message into fields stored in cs_; ct_ is the read cursor.
// (The loop around the getline/push_back pair is not visible in this excerpt.)
727 explicit Msg(const std::string &msg) : ct_(0)
729 std::stringstream ss(msg);
733 std::getline(ss, substr, ',');
734 cs_.push_back(substr);
// Field accessor (header not visible): once every field has been consumed an error is
// logged; per the log text an empty string is returned in that case.
739 if(ct_ == cs_.size())
741 getMessengerLogger()->error("Msg: comma separated list exhausted. returning empty.");
747 uint32_t nextUint(void)
749 return (uint32_t)std::stoi(next());
// Fields of the message, in the order they appeared between commas.
752 std::vector<std::string> cs_;
// Thread body that drains messenger->receiveQueue and reacts to the message tag:
//  - GRK_MSGR_BATCH_COMPRESS_INIT: parses the frame geometry and initializes the client
//  - GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED / GRK_MSGR_BATCH_PROCESSSED_COMPRESSED:
//    returns the named frame slot to the pool
// The loop ends when messenger->running becomes false or the queue is deactivated.
// Where `processor` is invoked is not visible in this excerpt.
755 static void processorThread(Messenger* messenger, std::function<void(std::string)> processor)
757 while(messenger->running)
760 if(!messenger->receiveQueue.waitAndPop(message))
762 if(!messenger->running)
765 auto tag = msg.next();
766 if(tag == GRK_MSGR_BATCH_COMPRESS_INIT)
// Fields are consumed strictly in wire order: width, stride, height, samplesPerPixel, depth,
// compressedFrameSize, numFrames.
768 auto width = msg.nextUint();
769 auto stride = msg.nextUint();
771 auto height = msg.nextUint();
772 auto samplesPerPixel = msg.nextUint();
773 auto depth = msg.nextUint();
775 messenger->init_.uncompressedFrameSize_ =
776 Messenger::uncompressedFrameSize(width, height, samplesPerPixel);
777 auto compressedFrameSize = msg.nextUint();
778 auto numFrames = msg.nextUint();
// NOTE(review): compressedFrameSize is passed as BOTH the uncompressed and the compressed size.
// initClient() stores its first argument into init_.uncompressedFrameSize_, overwriting the value
// computed just above -- confirm whether the first argument should be that computed size.
779 messenger->initClient(compressedFrameSize, compressedFrameSize, numFrames);
781 else if(tag == GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED)
783 messenger->reclaimUncompressed(msg.nextUint());
785 else if(tag == GRK_MSGR_BATCH_PROCESSSED_COMPRESSED)
787 messenger->reclaimCompressed(msg.nextUint());
794 struct ScheduledFrames
796 void store(F const& val)
798 std::unique_lock<std::mutex> lk(mapMutex_);
799 auto it = map_.find(val.index());
800 if (it == map_.end())
801 map_.emplace(std::make_pair(val.index(), val));
// Looks up the frame stored under `index` while holding mapMutex_.
// What happens on a hit or a miss is on lines not visible in this excerpt.
803 boost::optional<F> retrieve(size_t index)
805 std::unique_lock<std::mutex> lk(mapMutex_);
806 auto it = map_.find(index);
// mapMutex_ guards map_, which maps a frame index to the stored frame.
817 std::mutex mapMutex_;
818 std::map<size_t, F> map_;
// Messenger that additionally tracks scheduled frames of type F and counts how many
// were scheduled and compressed. (The remaining constructor initializers and the
// destructor body are not visible in this excerpt.)
822 struct ScheduledMessenger : public Messenger
824 explicit ScheduledMessenger(MessengerInit init) : Messenger(init),
828 ~ScheduledMessenger(void) {
// Takes a free frame slot, records `proxy` as scheduled and announces the slot to the peer
// with GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED,<proxy index>,<slot id>.
// Blocks until a slot is available. Where `converter` is invoked is not visible in this excerpt.
831 bool scheduleCompress(F const& proxy, std::function<void(BufferSrc const&)> converter){
832 size_t frameSize = init_.uncompressedFrameSize_;
// NOTE(review): frameSize was just assigned from the same member, so this assert always holds.
833 assert(frameSize >= init_.uncompressedFrameSize_);
835 if(!availableBuffers_.waitAndPop(src))
838 scheduledFrames_.store(proxy);
840 send(GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED, proxy.index(), src.frameId_);
// Handles a "compressed frame ready" message: reads the client frame id, the compressed
// slot id and the compressed length, hands the data to `processor` unless the frame needs
// recompression, then acknowledges the slot with GRK_MSGR_BATCH_PROCESSSED_COMPRESSED.
// Wakes shutdown() once every scheduled frame has been compressed.
844 void processCompressed(const std::string &message, std::function<void(F,uint8_t*,uint32_t)> processor, bool needsRecompression) {
847 auto clientFrameId = msg.nextUint();
848 auto compressedFrameId = msg.nextUint();
849 auto compressedFrameLength = msg.nextUint();
850 if (!needsRecompression) {
851 auto src_frame = scheduledFrames_.retrieve(clientFrameId);
855 processor(*src_frame, getCompressedFrame(compressedFrameId),compressedFrameLength);
858 send(GRK_MSGR_BATCH_PROCESSSED_COMPRESSED, compressedFrameId);
859 if (shutdown_ && framesCompressed_ == framesScheduled_)
860 shutdownCondition_.notify_all();
// Shutdown sequence (function header not visible in this excerpt):
// if frames were scheduled, ask the peer to flush and wait until all of them are compressed;
// then deactivate the slot queue, send the shutdown message and collect the exit code
// of the launched command.
864 std::unique_lock<std::mutex> lk(shutdownMutex_);
// A future that is not valid means no command is pending.
865 if (!async_result_.valid())
868 if (framesScheduled_) {
869 uint32_t scheduled = framesScheduled_;
870 send(GRK_MSGR_BATCH_FLUSH, scheduled);
871 shutdownCondition_.wait(lk, [this] { return framesScheduled_ == framesCompressed_; });
873 availableBuffers_.deactivate();
874 send(GRK_MSGR_BATCH_SHUTDOWN);
875 int result = async_result_.get();
877 getMessengerLogger()->error("Accelerator failed with return code: %d\n",result);
878 } catch (std::exception &ex) {
879 getMessengerLogger()->error("%s",ex.what());
// Thin wrappers forwarding to scheduledFrames_ (the header of the store wrapper is not visible here).
884 boost::optional<F> retrieve(size_t index) {
885 return scheduledFrames_.retrieve(index);
889 scheduledFrames_.store(val);
// Frames handed to the peer, keyed by index, plus counters of scheduled and completed frames.
893 ScheduledFrames<F> scheduledFrames_;
894 std::atomic<uint32_t> framesScheduled_;
895 std::atomic<uint32_t> framesCompressed_;
898 } // namespace grk_plugin