From d41b67d0063548d19920f62847ca31492d8cdf32 Mon Sep 17 00:00:00 2001 From: Andrea Bondavalli Date: Thu, 28 May 2020 20:47:38 +0200 Subject: [PATCH] Set of changes to add support for mDNS sources SDP update via RTSP ANNOUNCE method as described by Ravenna. - modified SessionManager class to register and trigger observer callbacks when a new session is added or removed - moved MDNSServer out of the SessionManager and added instantiation in the daemon main - MDNSServer instance registers to SessionManager to receive source add & remove events and invokes appropiate Avahi client operation - RTSPServer instance registers to SessionManager to receive source update events. In case of update It sends RTSP ANNOUNCE (with the new SDP) to the connected RTSP clients that previously requested (via DESCRIBE method) the SDP file for the specific source - removed disconnetion timeout handling in RTSPServer. RTSPServer connections are now persistent --- daemon/main.cpp | 17 ++++++++++++-- daemon/mdns_server.cpp | 10 ++++++++ daemon/mdns_server.hpp | 8 ++++++- daemon/rtsp_server.cpp | 48 +++++++++++++++++++++++++++++++++----- daemon/rtsp_server.hpp | 23 +++++++++++++++--- daemon/session_manager.cpp | 19 +++++++++++++-- daemon/session_manager.hpp | 17 +++++++------- 7 files changed, 119 insertions(+), 23 deletions(-) diff --git a/daemon/main.cpp b/daemon/main.cpp index 2f4ca8f..e02dfb4 100644 --- a/daemon/main.cpp +++ b/daemon/main.cpp @@ -24,6 +24,7 @@ #include "config.hpp" #include "driver_manager.hpp" #include "http_server.hpp" +#include "mdns_server.hpp" #include "rtsp_server.hpp" #include "log.hpp" #include "session_manager.hpp" @@ -117,6 +118,12 @@ int main(int argc, char* argv[]) { std::string("SessionManager:: init failed")); } + /* start mDNS server */ + MDNSServer mdns_server(session_manager, config); + if (config->get_mdns_enabled() && !mdns_server.init()) { + throw std::runtime_error(std::string("MDNSServer:: init failed")); + } + /* start rtsp server */ RtspServer rtsp_server(session_manager, config); if (!rtsp_server.init()) { @@ -154,8 +161,6 @@ int main(int argc, char* argv[]) { break; } - rtsp_server.process(); - std::this_thread::sleep_for(std::chrono::seconds(1)); } @@ -180,6 +185,14 @@ int main(int argc, char* argv[]) { std::string("RtspServer:: terminate failed")); } + /* stop mDNS server */ + if (config->get_mdns_enabled()) { + if (!mdns_server.terminate()) { + throw std::runtime_error( + std::string("MDNServer:: terminate failed")); + } + } + /* stop session manager */ if (!session_manager->terminate()) { throw std::runtime_error( diff --git a/daemon/mdns_server.cpp b/daemon/mdns_server.cpp index 68c0fd6..aec2432 100644 --- a/daemon/mdns_server.cpp +++ b/daemon/mdns_server.cpp @@ -320,6 +320,16 @@ bool MDNSServer::init() { (void)avahi_threaded_poll_start(poll_.get()); #endif + + session_manager_->add_source_observer( + SessionManager::ObserverType::add_source, + std::bind(&MDNSServer::add_service, this, + std::placeholders::_2, std::placeholders::_3)); + + session_manager_->add_source_observer( + SessionManager::ObserverType::remove_source, + std::bind(&MDNSServer::remove_service, this, std::placeholders::_2)); + running_ = true; return true; } diff --git a/daemon/mdns_server.hpp b/daemon/mdns_server.hpp index 1c8139e..a056c60 100644 --- a/daemon/mdns_server.hpp +++ b/daemon/mdns_server.hpp @@ -33,12 +33,17 @@ #include #include +#include "session_manager.hpp" #include "config.hpp" #include "utils.hpp" class MDNSServer { public: - MDNSServer(std::shared_ptr config) : config_(config){}; + MDNSServer(std::shared_ptr session_manager, + std::shared_ptr config) + : session_manager_(session_manager), + config_(config){} + MDNSServer() = delete; MDNSServer(const MDNSServer&) = delete; MDNSServer& operator=(const MDNSServer&) = delete; @@ -52,6 +57,7 @@ class MDNSServer { protected: std::atomic_bool running_{false}; + std::shared_ptr session_manager_; std::shared_ptr config_; std::string node_id_{get_node_id()}; diff --git a/daemon/rtsp_server.cpp b/daemon/rtsp_server.cpp index fdb2dfa..a821f17 100644 --- a/daemon/rtsp_server.cpp +++ b/daemon/rtsp_server.cpp @@ -20,18 +20,22 @@ using boost::asio::ip::tcp; -void RtspServer::process() { - /* cleanup of expired sessions */ + +bool RtspServer::add_source(uint8_t id, + const std::string& name, + const std::string& sdp) { + BOOST_LOG_TRIVIAL(info) << "rtsp_server:: added source " << name; + bool ret = false; std::lock_guard lock(mutex_); for (unsigned int i = 0; i < sessions_.size(); i++) { - if (duration_cast(steady_clock::now() - sessions_start_point_[i]) - .count() > RtspSession::session_tout_secs) { auto session = sessions_[i].lock(); if (session != nullptr) { - session->stop(); + ret |= session->announce(id, name, sdp, + config_->get_ip_addr_str(), + config_->get_rtsp_port()); } - } } + return ret; } void RtspServer::accept() { @@ -63,6 +67,37 @@ void RtspServer::accept() { }); } +bool RtspSession::announce(uint8_t id, + const std::string& name, + const std::string& sdp, + const std::string& address, + uint16_t port) { + /* if a describe request is currently not beeing process + * and the specified source id has been described on this session send update */ + if (cseq_ < 0 && source_ids_.find(id) != source_ids_.end()) { + std::string path(std::string("/by-name/") + get_node_id() + " " + name); + std::stringstream ss; + ss << "ANNOUNCE rtsp://" << address << ":" << std::to_string(port) + << httplib::detail::encode_url(path) << " RTSP/1.0\r\n" + << "User-Agent: aes67-daemon\r\n" + << "connection: Keep-Alive" << "\r\n" + << "CSeq: " << announce_cseq_++ << "\r\n" + << "Content-Length: " << sdp.length() << "\r\n" + << "Content-Type: application/sdp\r\n" + << "\r\n" + << sdp; + + BOOST_LOG_TRIVIAL(info) + << "rtsp_server:: " << "ANNOUNCE sent to " + << socket_.remote_endpoint(); + + send_response(ss.str()); + return true; + } + return false; +} + + bool RtspSession::process_request() { /* DESCRIBE rtsp://127.0.0.1:8080/by-name/test RTSP/1.0 @@ -164,6 +199,7 @@ void RtspSession::build_response(const std::string& url) { << "rtsp_server:: " << request_ << " response 200 to " << socket_.remote_endpoint(); send_response(ss.str()); + source_ids_.insert(id); return; } } diff --git a/daemon/rtsp_server.hpp b/daemon/rtsp_server.hpp index 07c5b0f..0794e43 100644 --- a/daemon/rtsp_server.hpp +++ b/daemon/rtsp_server.hpp @@ -21,6 +21,7 @@ #include #include #include +#include #include "session_manager.hpp" @@ -52,13 +53,18 @@ class RtspSession : public std::enable_shared_from_this { void start(); void stop(); + bool announce(uint8_t source_id, + const std::string& name, + const std::string& sdp, + const std::string& address, + uint16_t port); + private: bool process_request(); void build_response(const std::string& url); void read_request(); void send_error(int status_code, const std::string& description); void send_response(const std::string& response); - std::shared_ptr session_manager_; tcp::socket socket_; char data_[max_length + 1]; @@ -66,6 +72,9 @@ class RtspSession : public std::enable_shared_from_this { size_t length_{0}; int32_t cseq_{-1}; size_t consumed_{0}; + int32_t announce_cseq_{0}; + /* set with the ids described on this session */ + std::unordered_set source_ids_; }; class RtspServer { @@ -87,6 +96,14 @@ class RtspServer { accept(); /* start rtsp server on a separate thread */ res_ = std::async([this](){ io_service_.run(); }); + + session_manager_->add_source_observer( + SessionManager::ObserverType::add_source, + std::bind(&RtspServer::add_source, this, + std::placeholders::_1, + std::placeholders::_2, + std::placeholders::_3)); + return true; } @@ -97,9 +114,9 @@ class RtspServer { return true; } - void process(); - private: + /* a new source has been added or updated */ + bool add_source(uint8_t id, const std::string& name, const std::string& sdp); void accept(); std::mutex mutex_; diff --git a/daemon/session_manager.cpp b/daemon/session_manager.cpp index 31ae8d6..56986ea 100644 --- a/daemon/session_manager.cpp +++ b/daemon/session_manager.cpp @@ -420,18 +420,33 @@ uint8_t SessionManager::get_source_id(const std::string& name) const { return it != source_names_.end() ? it->second : (stream_id_max + 1); } +void SessionManager::add_source_observer(ObserverType type, Observer cb) { + switch(type) { + case ObserverType::add_source: + add_source_observers.push_back(cb); + break; + case ObserverType::remove_source: + remove_source_observers.push_back(cb); + break; + } +} + void SessionManager::on_add_source(const StreamSource& source, const StreamInfo& info) { + for (auto cb : add_source_observers) { + cb(source.id, source.name, get_source_sdp_(source.id, info)); + } igmp_.join(config_->get_ip_addr_str(), ip::address_v4(info.stream.m_ui32DestIP).to_string()); - mdns_.add_service(source.name, get_source_sdp_(source.id, info)); source_names_[source.name] = source.id; } void SessionManager::on_remove_source(const StreamInfo& info) { + for (auto cb : remove_source_observers) { + cb(info.stream.m_uiId, info.stream.m_cName, {}); + } igmp_.leave(config_->get_ip_addr_str(), ip::address_v4(info.stream.m_ui32DestIP).to_string()); - mdns_.remove_service(info.stream.m_cName); source_names_.erase(info.stream.m_cName); } diff --git a/daemon/session_manager.hpp b/daemon/session_manager.hpp index 020d905..11c88c2 100644 --- a/daemon/session_manager.hpp +++ b/daemon/session_manager.hpp @@ -30,7 +30,6 @@ #include "driver_manager.hpp" #include "igmp.hpp" #include "sap.hpp" -#include "mdns_server.hpp" struct StreamSource { uint8_t id{0}; @@ -107,10 +106,6 @@ class SessionManager { // session manager interface bool init() { if (!running_) { - /* init mDNS server */ - if (config_->get_mdns_enabled() && !mdns_.init()) { - return false; - } running_ = true; res_ = std::async(std::launch::async, &SessionManager::worker, this); } @@ -127,9 +122,6 @@ class SessionManager { for (auto sink : get_sinks()) { remove_sink(sink.id); } - if (config_->get_mdns_enabled()) { - mdns_.terminate(); - } return ret; } return true; @@ -142,6 +134,11 @@ class SessionManager { std::error_code remove_source(uint32_t id); uint8_t get_source_id(const std::string& name) const; + enum class ObserverType { add_source, remove_source }; + using Observer = std::function; + void add_source_observer(ObserverType type, Observer cb); + std::error_code add_sink(const StreamSink& sink); std::error_code get_sink(uint8_t id, StreamSink& sink) const; std::list get_sinks() const; @@ -210,7 +207,9 @@ class SessionManager { PTPStatus ptp_status_; mutable std::shared_mutex ptp_mutex_; - MDNSServer mdns_{config_}; + std::list add_source_observers; + std::list remove_source_observers; + SAP sap_{config_->get_sap_mcast_addr()}; IGMP igmp_; };