Implementation of automatic Sink updates based on remote source updates received by the Browser.
The daemon uses the most recent version of corresponding remote sources to update a Sync. The Originator (o=) field of the SDP is used identify the corresponding remote sources. Many thanks to @Sikabo for the original implementation !
This commit is contained in:
parent
b10350488f
commit
f826a5b2ef
@ -125,8 +125,14 @@ int main(int argc, char* argv[]) {
|
|||||||
throw std::runtime_error(std::string("DriverManager:: init failed"));
|
throw std::runtime_error(std::string("DriverManager:: init failed"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* start browser */
|
||||||
|
auto browser = Browser::create(config);
|
||||||
|
if (browser == nullptr || !browser->init()) {
|
||||||
|
throw std::runtime_error(std::string("Browser:: init failed"));
|
||||||
|
}
|
||||||
|
|
||||||
/* start session manager */
|
/* start session manager */
|
||||||
auto session_manager = SessionManager::create(driver, config);
|
auto session_manager = SessionManager::create(driver, browser, config);
|
||||||
if (session_manager == nullptr || !session_manager->init()) {
|
if (session_manager == nullptr || !session_manager->init()) {
|
||||||
throw std::runtime_error(std::string("SessionManager:: init failed"));
|
throw std::runtime_error(std::string("SessionManager:: init failed"));
|
||||||
}
|
}
|
||||||
@ -143,12 +149,6 @@ int main(int argc, char* argv[]) {
|
|||||||
throw std::runtime_error(std::string("RtspServer:: init failed"));
|
throw std::runtime_error(std::string("RtspServer:: init failed"));
|
||||||
}
|
}
|
||||||
|
|
||||||
/* start browser */
|
|
||||||
auto browser = Browser::create(config);
|
|
||||||
if (browser == nullptr || !browser->init()) {
|
|
||||||
throw std::runtime_error(std::string("Browser:: init failed"));
|
|
||||||
}
|
|
||||||
|
|
||||||
/* start http server */
|
/* start http server */
|
||||||
HttpServer http_server(session_manager, browser, config);
|
HttpServer http_server(session_manager, browser, config);
|
||||||
if (!http_server.init()) {
|
if (!http_server.init()) {
|
||||||
@ -184,11 +184,6 @@ int main(int argc, char* argv[]) {
|
|||||||
throw std::runtime_error(std::string("HttpServer:: terminate failed"));
|
throw std::runtime_error(std::string("HttpServer:: terminate failed"));
|
||||||
}
|
}
|
||||||
|
|
||||||
/* stop browser */
|
|
||||||
if (!browser->terminate()) {
|
|
||||||
throw std::runtime_error(std::string("Browser:: terminate failed"));
|
|
||||||
}
|
|
||||||
|
|
||||||
/* stop rtsp server */
|
/* stop rtsp server */
|
||||||
if (!rtsp_server.terminate()) {
|
if (!rtsp_server.terminate()) {
|
||||||
throw std::runtime_error(std::string("RtspServer:: terminate failed"));
|
throw std::runtime_error(std::string("RtspServer:: terminate failed"));
|
||||||
@ -207,6 +202,11 @@ int main(int argc, char* argv[]) {
|
|||||||
std::string("SessionManager:: terminate failed"));
|
std::string("SessionManager:: terminate failed"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* stop browser */
|
||||||
|
if (!browser->terminate()) {
|
||||||
|
throw std::runtime_error(std::string("Browser:: terminate failed"));
|
||||||
|
}
|
||||||
|
|
||||||
/* stop driver manager */
|
/* stop driver manager */
|
||||||
if (!driver->terminate(*config)) {
|
if (!driver->terminate(*config)) {
|
||||||
throw std::runtime_error(
|
throw std::runtime_error(
|
||||||
|
@ -38,7 +38,7 @@
|
|||||||
#include "session_manager.hpp"
|
#include "session_manager.hpp"
|
||||||
#include "interface.hpp"
|
#include "interface.hpp"
|
||||||
|
|
||||||
static uint8_t get_codec_word_lenght(const std::string& codec) {
|
static uint8_t get_codec_word_length(const std::string& codec) {
|
||||||
if (codec == "L16") {
|
if (codec == "L16") {
|
||||||
return 2;
|
return 2;
|
||||||
}
|
}
|
||||||
@ -103,8 +103,21 @@ bool SessionManager::parse_sdp(const std::string sdp, StreamInfo& info) const {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case 'o':
|
case 'o': {
|
||||||
break;
|
std::vector<std::string> fields;
|
||||||
|
boost::split(fields, val, [line](char c) { return c == ' '; });
|
||||||
|
if (fields.size() < 6) {
|
||||||
|
BOOST_LOG_TRIVIAL(warning)
|
||||||
|
<< "session_manager:: invalid origin at line " << num;
|
||||||
|
} else {
|
||||||
|
info.origin.username = fields[0];
|
||||||
|
info.origin.session_id = fields[1];
|
||||||
|
info.origin.session_version = std::stoull(fields[2]);
|
||||||
|
info.origin.network_type = fields[3];
|
||||||
|
info.origin.address_type = fields[4];
|
||||||
|
info.origin.unicast_address = fields[5];
|
||||||
|
}
|
||||||
|
} break;
|
||||||
case 't':
|
case 't':
|
||||||
/* t=0 0 */
|
/* t=0 0 */
|
||||||
status = sdp_parser_status::time;
|
status = sdp_parser_status::time;
|
||||||
@ -150,7 +163,7 @@ bool SessionManager::parse_sdp(const std::string sdp, StreamInfo& info) const {
|
|||||||
if (info.stream.m_byPayloadType == std::stoi(fields[0])) {
|
if (info.stream.m_byPayloadType == std::stoi(fields[0])) {
|
||||||
strncpy(info.stream.m_cCodec, fields[1].c_str(),
|
strncpy(info.stream.m_cCodec, fields[1].c_str(),
|
||||||
sizeof(info.stream.m_cCodec) - 1);
|
sizeof(info.stream.m_cCodec) - 1);
|
||||||
info.stream.m_byWordLength = get_codec_word_lenght(fields[1]);
|
info.stream.m_byWordLength = get_codec_word_length(fields[1]);
|
||||||
info.stream.m_ui32SamplingRate = std::stoul(fields[2]);
|
info.stream.m_ui32SamplingRate = std::stoul(fields[2]);
|
||||||
if (info.stream.m_byNbOfChannels != std::stoi(fields[3])) {
|
if (info.stream.m_byNbOfChannels != std::stoi(fields[3])) {
|
||||||
BOOST_LOG_TRIVIAL(warning)
|
BOOST_LOG_TRIVIAL(warning)
|
||||||
@ -275,14 +288,15 @@ bool SessionManager::parse_sdp(const std::string sdp, StreamInfo& info) const {
|
|||||||
|
|
||||||
std::shared_ptr<SessionManager> SessionManager::create(
|
std::shared_ptr<SessionManager> SessionManager::create(
|
||||||
std::shared_ptr<DriverManager> driver,
|
std::shared_ptr<DriverManager> driver,
|
||||||
|
std::shared_ptr<Browser> browser,
|
||||||
std::shared_ptr<Config> config) {
|
std::shared_ptr<Config> config) {
|
||||||
// no need to be thread-safe here
|
// no need to be thread-safe here
|
||||||
static std::weak_ptr<SessionManager> instance;
|
static std::weak_ptr<SessionManager> instance;
|
||||||
if (auto ptr = instance.lock()) {
|
if (auto ptr = instance.lock()) {
|
||||||
return ptr;
|
return ptr;
|
||||||
}
|
}
|
||||||
auto ptr =
|
auto ptr = std::shared_ptr<SessionManager>(
|
||||||
std::shared_ptr<SessionManager>(new SessionManager(driver, config));
|
new SessionManager(driver, browser, config));
|
||||||
instance = ptr;
|
instance = ptr;
|
||||||
return ptr;
|
return ptr;
|
||||||
}
|
}
|
||||||
@ -481,7 +495,7 @@ std::error_code SessionManager::add_source(const StreamSource& source) {
|
|||||||
sizeof(info.stream.m_cName) - 1);
|
sizeof(info.stream.m_cName) - 1);
|
||||||
info.stream.m_ucDSCP = source.dscp; // IPv4 DSCP
|
info.stream.m_ucDSCP = source.dscp; // IPv4 DSCP
|
||||||
info.stream.m_byPayloadType = source.payload_type;
|
info.stream.m_byPayloadType = source.payload_type;
|
||||||
info.stream.m_byWordLength = get_codec_word_lenght(source.codec);
|
info.stream.m_byWordLength = get_codec_word_length(source.codec);
|
||||||
info.stream.m_byNbOfChannels = source.map.size();
|
info.stream.m_byNbOfChannels = source.map.size();
|
||||||
strncpy(info.stream.m_cCodec, source.codec.c_str(),
|
strncpy(info.stream.m_cCodec, source.codec.c_str(),
|
||||||
sizeof(info.stream.m_cCodec) - 1);
|
sizeof(info.stream.m_cCodec) - 1);
|
||||||
@ -593,6 +607,44 @@ std::string SessionManager::get_removed_source_sdp_(
|
|||||||
return sdp;
|
return sdp;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool SessionManager::parse_sdp_origin(const std::string sdp,
|
||||||
|
SDPOrigin& origin) const {
|
||||||
|
try {
|
||||||
|
std::stringstream sdp_string_stream(sdp);
|
||||||
|
std::string line;
|
||||||
|
while (getline(sdp_string_stream, line, '\n')) {
|
||||||
|
boost::trim(line);
|
||||||
|
if (line[1] != '=') {
|
||||||
|
BOOST_LOG_TRIVIAL(error) << "session_manager:: invalid SDP file";
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
std::string val = line.substr(2);
|
||||||
|
switch (line[0]) {
|
||||||
|
case 'o':
|
||||||
|
std::vector<std::string> fields;
|
||||||
|
boost::split(fields, val, [line](char c) { return c == ' '; });
|
||||||
|
if (fields.size() < 6) {
|
||||||
|
BOOST_LOG_TRIVIAL(error) << "session_manager:: invalid origin";
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
origin.username = fields[0];
|
||||||
|
origin.session_id = fields[1];
|
||||||
|
origin.session_version = std::stoull(fields[2]);
|
||||||
|
origin.network_type = fields[3];
|
||||||
|
origin.address_type = fields[4];
|
||||||
|
origin.unicast_address = fields[5];
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (...) {
|
||||||
|
BOOST_LOG_TRIVIAL(fatal) << "session_manager:: invalid SDP"
|
||||||
|
<< ", cannot extract SDP identifier";
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
std::string SessionManager::get_source_sdp_(uint32_t id,
|
std::string SessionManager::get_source_sdp_(uint32_t id,
|
||||||
const StreamInfo& info) const {
|
const StreamInfo& info) const {
|
||||||
std::shared_lock ptp_lock(ptp_mutex_);
|
std::shared_lock ptp_lock(ptp_mutex_);
|
||||||
@ -613,8 +665,7 @@ std::string SessionManager::get_source_sdp_(uint32_t id,
|
|||||||
ss << "v=0\n"
|
ss << "v=0\n"
|
||||||
<< "o=- " << info.session_id << " " << info.session_version << " IN IP4 "
|
<< "o=- " << info.session_id << " " << info.session_version << " IN IP4 "
|
||||||
<< ip::address_v4(info.stream.m_ui32SrcIP).to_string() << "\n"
|
<< ip::address_v4(info.stream.m_ui32SrcIP).to_string() << "\n"
|
||||||
<< "s=" << config_->get_node_id() << " "
|
<< "s=" << config_->get_node_id() << " " << info.stream.m_cName << "\n"
|
||||||
<< info.stream.m_cName << "\n"
|
|
||||||
<< "c=IN IP4 " << ip::address_v4(info.stream.m_ui32DestIP).to_string();
|
<< "c=IN IP4 " << ip::address_v4(info.stream.m_ui32DestIP).to_string();
|
||||||
if (IN_MULTICAST(info.stream.m_ui32DestIP)) {
|
if (IN_MULTICAST(info.stream.m_ui32DestIP)) {
|
||||||
ss << "/" << static_cast<unsigned>(info.stream.m_byTTL);
|
ss << "/" << static_cast<unsigned>(info.stream.m_byTTL);
|
||||||
@ -642,8 +693,8 @@ std::string SessionManager::get_source_sdp_(uint32_t id,
|
|||||||
if (info.refclk_ptp_traceable) {
|
if (info.refclk_ptp_traceable) {
|
||||||
ss << "traceable\n";
|
ss << "traceable\n";
|
||||||
} else {
|
} else {
|
||||||
ss << ptp_status_.gmid << ":"
|
ss << ptp_status_.gmid << ":" << static_cast<unsigned>(ptp_config_.domain)
|
||||||
<< static_cast<unsigned>(ptp_config_.domain) << "\n";
|
<< "\n";
|
||||||
}
|
}
|
||||||
ss << "a=recvonly\n";
|
ss << "a=recvonly\n";
|
||||||
|
|
||||||
@ -1009,6 +1060,47 @@ size_t SessionManager::process_sap() {
|
|||||||
return sdp_len_sum;
|
return sdp_len_sum;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::list<StreamSink> SessionManager::get_updated_sinks(
|
||||||
|
const std::list<RemoteSource>& sources_list) {
|
||||||
|
std::list<StreamSink> sinks_list;
|
||||||
|
std::shared_lock sinks_lock(sinks_mutex_);
|
||||||
|
for (auto const& [id, info] : sinks_) {
|
||||||
|
uint64_t newVersion{0};
|
||||||
|
StreamSink sink{get_sink_(id, info)};
|
||||||
|
for (auto& source : sources_list) {
|
||||||
|
SDPOrigin source_sdp_origin;
|
||||||
|
if (!parse_sdp_origin(source.sdp, source_sdp_origin))
|
||||||
|
continue;
|
||||||
|
|
||||||
|
if (sinks_[sink.id].origin == source_sdp_origin &&
|
||||||
|
sink.sdp != source.sdp &&
|
||||||
|
sinks_[sink.id].origin.session_version <
|
||||||
|
source_sdp_origin.session_version &&
|
||||||
|
newVersion < source_sdp_origin.session_version) {
|
||||||
|
newVersion = source_sdp_origin.session_version;
|
||||||
|
sink.sdp = source.sdp;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (newVersion) {
|
||||||
|
// Re-add sink with new SDP, since the sink.id is the same there will be
|
||||||
|
// an update
|
||||||
|
BOOST_LOG_TRIVIAL(info)
|
||||||
|
<< "session_manager:: sink " << std::to_string(sink.id)
|
||||||
|
<< " SDP change detected version " << newVersion << " updating";
|
||||||
|
sinks_list.emplace_back(sink);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return sinks_list;
|
||||||
|
}
|
||||||
|
|
||||||
|
void SessionManager::update_sinks(const std::list<RemoteSource>& sources_list) {
|
||||||
|
auto sinks_list = get_updated_sinks(sources_list);
|
||||||
|
for (auto& sink : sinks_list) {
|
||||||
|
add_sink(sink);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void SessionManager::on_update_sources() {
|
void SessionManager::on_update_sources() {
|
||||||
// trigger sources SDP file update
|
// trigger sources SDP file update
|
||||||
sources_mutex_.lock();
|
sources_mutex_.lock();
|
||||||
@ -1039,7 +1131,8 @@ void SessionManager::on_ptp_status_changed(const std::string& status) const {
|
|||||||
for (int i = STDERR_FILENO + 1; i < fdlimit; i++)
|
for (int i = STDERR_FILENO + 1; i < fdlimit; i++)
|
||||||
close(i);
|
close(i);
|
||||||
|
|
||||||
char* argv_list[] = {const_cast<char*>(config_->get_ptp_status_script().c_str()),
|
char* argv_list[] = {
|
||||||
|
const_cast<char*>(config_->get_ptp_status_script().c_str()),
|
||||||
const_cast<char*>(status.c_str()), NULL};
|
const_cast<char*>(status.c_str()), NULL};
|
||||||
|
|
||||||
execv(config_->get_ptp_status_script().c_str(), argv_list);
|
execv(config_->get_ptp_status_script().c_str(), argv_list);
|
||||||
@ -1157,6 +1250,11 @@ bool SessionManager::worker() {
|
|||||||
<< sap_interval << " secs";
|
<< sap_interval << " secs";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Use a newer version of source if the current version isn't available
|
||||||
|
* anymore. This typically happens when equipment is restarted. */
|
||||||
|
std::list<RemoteSource> remote_sources = browser_->get_remote_sources();
|
||||||
|
update_sinks(remote_sources);
|
||||||
|
|
||||||
std::this_thread::sleep_for(std::chrono::seconds(1));
|
std::this_thread::sleep_for(std::chrono::seconds(1));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -28,9 +28,27 @@
|
|||||||
|
|
||||||
#include "config.hpp"
|
#include "config.hpp"
|
||||||
#include "driver_interface.hpp"
|
#include "driver_interface.hpp"
|
||||||
|
#include "browser.hpp"
|
||||||
#include "igmp.hpp"
|
#include "igmp.hpp"
|
||||||
#include "sap.hpp"
|
#include "sap.hpp"
|
||||||
|
|
||||||
|
struct SDPOrigin {
|
||||||
|
std::string username;
|
||||||
|
std::string session_id;
|
||||||
|
uint64_t session_version{0};
|
||||||
|
std::string network_type;
|
||||||
|
std::string address_type;
|
||||||
|
std::string unicast_address;
|
||||||
|
|
||||||
|
bool operator==(const SDPOrigin& rhs) const {
|
||||||
|
// session_version is not part of comparison, see RFC 4566
|
||||||
|
return username == rhs.username && session_id == rhs.session_id &&
|
||||||
|
network_type == rhs.network_type &&
|
||||||
|
address_type == rhs.address_type &&
|
||||||
|
unicast_address == rhs.unicast_address;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
struct StreamSource {
|
struct StreamSource {
|
||||||
uint8_t id{0};
|
uint8_t id{0};
|
||||||
bool enabled{false};
|
bool enabled{false};
|
||||||
@ -93,6 +111,7 @@ struct StreamInfo {
|
|||||||
std::string sink_sdp;
|
std::string sink_sdp;
|
||||||
uint32_t session_id{0};
|
uint32_t session_id{0};
|
||||||
uint32_t session_version{0};
|
uint32_t session_version{0};
|
||||||
|
SDPOrigin origin;
|
||||||
};
|
};
|
||||||
|
|
||||||
class SessionManager {
|
class SessionManager {
|
||||||
@ -101,6 +120,7 @@ class SessionManager {
|
|||||||
|
|
||||||
static std::shared_ptr<SessionManager> create(
|
static std::shared_ptr<SessionManager> create(
|
||||||
std::shared_ptr<DriverManager> driver,
|
std::shared_ptr<DriverManager> driver,
|
||||||
|
std::shared_ptr<Browser> browser,
|
||||||
std::shared_ptr<Config> config);
|
std::shared_ptr<Config> config);
|
||||||
SessionManager() = delete;
|
SessionManager() = delete;
|
||||||
SessionManager(const SessionManager&) = delete;
|
SessionManager(const SessionManager&) = delete;
|
||||||
@ -165,6 +185,10 @@ class SessionManager {
|
|||||||
constexpr static const char ptp_primary_mcast_addr[] = "224.0.1.129";
|
constexpr static const char ptp_primary_mcast_addr[] = "224.0.1.129";
|
||||||
constexpr static const char ptp_pdelay_mcast_addr[] = "224.0.1.107";
|
constexpr static const char ptp_pdelay_mcast_addr[] = "224.0.1.107";
|
||||||
|
|
||||||
|
std::list<StreamSink> get_updated_sinks(
|
||||||
|
const std::list<RemoteSource>& sources_list);
|
||||||
|
void update_sinks(const std::list<RemoteSource>& sources_list);
|
||||||
|
|
||||||
void on_add_source(const StreamSource& source, const StreamInfo& info);
|
void on_add_source(const StreamSource& source, const StreamInfo& info);
|
||||||
void on_remove_source(const StreamInfo& info);
|
void on_remove_source(const StreamInfo& info);
|
||||||
|
|
||||||
@ -183,16 +207,22 @@ class SessionManager {
|
|||||||
StreamSource get_source_(uint8_t id, const StreamInfo& info) const;
|
StreamSource get_source_(uint8_t id, const StreamInfo& info) const;
|
||||||
StreamSink get_sink_(uint8_t id, const StreamInfo& info) const;
|
StreamSink get_sink_(uint8_t id, const StreamInfo& info) const;
|
||||||
|
|
||||||
|
bool sink_is_still_valid(const std::string sdp,
|
||||||
|
const std::list<RemoteSource> sources_list) const;
|
||||||
|
|
||||||
bool parse_sdp(const std::string sdp, StreamInfo& info) const;
|
bool parse_sdp(const std::string sdp, StreamInfo& info) const;
|
||||||
|
bool parse_sdp_origin(const std::string sdp, SDPOrigin& origin) const;
|
||||||
bool worker();
|
bool worker();
|
||||||
// singleton, use create() to build
|
// singleton, use create() to build
|
||||||
SessionManager(std::shared_ptr<DriverManager> driver,
|
SessionManager(std::shared_ptr<DriverManager> driver,
|
||||||
|
std::shared_ptr<Browser> browser,
|
||||||
std::shared_ptr<Config> config)
|
std::shared_ptr<Config> config)
|
||||||
: driver_(driver), config_(config) {
|
: browser_(browser), driver_(driver), config_(config) {
|
||||||
ptp_config_.domain = config->get_ptp_domain();
|
ptp_config_.domain = config->get_ptp_domain();
|
||||||
ptp_config_.dscp = config->get_ptp_dscp();
|
ptp_config_.dscp = config->get_ptp_dscp();
|
||||||
};
|
};
|
||||||
|
|
||||||
|
std::shared_ptr<Browser> browser_;
|
||||||
std::shared_ptr<DriverManager> driver_;
|
std::shared_ptr<DriverManager> driver_;
|
||||||
std::shared_ptr<Config> config_;
|
std::shared_ptr<Config> config_;
|
||||||
std::future<bool> res_;
|
std::future<bool> res_;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user