// rtsp_server.cpp // // Copyright (c) 2019 2020 Andrea Bondavalli. All rights reserved. // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // any later version. // // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU General Public License // along with this program. If not, see . #include "rtsp_server.hpp" #include "utils.hpp" using boost::asio::ip::tcp; void RtspServer::worker() { io_service_.run(); } void RtspServer::process() { /* cleanup of expired sessions */ std::lock_guard lock(mutex_); for (unsigned int i = 0; i < sessions_.size(); i++) { if (duration_cast(steady_clock::now() - sessions_start_point_[i]) .count() > RtspSession::session_tout_secs) { auto session = sessions_[i].lock(); if (session != nullptr) { session->stop(); } } } } void RtspServer::accept() { acceptor_.async_accept(socket_, [this](boost::system::error_code ec) { if (!ec) { std::lock_guard lock(mutex_); /* check for free sessions */ unsigned int i = 0; for (; i < sessions_.size(); i++) { if (sessions_[i].use_count() == 0) { auto session = std::make_shared(session_manager_, std::move(socket_)); sessions_[i] = session; sessions_start_point_[i] = steady_clock::now(); session->start(); break; } } if (i == sessions_.size()) { BOOST_LOG_TRIVIAL(warning) << "rtsp_server:: too many clients connected, " << socket_.remote_endpoint() << " closing..."; socket_.close(); } } accept(); }); } bool RtspSession::process_request() { /* DESCRIBE rtsp://127.0.0.1:8080/by-name/test RTSP/1.0 CSeq: 312 User-Agent: pippo Accept: application/sdp */ data_[length_] = 0; std::stringstream sstream(data_); /* read the request */ if (!getline(sstream, request_, '\n')) { return false; } consumed_ = request_.length() + 1; boost::trim(request_); std::vector fields; split(fields, request_, boost::is_any_of(" ")); if (fields.size() < 3) { return false; } /* read the header */ bool is_end{false}; std::string header; while (getline(sstream, header, '\n')) { consumed_ += header.length() + 1; if (header == "" || header == "\r") { is_end = true; break; } boost::to_lower(header); boost::trim(header); if (header.rfind("cseq:", 0) != std::string::npos) { try { cseq_ = stoi(header.substr(5)); } catch (...) { break; } } } if (!is_end) { return false; } if (cseq_ < 0) { BOOST_LOG_TRIVIAL(error) << "rtsp_server:: CSeq not specified from " << socket_.remote_endpoint(); send_error(400, "Bad Request"); } else if (fields[2].substr(0, 5) != "RTSP/") { BOOST_LOG_TRIVIAL(error) << "rtsp_server:: no RTSP specified from " << socket_.remote_endpoint(); send_error(400, "Bad Request"); } else if (fields[0] != "DESCRIBE") { send_error(405, "Method Not Allowed"); } /*else if (fields[1].substr(0, 7) != "rtsp://") { BOOST_LOG_TRIVIAL(error) << "rtsp_server:: no rtsp protocol from " << socket_.remote_endpoint(); send_error(404, "Not supported"); } */ else { boost::trim(fields[1]); build_response(fields[1]); } return true; } void RtspSession::build_response(const std::string& url) { auto const [ok, protocol, host, port, path] = parse_url(url); if (!ok) { BOOST_LOG_TRIVIAL(error) << "rtsp_server:: cannot parse URL " << url << " from " << socket_.remote_endpoint(); send_error(400, "Bad Request"); return; } auto base_path = std::string("/by-name/") + get_node_id() + " "; uint8_t id = SessionManager::stream_id_max + 1; if (path.rfind(base_path) != std::string::npos) { /* extract the source name from path and retrive the id */ id = session_manager_->get_source_id(path.substr(base_path.length())); } else if (path.rfind("/by-id/") != std::string::npos) { try { id = stoi(path.substr(7)); } catch (...) { } } if (id != (SessionManager::stream_id_max + 1)) { std::string sdp; if (!session_manager_->get_source_sdp(id, sdp)) { std::stringstream ss; ss << "RTSP/1.0 200 OK\r\n" << "CSeq: " << cseq_ << "\r\n" << "Content-Length: " << sdp.length() << "\r\n" << "Content-Type: application/sdp\r\n" << "\r\n" << sdp; BOOST_LOG_TRIVIAL(info) << "rtsp_server:: " << request_ << " response 200 to " << socket_.remote_endpoint(); send_response(ss.str()); return; } } send_error(404, "Not found"); } void RtspSession::read_request() { auto self(shared_from_this()); if (length_ == max_length) { /* request cannot be consumed and we exceeded max length */ stop(); } else { socket_.async_read_some( boost::asio::buffer(data_ + length_, max_length - length_), [this, self](boost::system::error_code ec, std::size_t length) { if (!ec) { BOOST_LOG_TRIVIAL(debug) << "rtsp_server:: received " << length << " from " << socket_.remote_endpoint(); length_ += length; while (length_ && process_request()) { /* step to the next request */ std::memmove(data_, data_ + consumed_, length_ - consumed_); length_ -= consumed_; cseq_ = -1; } /* read more data */ read_request(); } }); } } void RtspSession::send_error(int status_code, const std::string& description) { BOOST_LOG_TRIVIAL(error) << "rtsp_server:: " << request_ << " response " << status_code << " to " << socket_.remote_endpoint(); std::stringstream ss; ss << "RTSP/1.0 " << status_code << " " << description << "\r\n"; if (cseq_ >= 0) { ss << "CSeq: " << cseq_ << "\r\n"; } ss << "\n\r"; send_response(ss.str()); } void RtspSession::send_response(const std::string& response) { auto self(shared_from_this()); boost::asio::async_write( socket_, boost::asio::buffer(response.c_str(), response.length()), [self](boost::system::error_code ec, std::size_t /*length*/) { if (!ec) { // we accept multiple requests within timeout // stop(); } }); } void RtspSession::start() { BOOST_LOG_TRIVIAL(debug) << "rtsp_server:: starting session with " << socket_.remote_endpoint(); read_request(); } void RtspSession::stop() { BOOST_LOG_TRIVIAL(debug) << "rtsp_server:: stopping session with " << socket_.remote_endpoint(); socket_.close(); }