Merge master.
[dcpomatic.git] / src / lib / server.cc
1 /*
2     Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
20 /** @file src/lib/server.cc
21  *  @brief Class to describe a server to which we can send
22  *  encoding work, and a class to implement such a server.
23  */
24
25 #include <string>
26 #include <vector>
27 #include <sstream>
28 #include <iostream>
29 #include <boost/algorithm/string.hpp>
30 #include <boost/scoped_array.hpp>
31 #include <libcxml/cxml.h>
32 #include <dcp/raw_convert.h>
33 #include "server.h"
34 #include "util.h"
35 #include "scaler.h"
36 #include "image.h"
37 #include "dcp_video_frame.h"
38 #include "config.h"
39 #include "cross.h"
40
41 #include "i18n.h"
42
43 using std::string;
44 using std::stringstream;
45 using std::multimap;
46 using std::vector;
47 using std::list;
48 using std::cout;
49 using std::cerr;
50 using std::setprecision;
51 using std::fixed;
52 using boost::shared_ptr;
53 using boost::algorithm::is_any_of;
54 using boost::algorithm::split;
55 using boost::thread;
56 using boost::bind;
57 using boost::scoped_array;
58 using boost::optional;
59 using dcp::Size;
60 using dcp::raw_convert;
61
62 Server::Server (shared_ptr<Log> log, bool verbose)
63         : _log (log)
64         , _verbose (verbose)
65 {
66
67 }
68
69 /** @param after_read Filled in with gettimeofday() after reading the input from the network.
70  *  @param after_encode Filled in with gettimeofday() after encoding the image.
71  */
72 int
73 Server::process (shared_ptr<Socket> socket, struct timeval& after_read, struct timeval& after_encode)
74 {
75         uint32_t length = socket->read_uint32 ();
76         scoped_array<char> buffer (new char[length]);
77         socket->read (reinterpret_cast<uint8_t*> (buffer.get()), length);
78         
79         stringstream s (buffer.get());
80         shared_ptr<cxml::Document> xml (new cxml::Document ("EncodingRequest"));
81         xml->read_stream (s);
82         if (xml->number_child<int> ("Version") != SERVER_LINK_VERSION) {
83                 cerr << "Mismatched server/client versions\n";
84                 _log->log ("Mismatched server/client versions");
85                 return -1;
86         }
87
88         dcp::Size size (
89                 xml->number_child<int> ("Width"), xml->number_child<int> ("Height")
90                 );
91
92         shared_ptr<Image> image (new Image (PIX_FMT_RGB24, size, true));
93
94         image->read_from_socket (socket);
95         DCPVideoFrame dcp_video_frame (image, xml, _log);
96
97         gettimeofday (&after_read, 0);
98         
99         shared_ptr<EncodedData> encoded = dcp_video_frame.encode_locally ();
100
101         gettimeofday (&after_encode, 0);
102         
103         try {
104                 encoded->send (socket);
105         } catch (std::exception& e) {
106                 _log->log (String::compose (
107                                    "Send failed; frame %1, data size %2, pixel format %3, image size %4x%5, %6 components",
108                                    dcp_video_frame.frame(), encoded->size(), image->pixel_format(), image->size().width, image->size().height, image->components()
109                                    )
110                         );
111                 throw;
112         }
113
114         return dcp_video_frame.frame ();
115 }
116
117 void
118 Server::worker_thread ()
119 {
120         while (1) {
121                 boost::mutex::scoped_lock lock (_worker_mutex);
122                 while (_queue.empty ()) {
123                         _worker_condition.wait (lock);
124                 }
125
126                 shared_ptr<Socket> socket = _queue.front ();
127                 _queue.pop_front ();
128                 
129                 lock.unlock ();
130
131                 int frame = -1;
132                 string ip;
133
134                 struct timeval start;
135                 struct timeval after_read;
136                 struct timeval after_encode;
137                 struct timeval end;
138                 
139                 gettimeofday (&start, 0);
140                 
141                 try {
142                         frame = process (socket, after_read, after_encode);
143                         ip = socket->socket().remote_endpoint().address().to_string();
144                 } catch (std::exception& e) {
145                         _log->log (String::compose ("Error: %1", e.what()));
146                 }
147
148                 gettimeofday (&end, 0);
149
150                 socket.reset ();
151                 
152                 lock.lock ();
153
154                 if (frame >= 0) {
155                         struct timeval end;
156                         gettimeofday (&end, 0);
157
158                         stringstream message;
159                         message.precision (2);
160                         message << fixed
161                                 << "Encoded frame " << frame << " from " << ip << ": "
162                                 << "receive " << (seconds(after_read) - seconds(start)) << "s "
163                                 << "encode " << (seconds(after_encode) - seconds(after_read)) << "s "
164                                 << "send " << (seconds(end) - seconds(after_encode)) << "s.";
165                                                    
166                         if (_verbose) {
167                                 cout << message.str() << "\n";
168                         }
169
170                         _log->log (message.str ());
171                 }
172                 
173                 _worker_condition.notify_all ();
174         }
175 }
176
177 void
178 Server::run (int num_threads)
179 {
180         _log->log (String::compose ("Server starting with %1 threads", num_threads));
181         if (_verbose) {
182                 cout << "DCP-o-matic server starting with " << num_threads << " threads.\n";
183         }
184         
185         for (int i = 0; i < num_threads; ++i) {
186                 _worker_threads.push_back (new thread (bind (&Server::worker_thread, this)));
187         }
188
189         _broadcast.thread = new thread (bind (&Server::broadcast_thread, this));
190         
191         boost::asio::io_service io_service;
192
193         boost::asio::ip::tcp::acceptor acceptor (
194                 io_service,
195                 boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v4(), Config::instance()->server_port_base ())
196                 );
197         
198         while (1) {
199                 shared_ptr<Socket> socket (new Socket);
200                 acceptor.accept (socket->socket ());
201
202                 boost::mutex::scoped_lock lock (_worker_mutex);
203                 
204                 /* Wait until the queue has gone down a bit */
205                 while (int (_queue.size()) >= num_threads * 2) {
206                         _worker_condition.wait (lock);
207                 }
208                 
209                 _queue.push_back (socket);
210                 _worker_condition.notify_all ();
211         }
212 }
213
214 void
215 Server::broadcast_thread ()
216 try
217 {
218         boost::asio::io_service io_service;
219
220         boost::asio::ip::address address = boost::asio::ip::address_v4::any ();
221         boost::asio::ip::udp::endpoint listen_endpoint (address, Config::instance()->server_port_base() + 1);
222
223         _broadcast.socket = new boost::asio::ip::udp::socket (io_service);
224         _broadcast.socket->open (listen_endpoint.protocol ());
225         _broadcast.socket->bind (listen_endpoint);
226
227         _broadcast.socket->async_receive_from (
228                 boost::asio::buffer (_broadcast.buffer, sizeof (_broadcast.buffer)),
229                 _broadcast.send_endpoint,
230                 boost::bind (&Server::broadcast_received, this)
231                 );
232
233         io_service.run ();
234 }
235 catch (...)
236 {
237         store_current ();
238 }
239
240 void
241 Server::broadcast_received ()
242 {
243         _broadcast.buffer[sizeof(_broadcast.buffer) - 1] = '\0';
244
245         if (strcmp (_broadcast.buffer, DCPOMATIC_HELLO) == 0) {
246                 /* Reply to the client saying what we can do */
247                 xmlpp::Document doc;
248                 xmlpp::Element* root = doc.create_root_node ("ServerAvailable");
249                 root->add_child("Threads")->add_child_text (raw_convert<string> (_worker_threads.size ()));
250                 stringstream xml;
251                 doc.write_to_stream (xml, "UTF-8");
252
253                 shared_ptr<Socket> socket (new Socket);
254                 try {
255                         socket->connect (boost::asio::ip::tcp::endpoint (_broadcast.send_endpoint.address(), Config::instance()->server_port_base() + 1));
256                         socket->write (xml.str().length() + 1);
257                         socket->write ((uint8_t *) xml.str().c_str(), xml.str().length() + 1);
258                 } catch (...) {
259
260                 }
261         }
262                 
263         _broadcast.socket->async_receive_from (
264                 boost::asio::buffer (_broadcast.buffer, sizeof (_broadcast.buffer)),
265                 _broadcast.send_endpoint, boost::bind (&Server::broadcast_received, this)
266                 );
267 }