Merge master.
[dcpomatic.git] / src / lib / server.cc
1 /*
2     Copyright (C) 2012-2014 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
20 /** @file src/lib/server.cc
21  *  @brief Class to describe a server to which we can send
22  *  encoding work, and a class to implement such a server.
23  */
24
25 #include <string>
26 #include <vector>
27 #include <sstream>
28 #include <iostream>
29 #include <boost/algorithm/string.hpp>
30 #include <boost/scoped_array.hpp>
31 #include <libcxml/cxml.h>
32 #include <dcp/raw_convert.h>
33 #include "server.h"
34 #include "util.h"
35 #include "scaler.h"
36 #include "image.h"
37 #include "dcp_video_frame.h"
38 #include "config.h"
39 #include "cross.h"
40 #include "player_video_frame.h"
41
42 #include "i18n.h"
43
44 using std::string;
45 using std::stringstream;
46 using std::multimap;
47 using std::vector;
48 using std::list;
49 using std::cout;
50 using std::cerr;
51 using std::setprecision;
52 using std::fixed;
53 using boost::shared_ptr;
54 using boost::algorithm::is_any_of;
55 using boost::algorithm::split;
56 using boost::thread;
57 using boost::bind;
58 using boost::scoped_array;
59 using boost::optional;
60 using dcp::Size;
61 using dcp::raw_convert;
62
63 Server::Server (shared_ptr<Log> log, bool verbose)
64         : _log (log)
65         , _verbose (verbose)
66 {
67
68 }
69
70 /** @param after_read Filled in with gettimeofday() after reading the input from the network.
71  *  @param after_encode Filled in with gettimeofday() after encoding the image.
72  */
73 int
74 Server::process (shared_ptr<Socket> socket, struct timeval& after_read, struct timeval& after_encode)
75 {
76         uint32_t length = socket->read_uint32 ();
77         scoped_array<char> buffer (new char[length]);
78         socket->read (reinterpret_cast<uint8_t*> (buffer.get()), length);
79
80         stringstream s (buffer.get());
81         shared_ptr<cxml::Document> xml (new cxml::Document ("EncodingRequest"));
82         xml->read_stream (s);
83         if (xml->number_child<int> ("Version") != SERVER_LINK_VERSION) {
84                 cerr << "Mismatched server/client versions\n";
85                 _log->log ("Mismatched server/client versions");
86                 return -1;
87         }
88
89         shared_ptr<PlayerVideoFrame> pvf (new PlayerVideoFrame (xml, socket));
90
91         DCPVideoFrame dcp_video_frame (pvf, xml, _log);
92
93         gettimeofday (&after_read, 0);
94         
95         shared_ptr<EncodedData> encoded = dcp_video_frame.encode_locally ();
96
97         gettimeofday (&after_encode, 0);
98         
99         try {
100                 encoded->send (socket);
101         } catch (std::exception& e) {
102                 _log->log (String::compose ("Send failed; frame %1", dcp_video_frame.index()));
103                 throw;
104         }
105
106         return dcp_video_frame.index ();
107 }
108
109 void
110 Server::worker_thread ()
111 {
112         while (1) {
113                 boost::mutex::scoped_lock lock (_worker_mutex);
114                 while (_queue.empty ()) {
115                         _worker_condition.wait (lock);
116                 }
117
118                 shared_ptr<Socket> socket = _queue.front ();
119                 _queue.pop_front ();
120                 
121                 lock.unlock ();
122
123                 int frame = -1;
124                 string ip;
125
126                 struct timeval start;
127                 struct timeval after_read;
128                 struct timeval after_encode;
129                 struct timeval end;
130                 
131                 gettimeofday (&start, 0);
132                 
133                 try {
134                         frame = process (socket, after_read, after_encode);
135                         ip = socket->socket().remote_endpoint().address().to_string();
136                 } catch (std::exception& e) {
137                         _log->log (String::compose ("Error: %1", e.what()));
138                 }
139
140                 gettimeofday (&end, 0);
141
142                 socket.reset ();
143                 
144                 lock.lock ();
145
146                 if (frame >= 0) {
147                         struct timeval end;
148                         gettimeofday (&end, 0);
149
150                         stringstream message;
151                         message.precision (2);
152                         message << fixed
153                                 << "Encoded frame " << frame << " from " << ip << ": "
154                                 << "receive " << (seconds(after_read) - seconds(start)) << "s "
155                                 << "encode " << (seconds(after_encode) - seconds(after_read)) << "s "
156                                 << "send " << (seconds(end) - seconds(after_encode)) << "s.";
157                                                    
158                         if (_verbose) {
159                                 cout << message.str() << "\n";
160                         }
161
162                         _log->log (message.str ());
163                 }
164                 
165                 _worker_condition.notify_all ();
166         }
167 }
168
169 void
170 Server::run (int num_threads)
171 {
172         _log->log (String::compose ("Server starting with %1 threads", num_threads));
173         if (_verbose) {
174                 cout << "DCP-o-matic server starting with " << num_threads << " threads.\n";
175         }
176         
177         for (int i = 0; i < num_threads; ++i) {
178                 _worker_threads.push_back (new thread (bind (&Server::worker_thread, this)));
179         }
180
181         _broadcast.thread = new thread (bind (&Server::broadcast_thread, this));
182         
183         boost::asio::io_service io_service;
184
185         boost::asio::ip::tcp::acceptor acceptor (
186                 io_service,
187                 boost::asio::ip::tcp::endpoint (boost::asio::ip::tcp::v4(), Config::instance()->server_port_base ())
188                 );
189         
190         while (1) {
191                 shared_ptr<Socket> socket (new Socket);
192                 acceptor.accept (socket->socket ());
193
194                 boost::mutex::scoped_lock lock (_worker_mutex);
195                 
196                 /* Wait until the queue has gone down a bit */
197                 while (int (_queue.size()) >= num_threads * 2) {
198                         _worker_condition.wait (lock);
199                 }
200                 
201                 _queue.push_back (socket);
202                 _worker_condition.notify_all ();
203         }
204 }
205
206 void
207 Server::broadcast_thread ()
208 try
209 {
210         boost::asio::io_service io_service;
211
212         boost::asio::ip::address address = boost::asio::ip::address_v4::any ();
213         boost::asio::ip::udp::endpoint listen_endpoint (address, Config::instance()->server_port_base() + 1);
214
215         _broadcast.socket = new boost::asio::ip::udp::socket (io_service);
216         _broadcast.socket->open (listen_endpoint.protocol ());
217         _broadcast.socket->bind (listen_endpoint);
218
219         _broadcast.socket->async_receive_from (
220                 boost::asio::buffer (_broadcast.buffer, sizeof (_broadcast.buffer)),
221                 _broadcast.send_endpoint,
222                 boost::bind (&Server::broadcast_received, this)
223                 );
224
225         io_service.run ();
226 }
227 catch (...)
228 {
229         store_current ();
230 }
231
232 void
233 Server::broadcast_received ()
234 {
235         _broadcast.buffer[sizeof(_broadcast.buffer) - 1] = '\0';
236
237         if (strcmp (_broadcast.buffer, DCPOMATIC_HELLO) == 0) {
238                 /* Reply to the client saying what we can do */
239                 xmlpp::Document doc;
240                 xmlpp::Element* root = doc.create_root_node ("ServerAvailable");
241                 root->add_child("Threads")->add_child_text (raw_convert<string> (_worker_threads.size ()));
242                 stringstream xml;
243                 doc.write_to_stream (xml, "UTF-8");
244
245                 shared_ptr<Socket> socket (new Socket);
246                 try {
247                         socket->connect (boost::asio::ip::tcp::endpoint (_broadcast.send_endpoint.address(), Config::instance()->server_port_base() + 1));
248                         socket->write (xml.str().length() + 1);
249                         socket->write ((uint8_t *) xml.str().c_str(), xml.str().length() + 1);
250                 } catch (...) {
251
252                 }
253         }
254                 
255         _broadcast.socket->async_receive_from (
256                 boost::asio::buffer (_broadcast.buffer, sizeof (_broadcast.buffer)),
257                 _broadcast.send_endpoint, boost::bind (&Server::broadcast_received, this)
258                 );
259 }