From f826a5b2efb2ee2a4898dcc506999d1e922de24c Mon Sep 17 00:00:00 2001 From: Andrea Bondavalli Date: Sat, 14 Jan 2023 12:06:55 +0100 Subject: [PATCH] Implementation of automatic Sink updates based on remote source updates received by the Browser. The daemon uses the most recent version of corresponding remote sources to update a Sync. The Originator (o=) field of the SDP is used identify the corresponding remote sources. Many thanks to @Sikabo for the original implementation ! --- daemon/main.cpp | 24 +++---- daemon/session_manager.cpp | 124 +++++++++++++++++++++++++++++++++---- daemon/session_manager.hpp | 32 +++++++++- 3 files changed, 154 insertions(+), 26 deletions(-) diff --git a/daemon/main.cpp b/daemon/main.cpp index e65f7ba..761de88 100644 --- a/daemon/main.cpp +++ b/daemon/main.cpp @@ -125,8 +125,14 @@ int main(int argc, char* argv[]) { throw std::runtime_error(std::string("DriverManager:: init failed")); } + /* start browser */ + auto browser = Browser::create(config); + if (browser == nullptr || !browser->init()) { + throw std::runtime_error(std::string("Browser:: init failed")); + } + /* start session manager */ - auto session_manager = SessionManager::create(driver, config); + auto session_manager = SessionManager::create(driver, browser, config); if (session_manager == nullptr || !session_manager->init()) { throw std::runtime_error(std::string("SessionManager:: init failed")); } @@ -143,12 +149,6 @@ int main(int argc, char* argv[]) { throw std::runtime_error(std::string("RtspServer:: init failed")); } - /* start browser */ - auto browser = Browser::create(config); - if (browser == nullptr || !browser->init()) { - throw std::runtime_error(std::string("Browser:: init failed")); - } - /* start http server */ HttpServer http_server(session_manager, browser, config); if (!http_server.init()) { @@ -184,11 +184,6 @@ int main(int argc, char* argv[]) { throw std::runtime_error(std::string("HttpServer:: terminate failed")); } - /* stop browser */ - if (!browser->terminate()) { - throw std::runtime_error(std::string("Browser:: terminate failed")); - } - /* stop rtsp server */ if (!rtsp_server.terminate()) { throw std::runtime_error(std::string("RtspServer:: terminate failed")); @@ -207,6 +202,11 @@ int main(int argc, char* argv[]) { std::string("SessionManager:: terminate failed")); } + /* stop browser */ + if (!browser->terminate()) { + throw std::runtime_error(std::string("Browser:: terminate failed")); + } + /* stop driver manager */ if (!driver->terminate(*config)) { throw std::runtime_error( diff --git a/daemon/session_manager.cpp b/daemon/session_manager.cpp index 770251e..2bcad4a 100644 --- a/daemon/session_manager.cpp +++ b/daemon/session_manager.cpp @@ -38,7 +38,7 @@ #include "session_manager.hpp" #include "interface.hpp" -static uint8_t get_codec_word_lenght(const std::string& codec) { +static uint8_t get_codec_word_length(const std::string& codec) { if (codec == "L16") { return 2; } @@ -103,8 +103,21 @@ bool SessionManager::parse_sdp(const std::string sdp, StreamInfo& info) const { return false; } break; - case 'o': - break; + case 'o': { + std::vector fields; + boost::split(fields, val, [line](char c) { return c == ' '; }); + if (fields.size() < 6) { + BOOST_LOG_TRIVIAL(warning) + << "session_manager:: invalid origin at line " << num; + } else { + info.origin.username = fields[0]; + info.origin.session_id = fields[1]; + info.origin.session_version = std::stoull(fields[2]); + info.origin.network_type = fields[3]; + info.origin.address_type = fields[4]; + info.origin.unicast_address = fields[5]; + } + } break; case 't': /* t=0 0 */ status = sdp_parser_status::time; @@ -150,7 +163,7 @@ bool SessionManager::parse_sdp(const std::string sdp, StreamInfo& info) const { if (info.stream.m_byPayloadType == std::stoi(fields[0])) { strncpy(info.stream.m_cCodec, fields[1].c_str(), sizeof(info.stream.m_cCodec) - 1); - info.stream.m_byWordLength = get_codec_word_lenght(fields[1]); + info.stream.m_byWordLength = get_codec_word_length(fields[1]); info.stream.m_ui32SamplingRate = std::stoul(fields[2]); if (info.stream.m_byNbOfChannels != std::stoi(fields[3])) { BOOST_LOG_TRIVIAL(warning) @@ -275,14 +288,15 @@ bool SessionManager::parse_sdp(const std::string sdp, StreamInfo& info) const { std::shared_ptr SessionManager::create( std::shared_ptr driver, + std::shared_ptr browser, std::shared_ptr config) { // no need to be thread-safe here static std::weak_ptr instance; if (auto ptr = instance.lock()) { return ptr; } - auto ptr = - std::shared_ptr(new SessionManager(driver, config)); + auto ptr = std::shared_ptr( + new SessionManager(driver, browser, config)); instance = ptr; return ptr; } @@ -481,7 +495,7 @@ std::error_code SessionManager::add_source(const StreamSource& source) { sizeof(info.stream.m_cName) - 1); info.stream.m_ucDSCP = source.dscp; // IPv4 DSCP info.stream.m_byPayloadType = source.payload_type; - info.stream.m_byWordLength = get_codec_word_lenght(source.codec); + info.stream.m_byWordLength = get_codec_word_length(source.codec); info.stream.m_byNbOfChannels = source.map.size(); strncpy(info.stream.m_cCodec, source.codec.c_str(), sizeof(info.stream.m_cCodec) - 1); @@ -593,6 +607,44 @@ std::string SessionManager::get_removed_source_sdp_( return sdp; } +bool SessionManager::parse_sdp_origin(const std::string sdp, + SDPOrigin& origin) const { + try { + std::stringstream sdp_string_stream(sdp); + std::string line; + while (getline(sdp_string_stream, line, '\n')) { + boost::trim(line); + if (line[1] != '=') { + BOOST_LOG_TRIVIAL(error) << "session_manager:: invalid SDP file"; + return false; + } + std::string val = line.substr(2); + switch (line[0]) { + case 'o': + std::vector fields; + boost::split(fields, val, [line](char c) { return c == ' '; }); + if (fields.size() < 6) { + BOOST_LOG_TRIVIAL(error) << "session_manager:: invalid origin"; + return false; + } + + origin.username = fields[0]; + origin.session_id = fields[1]; + origin.session_version = std::stoull(fields[2]); + origin.network_type = fields[3]; + origin.address_type = fields[4]; + origin.unicast_address = fields[5]; + return true; + } + } + } catch (...) { + BOOST_LOG_TRIVIAL(fatal) << "session_manager:: invalid SDP" + << ", cannot extract SDP identifier"; + } + + return false; +} + std::string SessionManager::get_source_sdp_(uint32_t id, const StreamInfo& info) const { std::shared_lock ptp_lock(ptp_mutex_); @@ -613,8 +665,7 @@ std::string SessionManager::get_source_sdp_(uint32_t id, ss << "v=0\n" << "o=- " << info.session_id << " " << info.session_version << " IN IP4 " << ip::address_v4(info.stream.m_ui32SrcIP).to_string() << "\n" - << "s=" << config_->get_node_id() << " " - << info.stream.m_cName << "\n" + << "s=" << config_->get_node_id() << " " << info.stream.m_cName << "\n" << "c=IN IP4 " << ip::address_v4(info.stream.m_ui32DestIP).to_string(); if (IN_MULTICAST(info.stream.m_ui32DestIP)) { ss << "/" << static_cast(info.stream.m_byTTL); @@ -642,8 +693,8 @@ std::string SessionManager::get_source_sdp_(uint32_t id, if (info.refclk_ptp_traceable) { ss << "traceable\n"; } else { - ss << ptp_status_.gmid << ":" - << static_cast(ptp_config_.domain) << "\n"; + ss << ptp_status_.gmid << ":" << static_cast(ptp_config_.domain) + << "\n"; } ss << "a=recvonly\n"; @@ -1009,6 +1060,47 @@ size_t SessionManager::process_sap() { return sdp_len_sum; } +std::list SessionManager::get_updated_sinks( + const std::list& sources_list) { + std::list sinks_list; + std::shared_lock sinks_lock(sinks_mutex_); + for (auto const& [id, info] : sinks_) { + uint64_t newVersion{0}; + StreamSink sink{get_sink_(id, info)}; + for (auto& source : sources_list) { + SDPOrigin source_sdp_origin; + if (!parse_sdp_origin(source.sdp, source_sdp_origin)) + continue; + + if (sinks_[sink.id].origin == source_sdp_origin && + sink.sdp != source.sdp && + sinks_[sink.id].origin.session_version < + source_sdp_origin.session_version && + newVersion < source_sdp_origin.session_version) { + newVersion = source_sdp_origin.session_version; + sink.sdp = source.sdp; + } + } + + if (newVersion) { + // Re-add sink with new SDP, since the sink.id is the same there will be + // an update + BOOST_LOG_TRIVIAL(info) + << "session_manager:: sink " << std::to_string(sink.id) + << " SDP change detected version " << newVersion << " updating"; + sinks_list.emplace_back(sink); + } + } + return sinks_list; +} + +void SessionManager::update_sinks(const std::list& sources_list) { + auto sinks_list = get_updated_sinks(sources_list); + for (auto& sink : sinks_list) { + add_sink(sink); + } +} + void SessionManager::on_update_sources() { // trigger sources SDP file update sources_mutex_.lock(); @@ -1039,8 +1131,9 @@ void SessionManager::on_ptp_status_changed(const std::string& status) const { for (int i = STDERR_FILENO + 1; i < fdlimit; i++) close(i); - char* argv_list[] = {const_cast(config_->get_ptp_status_script().c_str()), - const_cast(status.c_str()), NULL}; + char* argv_list[] = { + const_cast(config_->get_ptp_status_script().c_str()), + const_cast(status.c_str()), NULL}; execv(config_->get_ptp_status_script().c_str(), argv_list); exit(0); @@ -1157,6 +1250,11 @@ bool SessionManager::worker() { << sap_interval << " secs"; } + /* Use a newer version of source if the current version isn't available + * anymore. This typically happens when equipment is restarted. */ + std::list remote_sources = browser_->get_remote_sources(); + update_sinks(remote_sources); + std::this_thread::sleep_for(std::chrono::seconds(1)); } diff --git a/daemon/session_manager.hpp b/daemon/session_manager.hpp index 3fcc16f..ffc21c7 100644 --- a/daemon/session_manager.hpp +++ b/daemon/session_manager.hpp @@ -28,9 +28,27 @@ #include "config.hpp" #include "driver_interface.hpp" +#include "browser.hpp" #include "igmp.hpp" #include "sap.hpp" +struct SDPOrigin { + std::string username; + std::string session_id; + uint64_t session_version{0}; + std::string network_type; + std::string address_type; + std::string unicast_address; + + bool operator==(const SDPOrigin& rhs) const { + // session_version is not part of comparison, see RFC 4566 + return username == rhs.username && session_id == rhs.session_id && + network_type == rhs.network_type && + address_type == rhs.address_type && + unicast_address == rhs.unicast_address; + } +}; + struct StreamSource { uint8_t id{0}; bool enabled{false}; @@ -93,6 +111,7 @@ struct StreamInfo { std::string sink_sdp; uint32_t session_id{0}; uint32_t session_version{0}; + SDPOrigin origin; }; class SessionManager { @@ -101,6 +120,7 @@ class SessionManager { static std::shared_ptr create( std::shared_ptr driver, + std::shared_ptr browser, std::shared_ptr config); SessionManager() = delete; SessionManager(const SessionManager&) = delete; @@ -165,6 +185,10 @@ class SessionManager { constexpr static const char ptp_primary_mcast_addr[] = "224.0.1.129"; constexpr static const char ptp_pdelay_mcast_addr[] = "224.0.1.107"; + std::list get_updated_sinks( + const std::list& sources_list); + void update_sinks(const std::list& sources_list); + void on_add_source(const StreamSource& source, const StreamInfo& info); void on_remove_source(const StreamInfo& info); @@ -183,16 +207,22 @@ class SessionManager { StreamSource get_source_(uint8_t id, const StreamInfo& info) const; StreamSink get_sink_(uint8_t id, const StreamInfo& info) const; + bool sink_is_still_valid(const std::string sdp, + const std::list sources_list) const; + bool parse_sdp(const std::string sdp, StreamInfo& info) const; + bool parse_sdp_origin(const std::string sdp, SDPOrigin& origin) const; bool worker(); // singleton, use create() to build SessionManager(std::shared_ptr driver, + std::shared_ptr browser, std::shared_ptr config) - : driver_(driver), config_(config) { + : browser_(browser), driver_(driver), config_(config) { ptp_config_.domain = config->get_ptp_domain(); ptp_config_.dscp = config->get_ptp_dscp(); }; + std::shared_ptr browser_; std::shared_ptr driver_; std::shared_ptr config_; std::future res_;