Added handling of session ID <sess-id> and session version <sess-version> in the Origin ("o=") field of the Sources SDP files.

Session ID is the Node ID plus the source ID.
Session version is the session ID plus an increasing version identifier.
This fixes issue  and allows Dante Controller to properly handle the daemon Sources.
This commit is contained in:
Andrea Bondavalli 2020-06-27 09:32:06 +02:00
parent 3aeed93856
commit cee8823e5c
2 changed files with 49 additions and 19 deletions

@ -37,7 +37,6 @@
#include "utils.hpp" #include "utils.hpp"
#include "session_manager.hpp" #include "session_manager.hpp"
static uint8_t get_codec_word_lenght(const std::string& codec) { static uint8_t get_codec_word_lenght(const std::string& codec) {
if (codec == "L16") { if (codec == "L16") {
return 2; return 2;
@ -492,6 +491,10 @@ std::error_code SessionManager::add_source(const StreamSource& source) {
info.refclk_ptp_traceable = source.refclk_ptp_traceable; info.refclk_ptp_traceable = source.refclk_ptp_traceable;
info.enabled = source.enabled; info.enabled = source.enabled;
info.io = source.io; info.io = source.io;
auto ip_addr = htonl(config_->get_ip_addr());
info.session_id = (ip_addr << 16) + (ip_addr >> 16) + source.id;
info.session_version = info.session_id + g_session_version++;
// info.m_ui32PlayOutDelay = 0; // only for Sink // info.m_ui32PlayOutDelay = 0; // only for Sink
std::unique_lock sources_lock(sources_mutex_); std::unique_lock sources_lock(sources_mutex_);
@ -531,10 +534,14 @@ std::error_code SessionManager::add_source(const StreamSource& source) {
return ret; return ret;
} }
std::string SessionManager::get_removed_source_sdp_(uint32_t id, std::string SessionManager::get_removed_source_sdp_(
uint32_t addr) const { uint32_t id,
std::string sdp("o=- " + std::to_string(id) + " 0 IN IP4 " + uint32_t src_addr,
ip::address_v4(addr).to_string() + "\n"); uint32_t session_id,
uint32_t session_version) const {
std::string sdp("o=- " + std::to_string(session_id) + " " +
std::to_string(session_version) + " IN IP4 " +
ip::address_v4(src_addr).to_string() + "\n");
return sdp; return sdp;
} }
@ -556,7 +563,7 @@ std::string SessionManager::get_source_sdp_(uint32_t id,
// build SDP // build SDP
std::stringstream ss; std::stringstream ss;
ss << "v=0\n" ss << "v=0\n"
<< "o=- " << static_cast<unsigned>(id) << " 0 IN IP4 " << "o=- " << info.session_id << " " << info.session_version << " IN IP4 "
<< ip::address_v4(info.stream.m_ui32SrcIP).to_string() << "\n" << ip::address_v4(info.stream.m_ui32SrcIP).to_string() << "\n"
<< "s=" << get_node_id(config_->get_ip_addr()) << " " << "s=" << get_node_id(config_->get_ip_addr()) << " "
<< info.stream.m_cName << "\n" << info.stream.m_cName << "\n"
@ -874,14 +881,15 @@ size_t SessionManager::process_sap() {
for (auto const& [id, info] : sources_) { for (auto const& [id, info] : sources_) {
if (info.enabled) { if (info.enabled) {
// retrieve current active source SDP // retrieve current active source SDP
std::string sdp = get_source_sdp_(id, info); auto sdp = get_source_sdp_(id, info);
// compute source 16bit crc // compute source 16bit crc
uint16_t msg_crc = uint16_t msg_crc =
crc16(reinterpret_cast<const uint8_t*>(sdp.c_str()), sdp.length()); crc16(reinterpret_cast<const uint8_t*>(sdp.c_str()), sdp.length());
// compute source hash // compute source hash
uint32_t msg_id_hash = (static_cast<uint32_t>(id) << 16) + msg_crc; uint32_t msg_id_hash = (static_cast<uint32_t>(id) << 16) + msg_crc;
// add/update this source in the announced sources // add/update this source in the announced sources
announced_sources_[msg_id_hash] = info.stream.m_ui32RTCPSrcIP; announced_sources_[msg_id_hash] = {info.stream.m_ui32RTCPSrcIP,
info.session_id, info.session_version};
// add this source to the currently active sources // add this source to the currently active sources
active_sources.insert(msg_id_hash); active_sources.insert(msg_id_hash);
// remove this source from deleted sources (if present) // remove this source from deleted sources (if present)
@ -894,11 +902,15 @@ size_t SessionManager::process_sap() {
} }
// check for sources that are no longer announced and send deletion/s // check for sources that are no longer announced and send deletion/s
for (auto const& [msg_id_hash, src_addr] : announced_sources_) { for (auto const& [msg_id_hash, info] : announced_sources_) {
auto src_addr = std::get<0>(info);
auto session_id = std::get<1>(info);
auto session_version = std::get<2>(info);
// check if this source is no longer announced // check if this source is no longer announced
if (active_sources.find(msg_id_hash) == active_sources.end()) { if (active_sources.find(msg_id_hash) == active_sources.end()) {
// retrieve deleted source SDP // retrieve deleted source SDP
std::string sdp = get_removed_source_sdp_(msg_id_hash >> 16, src_addr); std::string sdp = get_removed_source_sdp_(msg_id_hash >> 16, src_addr,
session_id, session_version);
// send deletion for this source // send deletion for this source
sap_.deletion(static_cast<uint16_t>(msg_id_hash), src_addr, sdp); sap_.deletion(static_cast<uint16_t>(msg_id_hash), src_addr, sdp);
// update amount of byte sent // update amount of byte sent
@ -926,15 +938,18 @@ size_t SessionManager::process_sap() {
void SessionManager::on_update_sources() { void SessionManager::on_update_sources() {
// trigger sources SDP file update // trigger sources SDP file update
std::shared_lock sources_lock(sources_mutex_); sources_mutex_.lock();
for (auto const& [id, info] : sources_) { for (auto& [id, info] : sources_) {
for (auto cb : update_source_observers) { for (auto cb : update_source_observers) {
info.session_version++;
cb(id, info.stream.m_cName, get_source_sdp_(id, info)); cb(id, info.stream.m_cName, get_source_sdp_(id, info));
} }
} }
sources_mutex_.unlock();
g_session_version++;
} }
void SessionManager::on_ptp_status_locked() { void SessionManager::on_ptp_status_locked() const {
// set sample rate, this may require seconds // set sample rate, this may require seconds
(void)driver_->set_sample_rate(driver_->get_current_sample_rate()); (void)driver_->set_sample_rate(driver_->get_current_sample_rate());
} }
@ -1049,9 +1064,13 @@ bool SessionManager::worker() {
} }
// at end, send deletion for all announced sources // at end, send deletion for all announced sources
for (auto const& [msg_id_hash, src_addr] : announced_sources_) { for (auto const& [msg_id_hash, info] : announced_sources_) {
auto src_addr = std::get<0>(info);
auto session_id = std::get<1>(info);
auto session_version = std::get<2>(info);
// retrieve deleted source SDP // retrieve deleted source SDP
std::string sdp = get_removed_source_sdp_(msg_id_hash >> 16, src_addr); std::string sdp = get_removed_source_sdp_(msg_id_hash >> 16, src_addr,
session_id, session_version);
// send deletion for this source // send deletion for this source
sap_.deletion(static_cast<uint16_t>(msg_id_hash), src_addr, sdp); sap_.deletion(static_cast<uint16_t>(msg_id_hash), src_addr, sdp);
} }

@ -90,6 +90,8 @@ struct StreamInfo {
bool sink_use_sdp{true}; bool sink_use_sdp{true};
std::string sink_source; std::string sink_source;
std::string sink_sdp; std::string sink_sdp;
uint32_t session_id{0};
uint32_t session_version{0};
}; };
class SessionManager { class SessionManager {
@ -166,11 +168,14 @@ class SessionManager {
void on_add_sink(const StreamSink& sink, const StreamInfo& info); void on_add_sink(const StreamSink& sink, const StreamInfo& info);
void on_remove_sink(const StreamInfo& info); void on_remove_sink(const StreamInfo& info);
void on_ptp_status_locked(); void on_ptp_status_locked() const;
void on_update_sources(); void on_update_sources();
std::string get_removed_source_sdp_(uint32_t id, uint32_t src_addr) const; std::string get_removed_source_sdp_(uint32_t id,
uint32_t src_addr,
uint32_t session_id,
uint32_t session_version) const;
std::string get_source_sdp_(uint32_t id, const StreamInfo& info) const; std::string get_source_sdp_(uint32_t id, const StreamInfo& info) const;
StreamSource get_source_(uint8_t id, const StreamInfo& info) const; StreamSource get_source_(uint8_t id, const StreamInfo& info) const;
StreamSink get_sink_(uint8_t id, const StreamInfo& info) const; StreamSink get_sink_(uint8_t id, const StreamInfo& info) const;
@ -201,7 +206,10 @@ class SessionManager {
mutable std::shared_mutex sinks_mutex_; mutable std::shared_mutex sinks_mutex_;
/* current announced sources */ /* current announced sources */
std::map<uint32_t /* msg_id_hash */, uint32_t /* src_addr */> std::map<uint32_t /* msg_id_hash */,
std::tuple<uint32_t /* src_addr */,
uint32_t /* session_id */,
uint32_t /* session_version */> >
announced_sources_; announced_sources_;
/* number of deletions sent for a a deleted source */ /* number of deletions sent for a a deleted source */
@ -218,6 +226,9 @@ class SessionManager {
SAP sap_{config_->get_sap_mcast_addr()}; SAP sap_{config_->get_sap_mcast_addr()};
IGMP igmp_; IGMP igmp_;
/* used to handle session versioning */
inline static std::atomic<uint16_t> g_session_version{0};
}; };
#endif #endif