Cleanup: remove unnecessary forward declaration.
[dcpomatic.git] / src / lib / grok / context.h
1 /*
2     Copyright (C) 2023 Grok Image Compression Inc.
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
21 #pragma once
22
23 #include "../config.h"
24 #include "../dcp_video.h"
25 #include "../film.h"
26 #include "../log.h"
27 #include "../dcpomatic_log.h"
28 #include "../writer.h"
29 #include "messenger.h"
30
31
// Mutex locked by GrokContext::launch() and GrokContext::shutdown() below.
// NOTE(review): this is declared `static` in a header, so it has internal
// linkage and every translation unit that includes this file gets its own
// separate mutex object - confirm that is what is intended.
32 static std::mutex launchMutex;
33
34 namespace grk_plugin
35 {
36
37 struct GrokLogger : public MessengerLogger {
38         explicit GrokLogger(const std::string &preamble) : MessengerLogger(preamble)
39         {}
40         virtual ~GrokLogger() = default;
41         void info(const char* fmt, ...) override{
42                 va_list arg;
43                 va_start(arg, fmt);
44                 dcpomatic_log->log(preamble_ + log_message(fmt, arg),LogEntry::TYPE_GENERAL);
45                 va_end(arg);
46         }
47         void warn(const char* fmt, ...) override{
48                 va_list arg;
49                 va_start(arg, fmt);
50                 dcpomatic_log->log(preamble_ + log_message(fmt, arg),LogEntry::TYPE_WARNING);
51                 va_end(arg);
52         }
53         void error(const char* fmt, ...) override{
54                 va_list arg;
55                 va_start(arg, fmt);
56                 dcpomatic_log->log(preamble_ + log_message(fmt, arg),LogEntry::TYPE_ERROR);
57                 va_end(arg);
58         }
59 };
60
61 struct FrameProxy {
62         FrameProxy(int index, Eyes eyes, DCPVideo dcpv) : index_(index), eyes_(eyes), vf(dcpv)
63         {}
64         int index() const {
65                 return index_;
66         }
67         Eyes eyes(void) const {
68                 return eyes_;
69         }
70         int index_;
71         Eyes eyes_;
72         DCPVideo vf;
73 };
74
75 struct DcpomaticContext {
76         DcpomaticContext(std::shared_ptr<const Film> film, Writer& writer,
77                                                 EventHistory &history, const std::string &location) :
78                                                                         film_(film), writer_(writer),
79                                                                         history_(history), location_(location),
80                                                                         width_(0), height_(0)
81         {}
82         void setDimensions(uint32_t w, uint32_t h) {
83                 width_ = w;
84                 height_ = h;
85         }
86         std::shared_ptr<const Film> film_;
87         Writer& writer_;
88         EventHistory &history_;
89         std::string location_;
90         uint32_t width_;
91         uint32_t height_;
92 };
93
94 class GrokContext {
95 public:
96         explicit GrokContext(const DcpomaticContext &dcpomaticContext) :
97                                                                 dcpomaticContext_(dcpomaticContext),
98                                                                 messenger_(nullptr),
99                                                                 launched_(false)
100         {
101                 struct CompressedData : public dcp::Data {
102                         explicit CompressedData(int dataLen) : data_(new uint8_t[dataLen]), dataLen_(dataLen)
103                         {}
104                         ~CompressedData(void){
105                                 delete[] data_;
106                         }
107                         uint8_t const * data () const override {
108                                 return data_;
109                         }
110                         uint8_t * data () override {
111                                 return data_;
112                         }
113                         int size () const override {
114                                 return dataLen_;
115                         }
116                         uint8_t *data_;
117                         int dataLen_;
118                 };
119                 if (Config::instance()->enable_gpu ())  {
120                     boost::filesystem::path folder(dcpomaticContext_.location_);
121                     boost::filesystem::path binaryPath = folder / "grk_compress";
122                     if (!boost::filesystem::exists(binaryPath)) {
123                         getMessengerLogger()->error("Invalid binary location %s",
124                                         dcpomaticContext_.location_.c_str());
125                                 return;
126                     }
127                         auto proc = [this](const std::string& str) {
128                                 try {
129                                         Msg msg(str);
130                                         auto tag = msg.next();
131                                         if(tag == GRK_MSGR_BATCH_SUBMIT_COMPRESSED)
132                                         {
133                                                 auto clientFrameId = msg.nextUint();
134                                                 auto compressedFrameId = msg.nextUint();
135                                                 (void)compressedFrameId;
136                                                 auto compressedFrameLength = msg.nextUint();
137                                                 auto  processor =
138                                                                 [this](FrameProxy srcFrame, uint8_t* compressed, uint32_t compressedFrameLength)
139                                                 {
140                                                         auto compressedData = std::make_shared<CompressedData>(compressedFrameLength);
141                                                         memcpy(compressedData->data_,compressed,compressedFrameLength );
142                                                         dcpomaticContext_.writer_.write(compressedData, srcFrame.index(), srcFrame.eyes());
143                                                         frame_done ();
144                                                 };
145                                                 int const minimum_size = 16384;
146                                                 bool needsRecompression = compressedFrameLength < minimum_size;
147                                                 messenger_->processCompressed(str, processor, needsRecompression);
148                                                 if (needsRecompression) {
149                                                         auto fp = messenger_->retrieve(clientFrameId);
150                                                         if (!fp) {
151                                                                 return;
152                                                         }
153
154                                                         auto encoded = std::make_shared<dcp::ArrayData>(fp->vf.encode_locally());
155                                                         dcpomaticContext_.writer_.write(encoded, fp->vf.index(), fp->vf.eyes());
156                                                         frame_done ();
157                                                 }
158                                         }
159                                 } catch (std::exception &ex){
160                                         getMessengerLogger()->error("%s",ex.what());
161                                 }
162                         };
163                         auto clientInit =
164                                 MessengerInit(clientToGrokMessageBuf, clientSentSynch, grokReceiveReadySynch,
165                                                           grokToClientMessageBuf, grokSentSynch, clientReceiveReadySynch, proc,
166                                                           std::thread::hardware_concurrency());
167                         messenger_ = new ScheduledMessenger<FrameProxy>(clientInit);
168                 }
169         }
170         ~GrokContext(void) {
171                 shutdown();
172         }
173         bool launch(DCPVideo dcpv, int device){
174                 if (!messenger_ )
175                         return false;
176                 if (launched_)
177                         return true;
178                 std::unique_lock<std::mutex> lk_global(launchMutex);
179                 if (!messenger_)
180                         return false;
181                 if (launched_)
182                         return true;
183                 if (MessengerInit::firstLaunch(true)) {
184                         auto s = dcpv.get_size();
185                         dcpomaticContext_.setDimensions(s.width, s.height);
186                         auto config = Config::instance();
187                         messenger_->launchGrok(dcpomaticContext_.location_,
188                                         dcpomaticContext_.width_,dcpomaticContext_.width_,
189                                         dcpomaticContext_.height_,
190                                         3, 12, device,
191                                         dcpomaticContext_.film_->resolution() == Resolution::FOUR_K,
192                                         dcpomaticContext_.film_->video_frame_rate(),
193                                         dcpomaticContext_.film_->j2k_bandwidth(),
194                                         config->gpu_license_server(),
195                                         config->gpu_license_port(),
196                                         config->gpu_license());
197                 }
198                 launched_ =  messenger_->waitForClientInit();
199
200                 return launched_;
201         }
202         bool scheduleCompress(DCPVideo const& vf){
203                 if (!messenger_)
204                         return false;
205
206                 auto fp = FrameProxy(vf.index(), vf.eyes(), vf);
207                 auto cvt = [this, &fp](BufferSrc src){
208                         // xyz conversion
209                         fp.vf.convert_to_xyz((uint16_t*)src.framePtr_);
210                 };
211                 return messenger_->scheduleCompress(fp, cvt);
212         }
213         void shutdown(void){
214                 if (!messenger_)
215                         return;
216
217                 std::unique_lock<std::mutex> lk_global(launchMutex);
218                 if (!messenger_)
219                         return;
220                 if (launched_)
221                         messenger_->shutdown();
222                 delete messenger_;
223                 messenger_ = nullptr;
224         }
225         void frame_done () {
226                 dcpomaticContext_.history_.event ();
227         }
228 private:
229         DcpomaticContext dcpomaticContext_;
230         ScheduledMessenger<FrameProxy> *messenger_;
231         bool launched_;
232 };
233
234 }
235