Move things round a bit.
[dcpomatic.git] / src / tools / servomatic.cc
1 /*
2     Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
20 #include <iostream>
21 #include <stdexcept>
22 #include <sstream>
23 #include <cstring>
24 #include <vector>
25 #include <unistd.h>
26 #include <errno.h>
27 #include <sys/types.h> 
28 #include <sys/socket.h>
29 #include <netinet/in.h>
30 #include <boost/algorithm/string.hpp>
31 #include <boost/thread.hpp>
32 #include <boost/thread/mutex.hpp>
33 #include <boost/thread/condition.hpp>
34 #include "config.h"
35 #include "dcp_video_frame.h"
36 #include "exceptions.h"
37 #include "util.h"
38 #include "config.h"
39 #include "scaler.h"
40 #include "image.h"
41 #include "log.h"
42
43 #define BACKLOG 8
44
45 using namespace std;
46 using namespace boost;
47
48 static vector<thread *> worker_threads;
49
50 static std::list<int> queue;
51 static mutex worker_mutex;
52 static condition worker_condition;
53 static Log log_ ("servomatic.log");
54
55 int
56 process (int fd)
57 {
58         SocketReader reader (fd);
59         
60         char buffer[128];
61         reader.read_indefinite ((uint8_t *) buffer, sizeof (buffer));
62         reader.consume (strlen (buffer) + 1);
63         
64         stringstream s (buffer);
65         
66         string command;
67         s >> command;
68         if (command != "encode") {
69                 close (fd);
70                 return -1;
71         }
72         
73         Size in_size;
74         int pixel_format_int;
75         Size out_size;
76         int padding;
77         string scaler_id;
78         int frame;
79         float frames_per_second;
80         string post_process;
81         int colour_lut_index;
82         int j2k_bandwidth;
83         
84         s >> in_size.width >> in_size.height
85           >> pixel_format_int
86           >> out_size.width >> out_size.height
87           >> padding
88           >> scaler_id
89           >> frame
90           >> frames_per_second
91           >> post_process
92           >> colour_lut_index
93           >> j2k_bandwidth;
94         
95         PixelFormat pixel_format = (PixelFormat) pixel_format_int;
96         Scaler const * scaler = Scaler::from_id (scaler_id);
97         if (post_process == "none") {
98                 post_process = "";
99         }
100         
101         shared_ptr<SimpleImage> image (new SimpleImage (pixel_format, in_size));
102         
103         for (int i = 0; i < image->components(); ++i) {
104                 int line_size;
105                 s >> line_size;
106                 image->set_line_size (i, line_size);
107         }
108         
109         for (int i = 0; i < image->components(); ++i) {
110                 reader.read_definite_and_consume (image->data()[i], image->line_size()[i] * image->lines(i));
111         }
112         
113 #ifdef DEBUG_HASH
114         image->hash ("Image for encoding (as received by server)");
115 #endif          
116         
117         DCPVideoFrame dcp_video_frame (image, out_size, padding, scaler, frame, frames_per_second, post_process, colour_lut_index, j2k_bandwidth, &log_);
118         shared_ptr<EncodedData> encoded = dcp_video_frame.encode_locally ();
119         encoded->send (fd);
120
121 #ifdef DEBUG_HASH
122         encoded->hash ("Encoded image (as made by server and as sent back)");
123 #endif          
124
125         
126         return frame;
127 }
128
129 void
130 worker_thread ()
131 {
132         while (1) {
133                 mutex::scoped_lock lock (worker_mutex);
134                 while (queue.empty ()) {
135                         worker_condition.wait (lock);
136                 }
137
138                 int fd = queue.front ();
139                 queue.pop_front ();
140                 
141                 lock.unlock ();
142
143                 int frame = -1;
144
145                 struct timeval start;
146                 gettimeofday (&start, 0);
147                 
148                 try {
149                         frame = process (fd);
150                 } catch (std::exception& e) {
151                         cerr << "Error: " << e.what() << "\n";
152                 }
153                 
154                 close (fd);
155                 
156                 lock.lock ();
157
158                 if (frame >= 0) {
159                         struct timeval end;
160                         gettimeofday (&end, 0);
161                         cout << "Encoded frame " << frame << " in " << (seconds (end) - seconds (start)) << "\n";
162                 }
163                 
164                 worker_condition.notify_all ();
165         }
166 }
167
168 int
169 main ()
170 {
171         Scaler::setup_scalers ();
172
173         int const num_threads = Config::instance()->num_local_encoding_threads ();
174         
175         for (int i = 0; i < num_threads; ++i) {
176                 worker_threads.push_back (new thread (worker_thread));
177         }
178         
179         int fd = socket (AF_INET, SOCK_STREAM, 0);
180         if (fd < 0) {
181                 throw NetworkError ("could not open socket");
182         }
183
184         int const o = 1;
185         setsockopt (fd, SOL_SOCKET, SO_REUSEADDR, &o, sizeof (o));
186
187         struct timeval tv;
188         tv.tv_sec = 20;
189         tv.tv_usec = 0;
190         setsockopt (fd, SOL_SOCKET, SO_RCVTIMEO, (void *) &tv, sizeof (tv));
191         setsockopt (fd, SOL_SOCKET, SO_SNDTIMEO, (void *) &tv, sizeof (tv));
192
193         struct sockaddr_in server_address;
194         memset (&server_address, 0, sizeof (server_address));
195         server_address.sin_family = AF_INET;
196         server_address.sin_addr.s_addr = INADDR_ANY;
197         server_address.sin_port = htons (Config::instance()->server_port ());
198         if (::bind (fd, (struct sockaddr *) &server_address, sizeof (server_address)) < 0) {
199                 stringstream s;
200                 s << "could not bind to port " << Config::instance()->server_port() << " (" << strerror (errno) << ")";
201                 throw NetworkError (s.str());
202         }
203
204         listen (fd, BACKLOG);
205
206         while (1) {
207                 struct sockaddr_in client_address;
208                 socklen_t client_length = sizeof (client_address);
209                 int new_fd = accept (fd, (struct sockaddr *) &client_address, &client_length);
210                 if (new_fd < 0) {
211                         if (errno != EAGAIN && errno != EWOULDBLOCK) {
212                                 throw NetworkError ("accept failed");
213                         }
214
215                         continue;
216                 }
217
218                 mutex::scoped_lock lock (worker_mutex);
219                 
220                 /* Wait until the queue has gone down a bit */
221                 while (int (queue.size()) >= num_threads * 2) {
222                         worker_condition.wait (lock);
223                 }
224
225                 struct timeval tv;
226                 tv.tv_sec = 20;
227                 tv.tv_usec = 0;
228                 setsockopt (new_fd, SOL_SOCKET, SO_RCVTIMEO, (void *) &tv, sizeof (tv));
229                 setsockopt (new_fd, SOL_SOCKET, SO_SNDTIMEO, (void *) &tv, sizeof (tv));
230                 
231                 queue.push_back (new_fd);
232                 worker_condition.notify_all ();
233         }
234         
235         close (fd);
236
237         return 0;
238 }