diff --git a/README.md b/README.md index f3239ec..17244e2 100644 --- a/README.md +++ b/README.md @@ -46,7 +46,7 @@ The daemon can be cross-compiled for multiple platforms and implements the follo * HTTP REST API for the daemon control and configuration * SAP sources discovery and advertisement compatible with AES67 standard * mDNS sources discovery and advertisement (using Linux Avahi) compatible with Ravenna standard -* RTSP client and server to retrieve or return SDP files via DESCRIBE method compatible with Ravenna standard +* RTSP client and server to retrieve, return and update SDP files via DESCRIBE and ANNOUNCE methods according to Ravenna standard * IGMP handling for SAP, PTP and RTP sessions The directory also contains the daemon regression tests in the [tests](daemon/tests) subdirectory. diff --git a/daemon/README.md b/daemon/README.md index 456b4db..0193450 100644 --- a/daemon/README.md +++ b/daemon/README.md @@ -10,7 +10,7 @@ The daemon is responsible for: * HTTP REST API for the daemon control and configuration * SAP sources discovery and advertisement compatible with AES67 standard * mDNS sources discovery and advertisement (using Linux Avahi) compatible with Ravenna standard -* RTSP client and server to retrieve or return SDP files via DESCRIBE method compatible with Ravenna standard +* RTSP client and server to retrieve, return and update SDP files via DESCRIBE and ANNOUNCE methods according to Ravenna standard * IGMP handling for SAP, PTP and RTP sessions diff --git a/daemon/browser.cpp b/daemon/browser.cpp index f276000..a90685b 100644 --- a/daemon/browser.cpp +++ b/daemon/browser.cpp @@ -107,9 +107,9 @@ bool Browser::worker() { BOOST_LOG_TRIVIAL(debug) << "browser:: refreshing SAP source " << it->id; // annoucement, update last seen and announce period - auto upd_source{*it}; uint32_t last_seen = duration_cast(steady_clock::now() - startup_).count(); + auto upd_source{*it}; upd_source.announce_period = last_seen - upd_source.last_seen; upd_source.last_seen = last_seen; sources_.replace(it, upd_source); @@ -155,26 +155,32 @@ bool Browser::worker() { return true; } -void Browser::on_new_rtsp_source(const std::string& name, - const std::string& domain, - const RtspSource& s) { +void Browser::on_change_rtsp_source(const std::string& name, + const std::string& domain, + const RtspSource& s) { uint32_t last_seen = duration_cast(steady_clock::now() - startup_).count(); std::unique_lock sources_lock(sources_mutex_); + /* search by name */ auto rng = sources_.get().equal_range(name); while(rng.first != rng.second){ const auto& it = rng.first; if (it->source == "mDNS" && it->domain == domain) { - /* conflict ? */ - BOOST_LOG_TRIVIAL(warning) << "browser:: mDNS source conflict on" - << " name " << it->name - << " domain " << it->domain - << ", skipping ... "; + /* mDNS source with same name and domain -> update */ + BOOST_LOG_TRIVIAL(info) << "browser:: updating RTSP source " << s.id + << " name " << name + << " domain " << domain; + auto upd_source{*it}; + upd_source.id = s.id; + upd_source.sdp = s.sdp; + upd_source.address = s.address; + upd_source.last_seen = last_seen; + sources_.get().replace(it, upd_source); return; } ++rng.first; } - + /* entry not found -> add */ BOOST_LOG_TRIVIAL(info) << "browser:: adding RTSP source " << s.id << " name " << name << " domain " << domain; diff --git a/daemon/browser.hpp b/daemon/browser.hpp index 6f400a4..b669d6f 100644 --- a/daemon/browser.hpp +++ b/daemon/browser.hpp @@ -73,7 +73,7 @@ class Browser : public MDNSClient { bool worker(); - virtual void on_new_rtsp_source( + virtual void on_change_rtsp_source( const std::string& name, const std::string& domain, const RtspSource& source) override; diff --git a/daemon/mdns_client.cpp b/daemon/mdns_client.cpp index 297be23..c1e4afb 100644 --- a/daemon/mdns_client.cpp +++ b/daemon/mdns_client.cpp @@ -17,14 +17,13 @@ // along with this program. If not, see . // -#include "mdns_client.hpp" - #include #include "config.hpp" #include "interface.hpp" #include "log.hpp" #include "rtsp_client.hpp" +#include "mdns_client.hpp" #ifdef _USE_AVAHI_ void MDNSClient::resolve_callback(AvahiServiceResolver* r, @@ -89,6 +88,7 @@ void MDNSClient::resolve_callback(AvahiServiceResolver* r, (mdns.config_->get_interface_name() == "lo"))) { std::lock_guard lock(mdns.sources_res_mutex_); + /* process RTSP client in async task */ mdns.sources_res_.emplace_back(std::async( std::launch::async, [&mdns, @@ -96,11 +96,12 @@ void MDNSClient::resolve_callback(AvahiServiceResolver* r, domain_ = std::forward(domain), addr_ = std::forward(addr), port_ = std::forward(std::to_string(port))] { - auto res = RtspClient::describe(std::string("/by-name/") + name_, - addr_, port_); - if (res.first) { - mdns.on_new_rtsp_source(name_, domain_, res.second); - } + RtspClient::process( + std::bind(&MDNSClient::on_change_rtsp_source, &mdns, + std::placeholders::_1, std::placeholders::_2, + std::placeholders::_3), + name_, domain_, std::string("/by-name/") + name_, + addr_, port_); })); } @@ -153,6 +154,7 @@ void MDNSClient::browse_callback(AvahiServiceBrowser* b, BOOST_LOG_TRIVIAL(info) << "mdns_client:: (Browser) REMOVE: " << "service " << name << " of type " << type << " in domain " << domain; + RtspClient::stop(name, domain); mdns.on_remove_rtsp_source(name, domain); break; @@ -259,6 +261,7 @@ void MDNSClient::process_results() { bool MDNSClient::terminate() { if (running_) { running_ = false; + RtspClient::stop_all(); #ifdef _USE_AVAHI_ /* wait for all pending results and remove from list */ std::lock_guard lock(sources_res_mutex_); diff --git a/daemon/mdns_client.hpp b/daemon/mdns_client.hpp index e7c8640..890bac6 100644 --- a/daemon/mdns_client.hpp +++ b/daemon/mdns_client.hpp @@ -48,11 +48,11 @@ class MDNSClient { virtual bool terminate(); protected: - virtual void on_new_rtsp_source(const std::string& name, - const std::string& domain, - const RtspSource& source) = 0; + virtual void on_change_rtsp_source(const std::string& name, + const std::string& domain, + const RtspSource& source){}; virtual void on_remove_rtsp_source(const std::string& name, - const std::string& domain) = 0; + const std::string& domain){}; void process_results(); std::list > sources_res_; diff --git a/daemon/rtsp_client.cpp b/daemon/rtsp_client.cpp index a0779bb..9d801f6 100644 --- a/daemon/rtsp_client.cpp +++ b/daemon/rtsp_client.cpp @@ -27,6 +27,7 @@ #include #include #include +#include #include "log.hpp" #include "utils.hpp" @@ -36,13 +37,27 @@ using namespace boost::asio; using namespace boost::asio::ip; using namespace boost::algorithm; + struct RtspResponse { - uint32_t cseq; + int32_t cseq{-1}; std::string content_type; - uint64_t content_length; + uint64_t content_length{0}; std::string body; }; +static std::string sdp_get_subject(const std::string& sdp) { + std::stringstream ssstrem(sdp); + std::string line; + while (getline(ssstrem, line, '\n')) { + if (line.substr(0, 2) == "s=") { + auto subject = line.substr(2); + trim(subject); + return subject; + } + } + return ""; +} + RtspResponse read_response(tcp::iostream& s, uint16_t max_length) { RtspResponse res; std::string header; @@ -73,6 +88,8 @@ RtspResponse read_response(tcp::iostream& s, uint16_t max_length) { << "cannot perform number conversion"; } + BOOST_LOG_TRIVIAL(debug) << "rtsp_client:: reading body length " + << res.content_length; // read up to max_length if (res.content_length > 0 && res.content_length < max_length) { res.body.reserve(res.content_length); @@ -83,19 +100,17 @@ RtspResponse read_response(tcp::iostream& s, uint16_t max_length) { return res; } -std::pair RtspClient::describe(const std::string& path, - const std::string& address, - const std::string& port) { - RtspSource rs; - bool success{false}; +std::pair RtspClient::process( + RtspClient::Observer callback, + const std::string& name, + const std::string& domain, + const std::string& path, + const std::string& address, + const std::string& port, + bool wait_for_updates) { + RtspSource rtsp_source; try { - tcp::iostream s; - -#if BOOST_VERSION < 106700 - s.expires_from_now(boost::posix_time::seconds(client_timeout)); -#else - s.expires_from_now(std::chrono::seconds(client_timeout)); -#endif + ip::tcp::iostream s; BOOST_LOG_TRIVIAL(debug) << "rtsp_client:: connecting to " << "rtsp://" << address << ":" << port << path; @@ -103,10 +118,10 @@ std::pair RtspClient::describe(const std::string& path, if (!s) { BOOST_LOG_TRIVIAL(warning) << "rtsp_client:: unable to connect to " << address << ":" << port; - return std::make_pair(success, rs); + return std::make_pair(false, rtsp_source); } - uint16_t cseq = seq_number++; + uint16_t cseq = g_seq_number++; s << "DESCRIBE rtsp://" << address << ":" << port << httplib::detail::encode_url(path) << " RTSP/1.0\r\n"; s << "CSeq: " << cseq << "\r\n"; @@ -122,56 +137,157 @@ std::pair RtspClient::describe(const std::string& path, s >> rtsp_version; unsigned int status_code; s >> status_code; - std::string status_message; - std::getline(s, status_message); + std::string request; + std::getline(s, request); if (!s || rtsp_version.substr(0, 5) != "RTSP/") { BOOST_LOG_TRIVIAL(error) << "rtsp_client:: invalid response from " << "rtsp://" << address << ":" << port << path; - return std::make_pair(success, rs); + return std::make_pair(false, rtsp_source); } if (status_code != 200) { BOOST_LOG_TRIVIAL(error) << "rtsp_client:: response with status code " << status_code << " from " << "rtsp://" << address << ":" << port << path; - return std::make_pair(success, rs); + return std::make_pair(false, rtsp_source); } - auto res = read_response(s, max_body_length); - if (res.content_type.rfind("application/sdp", 0) == std::string::npos) { - BOOST_LOG_TRIVIAL(error) << "rtsp_client:: unsupported content-type " - << res.content_type << " from " - << "rtsp://" << address << ":" << port << path; - return std::make_pair(success, rs); - } - if (res.cseq != cseq) { - BOOST_LOG_TRIVIAL(error) - << "rtsp_client:: invalid response sequence " << res.cseq << " from " - << "rtsp://" << address << ":" << port << path; - return std::make_pair(success, rs); - } + bool is_announce = false; + bool is_describe = true; + std::string announced_name; + do { + auto res = read_response(s, max_body_length); + if (is_describe && res.cseq != cseq) { + BOOST_LOG_TRIVIAL(error) + << "rtsp_client:: invalid response sequence " << res.cseq + << " from rtsp://" << address << ":" << port << path; + return std::make_pair(false, rtsp_source); + } - std::stringstream ss; - ss << "rtsp:" << std::hex - << crc16(reinterpret_cast(res.body.c_str()), - res.body.length()); - /*<< std::hex << ip::address_v4::from_string(address.c_str()).to_ulong();*/ + if (res.content_type.rfind("application/sdp", 0) == std::string::npos) { + BOOST_LOG_TRIVIAL(error) << "rtsp_client:: unsupported content-type " + << res.content_type << " from " + << "rtsp://" << address << ":" << port << path; + if (is_describe) { + return std::make_pair(false, rtsp_source); + } + } else { + std::stringstream ss; + ss << "rtsp:" << std::hex + << crc16(reinterpret_cast(res.body.c_str()), + res.body.length()); + /*<< std::hex << ip::address_v4::from_string(address.c_str()).to_ulong();*/ + rtsp_source.id = ss.str(); + rtsp_source.source = "mDNS"; + rtsp_source.address = address; + rtsp_source.sdp = std::move(res.body); + BOOST_LOG_TRIVIAL(info) << "rtsp_client:: completed " + << "rtsp://" << address << ":" << port << path; - rs.id = ss.str(); - rs.source = "mDNS"; - rs.address = address; - rs.sdp = std::move(res.body); + if (is_announce || is_describe) { + if (is_announce && announced_name.empty()) { + /* if no name from URL we try from SDP file */ + announced_name = sdp_get_subject(rtsp_source.sdp); + } + callback(announced_name.empty() ? name : announced_name, domain, + rtsp_source); + } - BOOST_LOG_TRIVIAL(info) << "rtsp_client:: describe completed " - << "rtsp://" << address << ":" << port << path; + if (is_announce) { + s << "RTSP/1.0 200 OK\r\n"; + s << "CSeq: " << res.cseq << "\r\n"; + s << "\r\n"; + } else if (!is_describe) { + s << "RTSP/1.0 405 Method Not Allowed\r\n"; + s << "CSeq: " << res.cseq << "\r\n"; + s << "\r\n"; + } + } + + if (wait_for_updates) { + auto name_domain = std::make_pair(name, domain); + g_mutex.lock(); + g_active_clients[name_domain] = &s; + g_mutex.unlock(); + + /* we start waiting for updates */ + do { + std::getline(s, request); + } while (request.empty() && !s.error()); + if (s.error()) { + BOOST_LOG_TRIVIAL(info) << "rtsp_client:: end: " + << s.error().message(); + break; + } + BOOST_LOG_TRIVIAL(info) << "rtsp_client:: received " << request; + boost::trim(request); + is_describe = is_announce = false; + announced_name = ""; + std::vector fields; + split(fields, request, boost::is_any_of(" ")); + if (fields.size() >= 2 && fields[0] == "ANNOUNCE") { + auto const [ok, protocol, host, port, path] = parse_url(fields[1]); + if (ok) { + /* if we find a valid announced source name we use it + * otherwise we try from SDP file or we use the mDNS name */ + if (path.rfind("/by-name/") != std::string::npos) { + announced_name = path.substr(9); + BOOST_LOG_TRIVIAL(debug) << "rtsp_client:: found announced name " + << announced_name; + } + } + is_announce = true; + } + } + } while (wait_for_updates); - success = true; } catch (std::exception& e) { BOOST_LOG_TRIVIAL(warning) << "rtsp_client:: error with " << "rtsp://" << address << ":" << port << path << ": " << e.what(); + return std::make_pair(false, rtsp_source); } - return std::make_pair(success, rs); + return std::make_pair(true, rtsp_source); } + + +void RtspClient::stop(const std::string& name, const std::string& domain) { + std::lock_guard lock(g_mutex); + auto it = g_active_clients.find(std::make_pair(name, domain)); + if (it != g_active_clients.end()) { + BOOST_LOG_TRIVIAL(info) + << "rtsp_client:: stopping client " << name << " " << domain; +#if BOOST_VERSION < 106600 + it->second->close(); +#else + it->second->socket().shutdown(tcp::socket::shutdown_both); +#endif + g_active_clients.erase(it); + } +} + +void RtspClient::stop_all() { + std::lock_guard lock(g_mutex); + auto it = g_active_clients.begin(); + while (it != g_active_clients.end()) { + BOOST_LOG_TRIVIAL(info) + << "rtsp_client:: stopping client " + << it->first.first << " " << it->first.second; +#if BOOST_VERSION < 106600 + it->second->close(); +#else + it->second->socket().shutdown(tcp::socket::shutdown_both); +#endif + it = g_active_clients.erase(it); + } +} + +std::pair RtspClient::describe( + const std::string& path, + const std::string& address, + const std::string& port) { + return RtspClient::process({}, {}, {}, path, address, port, false); +} + diff --git a/daemon/rtsp_client.hpp b/daemon/rtsp_client.hpp index 473df1e..3da888b 100644 --- a/daemon/rtsp_client.hpp +++ b/daemon/rtsp_client.hpp @@ -20,6 +20,9 @@ #ifndef _RTSP_CLIENT_HPP_ #define _RTSP_CLIENT_HPP_ +#include +#include + struct RtspSource { std::string id; std::string source; @@ -33,12 +36,33 @@ class RtspClient { constexpr static uint16_t client_timeout = 10; // sec constexpr static const char dft_port[] = "554"; - static std::pair describe( - const std::string& path, - const std::string& address, - const std::string& port = dft_port); + using Observer = std::function; + + static std::pair process( + Observer callback, + const std::string& name, + const std::string& domain, + const std::string& path, + const std::string& address, + const std::string& port = dft_port, + bool wait_for_updates = true); + + static void stop(const std::string& name, const std::string& domain); + static void stop_all(); + + static std::pair describe( + const std::string& path, + const std::string& address, + const std::string& port = dft_port); + + inline static std::atomic g_seq_number{0}; + inline static std::map, + boost::asio::ip::tcp::iostream* /*stream*/> g_active_clients; + inline static std::mutex g_mutex; - inline static std::atomic seq_number; }; #endif