Explicitly set up Grok logger rather than relying on a static variable.
[dcpomatic.git] / src / lib / grok / context.h
1 /*
2     Copyright (C) 2023 Grok Image Compression Inc.
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
21 #pragma once
22
23 #include "../config.h"
24 #include "../dcp_video.h"
25 #include "../film.h"
26 #include "../log.h"
27 #include "../dcpomatic_log.h"
28 #include "../writer.h"
29 #include "messenger.h"
30
31 class Film;
32
33 static std::mutex launchMutex;
34
35 namespace grk_plugin
36 {
37
38 struct GrokLogger : public MessengerLogger {
39         explicit GrokLogger(const std::string &preamble) : MessengerLogger(preamble)
40         {}
41         virtual ~GrokLogger() = default;
42         void info(const char* fmt, ...) override{
43                 va_list arg;
44                 va_start(arg, fmt);
45                 dcpomatic_log->log(preamble_ + log_message(fmt, arg),LogEntry::TYPE_GENERAL);
46                 va_end(arg);
47         }
48         void warn(const char* fmt, ...) override{
49                 va_list arg;
50                 va_start(arg, fmt);
51                 dcpomatic_log->log(preamble_ + log_message(fmt, arg),LogEntry::TYPE_WARNING);
52                 va_end(arg);
53         }
54         void error(const char* fmt, ...) override{
55                 va_list arg;
56                 va_start(arg, fmt);
57                 dcpomatic_log->log(preamble_ + log_message(fmt, arg),LogEntry::TYPE_ERROR);
58                 va_end(arg);
59         }
60 };
61
62 struct FrameProxy {
63         FrameProxy(int index, Eyes eyes, DCPVideo dcpv) : index_(index), eyes_(eyes), vf(dcpv)
64         {}
65         int index() const {
66                 return index_;
67         }
68         Eyes eyes(void) const {
69                 return eyes_;
70         }
71         int index_;
72         Eyes eyes_;
73         DCPVideo vf;
74 };
75
76 struct DcpomaticContext {
77         DcpomaticContext(std::shared_ptr<const Film> film, Writer& writer,
78                                                 EventHistory &history, const std::string &location) :
79                                                                         film_(film), writer_(writer),
80                                                                         history_(history), location_(location),
81                                                                         width_(0), height_(0)
82         {}
83         void setDimensions(uint32_t w, uint32_t h) {
84                 width_ = w;
85                 height_ = h;
86         }
87         std::shared_ptr<const Film> film_;
88         Writer& writer_;
89         EventHistory &history_;
90         std::string location_;
91         uint32_t width_;
92         uint32_t height_;
93 };
94
95 class GrokContext {
96 public:
97         explicit GrokContext(const DcpomaticContext &dcpomaticContext) :
98                                                                 dcpomaticContext_(dcpomaticContext),
99                                                                 messenger_(nullptr),
100                                                                 launched_(false)
101         {
102                 struct CompressedData : public dcp::Data {
103                         explicit CompressedData(int dataLen) : data_(new uint8_t[dataLen]), dataLen_(dataLen)
104                         {}
105                         ~CompressedData(void){
106                                 delete[] data_;
107                         }
108                         uint8_t const * data () const override {
109                                 return data_;
110                         }
111                         uint8_t * data () override {
112                                 return data_;
113                         }
114                         int size () const override {
115                                 return dataLen_;
116                         }
117                         uint8_t *data_;
118                         int dataLen_;
119                 };
120                 if (Config::instance()->enable_gpu ())  {
121                     boost::filesystem::path folder(dcpomaticContext_.location_);
122                     boost::filesystem::path binaryPath = folder / "grk_compress";
123                     if (!boost::filesystem::exists(binaryPath)) {
124                         getMessengerLogger()->error("Invalid binary location %s",
125                                         dcpomaticContext_.location_.c_str());
126                                 return;
127                     }
128                         auto proc = [this](const std::string& str) {
129                                 try {
130                                         Msg msg(str);
131                                         auto tag = msg.next();
132                                         if(tag == GRK_MSGR_BATCH_SUBMIT_COMPRESSED)
133                                         {
134                                                 auto clientFrameId = msg.nextUint();
135                                                 auto compressedFrameId = msg.nextUint();
136                                                 (void)compressedFrameId;
137                                                 auto compressedFrameLength = msg.nextUint();
138                                                 auto  processor =
139                                                                 [this](FrameProxy srcFrame, uint8_t* compressed, uint32_t compressedFrameLength)
140                                                 {
141                                                         auto compressedData = std::make_shared<CompressedData>(compressedFrameLength);
142                                                         memcpy(compressedData->data_,compressed,compressedFrameLength );
143                                                         dcpomaticContext_.writer_.write(compressedData, srcFrame.index(), srcFrame.eyes());
144                                                         frame_done ();
145                                                 };
146                                                 int const minimum_size = 16384;
147                                                 bool needsRecompression = compressedFrameLength < minimum_size;
148                                                 messenger_->processCompressed(str, processor, needsRecompression);
149                                                 if (needsRecompression) {
150                                                         auto fp = messenger_->retrieve(clientFrameId);
151                                                         if (!fp) {
152                                                                 return;
153                                                         }
154
155                                                         auto encoded = std::make_shared<dcp::ArrayData>(fp->vf.encode_locally());
156                                                         dcpomaticContext_.writer_.write(encoded, fp->vf.index(), fp->vf.eyes());
157                                                         frame_done ();
158                                                 }
159                                         }
160                                 } catch (std::exception &ex){
161                                         getMessengerLogger()->error("%s",ex.what());
162                                 }
163                         };
164                         auto clientInit =
165                                 MessengerInit(clientToGrokMessageBuf, clientSentSynch, grokReceiveReadySynch,
166                                                           grokToClientMessageBuf, grokSentSynch, clientReceiveReadySynch, proc,
167                                                           std::thread::hardware_concurrency());
168                         messenger_ = new ScheduledMessenger<FrameProxy>(clientInit);
169                 }
170         }
171         ~GrokContext(void) {
172                 shutdown();
173         }
174         bool launch(DCPVideo dcpv, int device){
175                 if (!messenger_ )
176                         return false;
177                 if (launched_)
178                         return true;
179                 std::unique_lock<std::mutex> lk_global(launchMutex);
180                 if (!messenger_)
181                         return false;
182                 if (launched_)
183                         return true;
184                 if (MessengerInit::firstLaunch(true)) {
185                         auto s = dcpv.get_size();
186                         dcpomaticContext_.setDimensions(s.width, s.height);
187                         auto config = Config::instance();
188                         messenger_->launchGrok(dcpomaticContext_.location_,
189                                         dcpomaticContext_.width_,dcpomaticContext_.width_,
190                                         dcpomaticContext_.height_,
191                                         3, 12, device,
192                                         dcpomaticContext_.film_->resolution() == Resolution::FOUR_K,
193                                         dcpomaticContext_.film_->video_frame_rate(),
194                                         dcpomaticContext_.film_->j2k_bandwidth(),
195                                         config->gpu_license_server(),
196                                         config->gpu_license_port(),
197                                         config->gpu_license());
198                 }
199                 launched_ =  messenger_->waitForClientInit();
200
201                 return launched_;
202         }
203         bool scheduleCompress(DCPVideo const& vf){
204                 if (!messenger_)
205                         return false;
206
207                 auto fp = FrameProxy(vf.index(), vf.eyes(), vf);
208                 auto cvt = [this, &fp](BufferSrc src){
209                         // xyz conversion
210                         fp.vf.convert_to_xyz((uint16_t*)src.framePtr_);
211                 };
212                 return messenger_->scheduleCompress(fp, cvt);
213         }
214         void shutdown(void){
215                 if (!messenger_)
216                         return;
217
218                 std::unique_lock<std::mutex> lk_global(launchMutex);
219                 if (!messenger_)
220                         return;
221                 if (launched_)
222                         messenger_->shutdown();
223                 delete messenger_;
224                 messenger_ = nullptr;
225         }
226         void frame_done () {
227                 dcpomaticContext_.history_.event ();
228         }
229 private:
230         DcpomaticContext dcpomaticContext_;
231         ScheduledMessenger<FrameProxy> *messenger_;
232         bool launched_;
233 };
234
235 }
236