2 Copyright (C) 2023 Grok Image Compression Inc.
4 This file is part of DCP-o-matic.
6 DCP-o-matic is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 2 of the License, or
9 (at your option) any later version.
11 DCP-o-matic is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with DCP-o-matic. If not, see <http://www.gnu.org/licenses/>.
32 #include <condition_variable>
41 #pragma warning(disable : 4100)
46 #include <semaphore.h>
52 static std::string grokToClientMessageBuf = "Global\\grok_to_client_message";
53 static std::string grokSentSynch = "Global\\grok_sent";
54 static std::string clientReceiveReadySynch = "Global\\client_receive_ready";
55 static std::string clientToGrokMessageBuf = "Global\\client_to_grok_message";
56 static std::string clientSentSynch = "Global\\client_sent";
57 static std::string grokReceiveReadySynch = "Global\\grok_receive_ready";
58 static std::string grokUncompressedBuf = "Global\\grok_uncompressed_buf";
59 static std::string grokCompressedBuf = "Global\\grok_compressed_buf";
60 static const std::string GRK_MSGR_BATCH_IMAGE = "GRK_MSGR_BATCH_IMAGE";
61 static const std::string GRK_MSGR_BATCH_COMPRESS_INIT = "GRK_MSGR_BATCH_COMPRESS_INIT";
62 static const std::string GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED = "GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED";
63 static const std::string GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED =
64 "GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED";
65 static const std::string GRK_MSGR_BATCH_SUBMIT_COMPRESSED = "GRK_MSGR_BATCH_SUBMIT_COMPRESSED";
66 static const std::string GRK_MSGR_BATCH_PROCESSSED_COMPRESSED =
67 "GRK_MSGR_BATCH_PROCESSSED_COMPRESSED";
68 static const std::string GRK_MSGR_BATCH_SHUTDOWN = "GRK_MSGR_BATCH_SHUTDOWN";
69 static const std::string GRK_MSGR_BATCH_FLUSH = "GRK_MSGR_BATCH_FLUSH";
70 static const size_t messageBufferLen = 256;
71 struct IMessengerLogger
73 virtual ~IMessengerLogger(void) = default;
74 virtual void info(const char* fmt, ...) = 0;
75 virtual void warn(const char* fmt, ...) = 0;
76 virtual void error(const char* fmt, ...) = 0;
79 template<typename... Args>
80 std::string log_message(char const* const format, Args&... args) noexcept
82 constexpr size_t message_size = 512;
83 char message[message_size];
85 std::snprintf(message, message_size, format, args...);
86 return std::string(message);
89 struct MessengerLogger : public IMessengerLogger
91 explicit MessengerLogger(const std::string &preamble) : preamble_(preamble) {}
92 virtual ~MessengerLogger() = default;
93 virtual void info(const char* fmt, ...) override
96 std::string new_fmt = preamble_ + fmt + "\n";
98 vfprintf(stdout, new_fmt.c_str(), args);
101 virtual void warn(const char* fmt, ...) override
104 std::string new_fmt = preamble_ + fmt + "\n";
106 vfprintf(stdout, new_fmt.c_str(), args);
109 virtual void error(const char* fmt, ...) override
112 std::string new_fmt = preamble_ + fmt + "\n";
114 vfprintf(stderr, new_fmt.c_str(), args);
119 std::string preamble_;
122 static IMessengerLogger* sLogger = nullptr;
123 #if defined(__GNUC__) || defined(__clang__)
124 #pragma GCC diagnostic push
125 #pragma GCC diagnostic ignored "-Wunused-function"
127 static void setMessengerLogger(IMessengerLogger* logger)
132 #if defined(__GNUC__) || defined(__clang__)
133 #pragma GCC diagnostic pop
135 static IMessengerLogger* getMessengerLogger(void)
141 MessengerInit(const std::string &outBuf, const std::string &outSent,
142 const std::string &outReceiveReady, const std::string &inBuf,
143 const std::string &inSent,
144 const std::string &inReceiveReady,
145 std::function<void(std::string)> processor,
146 size_t numProcessingThreads)
147 : outboundMessageBuf(outBuf), outboundSentSynch(outSent),
148 outboundReceiveReadySynch(outReceiveReady), inboundMessageBuf(inBuf),
149 inboundSentSynch(inSent), inboundReceiveReadySynch(inReceiveReady), processor_(processor),
150 numProcessingThreads_(numProcessingThreads),
151 uncompressedFrameSize_(0), compressedFrameSize_(0),
154 if(firstLaunch(true))
160 shm_unlink(grokToClientMessageBuf.c_str());
161 shm_unlink(clientToGrokMessageBuf.c_str());
164 static bool firstLaunch(bool isClient)
166 bool debugGrok = false;
167 return debugGrok != isClient;
169 std::string outboundMessageBuf;
170 std::string outboundSentSynch;
171 std::string outboundReceiveReadySynch;
173 std::string inboundMessageBuf;
174 std::string inboundSentSynch;
175 std::string inboundReceiveReadySynch;
177 std::function<void(std::string)> processor_;
178 size_t numProcessingThreads_;
180 size_t uncompressedFrameSize_;
181 size_t compressedFrameSize_;
185 /*************************** Synchronization *******************************/
192 typedef int grk_handle;
195 Synch(const std::string &sentSemName, const std::string &receiveReadySemName)
196 : sentSemName_(sentSemName), receiveReadySemName_(receiveReadySemName)
198 // unlink semaphores in case of previous crash
199 if(MessengerInit::firstLaunch(true))
206 if(MessengerInit::firstLaunch(true))
209 void post(SynchDirection dir)
211 auto sem = (dir == SYNCH_SENT ? sentSem_ : receiveReadySem_);
212 int rc = sem_post(sem);
214 getMessengerLogger()->error("Error posting to semaphore: %s", strerror(errno));
216 void wait(SynchDirection dir)
218 auto sem = dir == SYNCH_SENT ? sentSem_ : receiveReadySem_;
219 int rc = sem_wait(sem);
221 getMessengerLogger()->error("Error waiting for semaphore: %s", strerror(errno));
225 sentSem_ = sem_open(sentSemName_.c_str(), O_CREAT, 0666, 0);
227 getMessengerLogger()->error("Error opening shared memory: %s", strerror(errno));
228 receiveReadySem_ = sem_open(receiveReadySemName_.c_str(), O_CREAT, 0666, 1);
229 if(!receiveReadySem_)
230 getMessengerLogger()->error("Error opening shared memory: %s", strerror(errno));
234 int rc = sem_close(sentSem_);
236 getMessengerLogger()->error("Error closing semaphore %s: %s", sentSemName_.c_str(),
238 rc = sem_close(receiveReadySem_);
240 getMessengerLogger()->error("Error closing semaphore %s: %s",
241 receiveReadySemName_.c_str(), strerror(errno));
245 int rc = sem_unlink(sentSemName_.c_str());
246 if(rc == -1 && errno != ENOENT)
247 getMessengerLogger()->error("Error unlinking semaphore %s: %s", sentSemName_.c_str(),
249 rc = sem_unlink(receiveReadySemName_.c_str());
250 if(rc == -1 && errno != ENOENT)
251 getMessengerLogger()->error("Error unlinking semaphore %s: %s",
252 receiveReadySemName_.c_str(), strerror(errno));
255 sem_t* receiveReadySem_;
258 std::string sentSemName_;
259 std::string receiveReadySemName_;
261 struct SharedMemoryManager
263 static bool initShm(const std::string &name, size_t len, grk_handle* shm_fd, char** buffer)
265 *shm_fd = shm_open(name.c_str(), O_CREAT | O_RDWR, 0666);
268 getMessengerLogger()->error("Error opening shared memory: %s", strerror(errno));
271 int rc = ftruncate(*shm_fd, sizeof(char) * len);
274 getMessengerLogger()->error("Error truncating shared memory: %s", strerror(errno));
277 getMessengerLogger()->error("Error closing shared memory: %s", strerror(errno));
278 rc = shm_unlink(name.c_str());
280 getMessengerLogger()->error("Error unlinking shared memory: %s", strerror(errno));
283 *buffer = static_cast<char*>(mmap(0, len, PROT_WRITE, MAP_SHARED, *shm_fd, 0));
286 getMessengerLogger()->error("Error mapping shared memory: %s", strerror(errno));
289 getMessengerLogger()->error("Error closing shared memory: %s", strerror(errno));
290 rc = shm_unlink(name.c_str());
292 getMessengerLogger()->error("Error unlinking shared memory: %s", strerror(errno));
295 return *buffer != nullptr;
297 static bool deinitShm(const std::string &name, size_t len, grk_handle &shm_fd, char** buffer)
299 if (!*buffer || !shm_fd)
302 int rc = munmap(*buffer, len);
305 getMessengerLogger()->error("Error unmapping shared memory %s: %s", name.c_str(), strerror(errno));
309 getMessengerLogger()->error("Error closing shared memory %s: %s", name.c_str(), strerror(errno));
310 rc = shm_unlink(name.c_str());
312 fprintf(stderr,"Error unlinking shared memory %s : %s\n", name.c_str(), strerror(errno));
318 template<typename Data>
319 class MessengerBlockingQueue
322 explicit MessengerBlockingQueue(size_t max) : active_(true), max_size_(max) {}
323 MessengerBlockingQueue() : MessengerBlockingQueue(UINT_MAX) {}
326 return queue_.size();
328 // deactivate and clear queue
332 std::lock_guard<std::mutex> lk(mutex_);
334 while(!queue_.empty())
338 // release all waiting threads
339 can_pop_.notify_all();
340 can_push_.notify_all();
344 std::lock_guard<std::mutex> lk(mutex_);
347 bool push(Data const& value)
351 std::unique_lock<std::mutex> lk(mutex_);
355 can_pop_.notify_one();
359 bool waitAndPush(Data& value)
363 std::unique_lock<std::mutex> lk(mutex_);
366 // in case of spurious wakeup, loop until predicate in lambda
368 can_push_.wait(lk, [this] { return queue_.size() < max_size_ || !active_; });
372 can_pop_.notify_one();
376 bool pop(Data& value)
380 std::unique_lock<std::mutex> lk(mutex_);
384 can_push_.notify_one();
388 bool waitAndPop(Data& value)
392 std::unique_lock<std::mutex> lk(mutex_);
395 // in case of spurious wakeup, loop until predicate in lambda
397 can_pop_.wait(lk, [this] { return !queue_.empty() || !active_; });
401 can_push_.notify_one();
407 bool push_(Data const& value)
409 if(queue_.size() == max_size_ || !active_)
415 bool pop_(Data& value)
417 if(queue_.empty() || !active_)
419 value = queue_.front();
424 std::queue<Data> queue_;
425 mutable std::mutex mutex_;
426 std::condition_variable can_pop_;
427 std::condition_variable can_push_;
433 BufferSrc(void) : BufferSrc("") {}
434 explicit BufferSrc(const std::string &file) : file_(file), clientFrameId_(0), frameId_(0), framePtr_(nullptr)
436 BufferSrc(size_t clientFrameId, size_t frameId, uint8_t* framePtr)
437 : file_(""), clientFrameId_(clientFrameId), frameId_(frameId), framePtr_(framePtr)
441 return !file_.empty() && framePtr_ == nullptr;
445 return clientFrameId_;
448 size_t clientFrameId_;
454 static void outboundThread(Messenger* messenger, const std::string &sendBuf, Synch* synch);
455 static void inboundThread(Messenger* messenger, const std::string &receiveBuf, Synch* synch);
456 static void processorThread(Messenger* messenger, std::function<void(std::string)> processor);
460 explicit Messenger(MessengerInit init)
461 : running(true), initialized_(false), shutdown_(false), init_(init),
462 outboundSynch_(nullptr),
463 inboundSynch_(nullptr), uncompressed_buffer_(nullptr), compressed_buffer_(nullptr),
464 uncompressed_fd_(0), compressed_fd_(0)
466 virtual ~Messenger(void)
469 sendQueue.deactivate();
470 receiveQueue.deactivate();
472 if (outboundSynch_) {
473 outboundSynch_->post(SYNCH_RECEIVE_READY);
478 inboundSynch_->post(SYNCH_SENT);
482 for(auto& p : processors_)
485 delete outboundSynch_;
486 delete inboundSynch_;
490 void startThreads(void) {
492 new Synch(init_.outboundSentSynch, init_.outboundReceiveReadySynch);
493 outbound = std::thread(outboundThread, this, init_.outboundMessageBuf, outboundSynch_);
496 new Synch(init_.inboundSentSynch, init_.inboundReceiveReadySynch);
497 inbound = std::thread(inboundThread, this, init_.inboundMessageBuf, inboundSynch_);
499 for(size_t i = 0; i < init_.numProcessingThreads_; ++i)
500 processors_.push_back(std::thread(processorThread, this, init_.processor_));
502 size_t serialize(const std::string &dir, size_t clientFrameId, uint8_t* compressedPtr,
503 size_t compressedLength)
506 if(!compressedPtr || !compressedLength)
508 sprintf(fname, "%s/test_%d.j2k", dir.c_str(), (int)clientFrameId);
509 auto fp = fopen(fname, "wb");
512 size_t written = fwrite(compressedPtr, 1, compressedLength, fp);
513 if(written != compressedLength)
523 bool initBuffers(void)
526 if(init_.uncompressedFrameSize_)
528 rc = rc && SharedMemoryManager::initShm(grokUncompressedBuf,
529 init_.uncompressedFrameSize_ * init_.numFrames_,
530 &uncompressed_fd_, &uncompressed_buffer_);
532 if(init_.compressedFrameSize_)
534 rc = rc && SharedMemoryManager::initShm(grokCompressedBuf,
535 init_.compressedFrameSize_ * init_.numFrames_,
536 &compressed_fd_, &compressed_buffer_);
544 bool rc = SharedMemoryManager::deinitShm(grokUncompressedBuf,
545 init_.uncompressedFrameSize_ * init_.numFrames_,
546 uncompressed_fd_, &uncompressed_buffer_);
547 rc = rc && SharedMemoryManager::deinitShm(grokCompressedBuf,
548 init_.compressedFrameSize_ * init_.numFrames_,
549 compressed_fd_, &compressed_buffer_);
553 template<typename... Args>
554 void send(const std::string& str, Args... args)
556 std::ostringstream oss;
558 int dummy[] = {0, ((void)(oss << ',' << args), 0)...};
559 static_cast<void>(dummy);
561 sendQueue.push(oss.str());
563 static pid_t get_pid_by_process_name(const char* name)
566 snprintf(command, sizeof(command), "pgrep %s", name);
567 auto pgrep = popen(command, "r");
571 if(fscanf(pgrep, "%d", &pid) != 1)
577 static bool terminate_process(const char* name)
579 auto pid = get_pid_by_process_name(name);
581 return (pid != -1 && kill(pid, SIGTERM) != -1);
583 static bool kill_process(const char* name)
585 auto pid = get_pid_by_process_name(name);
587 return (pid != -1 && kill(pid, SIGKILL) != -1);
589 void launchGrok(const std::string &dir, uint32_t width, uint32_t stride,
590 uint32_t height, uint32_t samplesPerPixel, uint32_t depth,
591 int device, bool is4K, uint32_t fps, uint32_t bandwidth,
592 const std::string server, uint32_t port,
593 const std::string license)
596 std::unique_lock<std::mutex> lk(shutdownMutex_);
597 if (async_result_.valid())
599 if(MessengerInit::firstLaunch(true))
603 auto fullServer = server + ":" + std::to_string(port);
605 "./grk_compress -batch_src %s,%d,%d,%d,%d,%d -out_fmt j2k -out_dir - -k 1 "
606 "-G %d -%s %d,%d -j %s -J %s",
607 GRK_MSGR_BATCH_IMAGE.c_str(), width, stride, height, samplesPerPixel, depth,
608 device, is4K ? "cinema4K" : "cinema2K", fps, bandwidth,
609 license.c_str(), fullServer.c_str());
612 void initClient(size_t uncompressedFrameSize, size_t compressedFrameSize, size_t numFrames)
614 // client fills queue with pending uncompressed buffers
615 init_.uncompressedFrameSize_ = uncompressedFrameSize;
616 init_.compressedFrameSize_ = compressedFrameSize;
617 init_.numFrames_ = numFrames;
619 auto ptr = uncompressed_buffer_;
620 for(size_t i = 0; i < init_.numFrames_; ++i)
622 availableBuffers_.push(BufferSrc(0, i, (uint8_t*)ptr));
623 ptr += init_.uncompressedFrameSize_;
626 std::unique_lock<std::mutex> lk(shutdownMutex_);
628 clientInitializedCondition_.notify_all();
630 bool waitForClientInit(void)
635 std::unique_lock<std::mutex> lk(shutdownMutex_);
640 clientInitializedCondition_.wait(lk, [this]{return initialized_ || shutdown_;});
642 return initialized_ && !shutdown_;
644 static size_t uncompressedFrameSize(uint32_t w, uint32_t h, uint32_t samplesPerPixel)
646 return sizeof(uint16_t) * w * h * samplesPerPixel;
648 void reclaimCompressed(size_t frameId)
650 availableBuffers_.push(BufferSrc(0, frameId, getCompressedFrame(frameId)));
652 void reclaimUncompressed(size_t frameId)
654 availableBuffers_.push(BufferSrc(0, frameId, getUncompressedFrame(frameId)));
656 uint8_t* getUncompressedFrame(size_t frameId)
658 assert(frameId < init_.numFrames_);
659 if(frameId >= init_.numFrames_)
662 return (uint8_t*)(uncompressed_buffer_ + frameId * init_.uncompressedFrameSize_);
664 uint8_t* getCompressedFrame(size_t frameId)
666 assert(frameId < init_.numFrames_);
667 if(frameId >= init_.numFrames_)
670 return (uint8_t*)(compressed_buffer_ + frameId * init_.compressedFrameSize_);
672 std::atomic_bool running;
675 MessengerBlockingQueue<std::string> sendQueue;
676 MessengerBlockingQueue<std::string> receiveQueue;
677 MessengerBlockingQueue<BufferSrc> availableBuffers_;
680 std::future<int> async_result_;
681 std::mutex shutdownMutex_;
682 std::condition_variable shutdownCondition_;
685 std::condition_variable clientInitializedCondition_;
687 void launch(const std::string &cmd, const std::string &dir)
689 // Change the working directory
692 if(chdir(dir.c_str()) != 0)
694 getMessengerLogger()->error("Error: failed to change the working directory");
698 // Execute the command using std::async and std::system
700 async_result_ = std::async(std::launch::async, [this]() { return std::system(cmd_.c_str()); });
702 std::thread outbound;
703 Synch* outboundSynch_;
706 Synch* inboundSynch_;
708 std::vector<std::thread> processors_;
709 char* uncompressed_buffer_;
710 char* compressed_buffer_;
712 grk_handle uncompressed_fd_;
713 grk_handle compressed_fd_;
716 static void outboundThread(Messenger* messenger, const std::string &sendBuf, Synch* synch)
718 grk_handle shm_fd = 0;
719 char* send_buffer = nullptr;
721 if(!SharedMemoryManager::initShm(sendBuf, messageBufferLen, &shm_fd, &send_buffer))
723 while(messenger->running)
725 synch->wait(SYNCH_RECEIVE_READY);
726 if(!messenger->running)
729 if(!messenger->sendQueue.waitAndPop(message))
731 if(!messenger->running)
733 memcpy(send_buffer, message.c_str(), message.size() + 1);
734 synch->post(SYNCH_SENT);
736 SharedMemoryManager::deinitShm(sendBuf, messageBufferLen, shm_fd, &send_buffer);
739 static void inboundThread(Messenger* messenger, const std::string &receiveBuf, Synch* synch)
741 grk_handle shm_fd = 0;
742 char* receive_buffer = nullptr;
744 if(!SharedMemoryManager::initShm(receiveBuf, messageBufferLen, &shm_fd, &receive_buffer))
746 while(messenger->running)
748 synch->wait(SYNCH_SENT);
749 if(!messenger->running)
751 auto message = std::string(receive_buffer);
752 synch->post(SYNCH_RECEIVE_READY);
753 messenger->receiveQueue.push(message);
755 SharedMemoryManager::deinitShm(receiveBuf, messageBufferLen, shm_fd, &receive_buffer);
759 explicit Msg(const std::string &msg) : ct_(0)
761 std::stringstream ss(msg);
765 std::getline(ss, substr, ',');
766 cs_.push_back(substr);
771 if(ct_ == cs_.size())
773 getMessengerLogger()->error("Msg: comma separated list exhausted. returning empty.");
779 uint32_t nextUint(void)
781 return (uint32_t)std::stoi(next());
784 std::vector<std::string> cs_;
787 static void processorThread(Messenger* messenger, std::function<void(std::string)> processor)
789 while(messenger->running)
792 if(!messenger->receiveQueue.waitAndPop(message))
794 if(!messenger->running)
797 auto tag = msg.next();
798 if(tag == GRK_MSGR_BATCH_COMPRESS_INIT)
800 auto width = msg.nextUint();
801 auto stride = msg.nextUint();
803 auto height = msg.nextUint();
804 auto samplesPerPixel = msg.nextUint();
805 auto depth = msg.nextUint();
807 messenger->init_.uncompressedFrameSize_ =
808 Messenger::uncompressedFrameSize(width, height, samplesPerPixel);
809 auto compressedFrameSize = msg.nextUint();
810 auto numFrames = msg.nextUint();
811 messenger->initClient(compressedFrameSize, compressedFrameSize, numFrames);
813 else if(tag == GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED)
815 messenger->reclaimUncompressed(msg.nextUint());
817 else if(tag == GRK_MSGR_BATCH_PROCESSSED_COMPRESSED)
819 messenger->reclaimCompressed(msg.nextUint());
826 struct ScheduledFrames
830 std::unique_lock<std::mutex> lk(mapMutex_);
831 auto it = map_.find(val.index());
832 if (it == map_.end())
833 map_[val.index()] = val;
835 F retrieve(size_t index, bool &success)
837 std::unique_lock<std::mutex> lk(mapMutex_);
839 auto it = map_.find(index);
851 std::mutex mapMutex_;
852 std::map<size_t, F> map_;
856 struct ScheduledMessenger : public Messenger
858 explicit ScheduledMessenger(MessengerInit init) : Messenger(init),
862 ~ScheduledMessenger(void) {
865 bool scheduleCompress(F proxy, std::function<void(BufferSrc)> converter){
866 size_t frameSize = init_.uncompressedFrameSize_;
867 assert(frameSize >= init_.uncompressedFrameSize_);
869 if(!availableBuffers_.waitAndPop(src))
872 scheduledFrames_.store(proxy);
874 send(GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED, proxy.index(), src.frameId_);
878 void processCompressed(const std::string &message, std::function<void(F,uint8_t*,uint32_t)> processor, bool needsRecompression) {
881 auto clientFrameId = msg.nextUint();
882 auto compressedFrameId = msg.nextUint();
883 auto compressedFrameLength = msg.nextUint();
884 if (!needsRecompression) {
885 bool success = false;
886 auto srcFrame = scheduledFrames_.retrieve(clientFrameId,success);
889 processor(srcFrame, getCompressedFrame(compressedFrameId),compressedFrameLength);
892 send(GRK_MSGR_BATCH_PROCESSSED_COMPRESSED, compressedFrameId);
893 if (shutdown_ && framesCompressed_ == framesScheduled_)
894 shutdownCondition_.notify_all();
898 std::unique_lock<std::mutex> lk(shutdownMutex_);
899 if (!async_result_.valid())
902 if (framesScheduled_) {
903 uint32_t scheduled = framesScheduled_;
904 send(GRK_MSGR_BATCH_FLUSH, scheduled);
905 shutdownCondition_.wait(lk, [this] { return framesScheduled_ == framesCompressed_; });
907 availableBuffers_.deactivate();
908 send(GRK_MSGR_BATCH_SHUTDOWN);
909 int result = async_result_.get();
911 getMessengerLogger()->error("Accelerator failed with return code: %d\n",result);
912 } catch (std::exception &ex) {
913 getMessengerLogger()->error("%s",ex.what());
917 F retrieve(size_t index, bool &success) {
918 return scheduledFrames_.retrieve(index, success);
921 scheduledFrames_.store(val);
925 ScheduledFrames<F> scheduledFrames_;
926 std::atomic<uint32_t> framesScheduled_;
927 std::atomic<uint32_t> framesCompressed_;
930 } // namespace grk_plugin