- while (true) {
- boost::mutex::scoped_lock lock (_worker_mutex);
- while (_queue.empty () && !_terminate) {
- _empty_condition.wait (lock);
- }
-
- if (_terminate) {
- return;
- }
-
- shared_ptr<Socket> socket = _queue.front ();
- _queue.pop_front ();
-
- lock.unlock ();
-
- int frame = -1;
- string ip;
-
- struct timeval start;
- struct timeval after_read;
- struct timeval after_encode;
- struct timeval end;
-
- gettimeofday (&start, 0);
-
- try {
- frame = process (socket, after_read, after_encode);
- ip = socket->socket().remote_endpoint().address().to_string();
- } catch (std::exception& e) {
- cerr << "Error: " << e.what() << "\n";
- LOG_ERROR ("Error: %1", e.what());
- }
-
- gettimeofday (&end, 0);
-
- socket.reset ();
-
- lock.lock ();
-
- if (frame >= 0) {
- struct timeval end;
- gettimeofday (&end, 0);
-
- SafeStringStream message;
- message.precision (2);
- message << fixed
- << "Encoded frame " << frame << " from " << ip << ": "
- << "receive " << (seconds(after_read) - seconds(start)) << "s "
- << "encode " << (seconds(after_encode) - seconds(after_read)) << "s "
- << "send " << (seconds(end) - seconds(after_encode)) << "s.";
-
- if (_verbose) {
- cout << message.str() << "\n";
- }
-
- LOG_GENERAL_NC (message.str ());
- }
-
- _full_condition.notify_all ();
- }
-}
-
-void
-Server::run (int num_threads)
-{
- LOG_GENERAL ("Server starting with %1 threads", num_threads);
- if (_verbose) {
- cout << "DCP-o-matic server starting with " << num_threads << " threads.\n";
- }
-
- for (int i = 0; i < num_threads; ++i) {
- _worker_threads.push_back (new thread (bind (&Server::worker_thread, this)));
- }
-
- _broadcast.thread = new thread (bind (&Server::broadcast_thread, this));
-