Merge pull request #111 from bondagit/daemon_v1.6.1

Daemon v1.6.1
This commit is contained in:
Andrea Bondavalli 2023-02-03 15:51:50 +01:00 committed by GitHub
commit 407a17d0e0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 412 additions and 71 deletions

View File

@ -50,10 +50,11 @@ The daemon can be cross-compiled for multiple platforms and implements the follo
* control and configuration of up to 64 multicast and unicast sources and sinks using the ALSA RAVENNA/AES67 driver via netlink
* session handling and SDP parsing and creation
* HTTP REST API for the daemon control and configuration
* SAP sources discovery and advertisement compatible with AES67 standard
* SAP sources discovery, update and advertisement compatible with AES67 standard
* mDNS sources discovery and advertisement (using Linux Avahi) compatible with Ravenna standard
* RTSP client and server to retrieve, return and update SDP files via DESCRIBE and ANNOUNCE methods according to Ravenna standard
* IGMP handling for SAP, PTP and RTP sessions
* Integration with systemd watchdog monitoring (from daemon release v1.6)
The directory also contains the daemon regression tests in the [tests](daemon/tests) subdirectory.
See the [README](daemon/README.md) file in this directory for additional information about the AES67 daemon configuration and the HTTP REST API.
@ -113,6 +114,45 @@ The [aes67-daemon branch of ravenna-alsa-lkm repository](https://github.com/bond
See [ALSA RAVENNA/AES67 Driver README](https://github.com/bondagit/aes67-linux-daemon/blob/master/README.md) for additional information about the Merging Technologies module and for proper Linux Kernel configuration and tuning.
### [systemd](systemd) directory ###
This directory contains systemd configuration files for the daemon.
The daemon integrates with systemd watchdog. To enable it recompile it with the option _-DWITH_SYSTEMD=ON_
You can install the daemon under systemd using the following commands:
sudo useradd -M -l aes67-daemon -c "AES67 Linux daemon"
sudo cp daemon/aes67-daemon /usr/local/bin/aes67-daemon
sudo cp daemon/daemon.conf /etc
sudo cp systemd/aes67-daemon.service /etc/systemd/system
sudo systemctl enable aes67-daemon
systemctl daemon-reexec
To start the daemon use:
sudo systemctl start aes67-daemon
To stop it use:
sudo systemctl stop aes67-daemon
The daemon requires the RAVENNA module to be loaded.
You can install the module on Ubuntu distro using the following commands:
cd 3rdparty/ravenna-alsa-lkm/driver
sudo make modules_install
If this doesn't work because you miss kernel certificate follow the instructions at: [No OpenSSL sign-file signing_key.pem](https://superuser.com/questions/1214116/no-openssl-sign-file-signing-key-pem-leads-to-error-while-loading-kernel-modules)
Finally use the command to probe the modules:
sudo depmod -a
### [test](test) directory ###
This directory contains the files used to run the daemon platform compatibility test on the network loopback interface. The [test](#test) is described below.

View File

@ -43,7 +43,7 @@ cd ..
cd daemon
echo "Building aes67-daemon ..."
cmake -DCPP_HTTPLIB_DIR="$TOPDIR"/3rdparty/cpp-httplib -DRAVENNA_ALSA_LKM_DIR="$TOPDIR"/3rdparty/ravenna-alsa-lkm -DENABLE_TESTS=ON -DWITH_AVAHI=ON -DFAKE_DRIVER=OFF .
cmake -DCPP_HTTPLIB_DIR="$TOPDIR"/3rdparty/cpp-httplib -DRAVENNA_ALSA_LKM_DIR="$TOPDIR"/3rdparty/ravenna-alsa-lkm -DENABLE_TESTS=ON -DWITH_AVAHI=ON -DFAKE_DRIVER=OFF -DWITH_SYSTEMD=OFF .
make
cd ..

View File

@ -11,6 +11,8 @@ option(WITH_AVAHI "Include mDNS support via Avahi" OFF)
option(FAKE_DRIVER "Use fake driver instead of RAVENNA" OFF)
set(CMAKE_CXX_STANDARD 17)
option(WITH_SYSTEMD "Include systemd notify and watchdog support" OFF)
# ravena lkm _should_ be provided by the CLI. Nonetheless, we should be able
# to find it in system dirs too...
if (NOT RAVENNNA_ALSA_LKM_DIR)
@ -54,3 +56,9 @@ if(WITH_AVAHI)
include_directories(aes67-daemon ${AVAHI_INCLUDE_DIRS})
target_link_libraries(aes67-daemon ${AVAHI_LIBRARIES})
endif()
if(WITH_SYSTEMD)
MESSAGE(STATUS "WITH_SYSTEMD")
add_definitions(-D_USE_SYSTEMD_)
target_link_libraries(aes67-daemon systemd)
endif()

View File

@ -12,6 +12,7 @@ The daemon is responsible for:
* mDNS sources discovery and advertisement (using Linux Avahi) compatible with Ravenna standard
* RTSP client and server to retrieve, return and update SDP files via DESCRIBE and ANNOUNCE methods according to Ravenna standard
* IGMP handling for SAP, PTP and RTP sessions
* automatic update of Sinks based on discovered mDNS/SAP remote sources
## Configuration file ##
@ -187,7 +188,8 @@ Example
"ip_addr": "127.0.0.1",
"node_id": "AES67 daemon d9aca383",
"custom_node_id": "",
"ptp_status_script": "./scripts/ptp_status.sh"
"ptp_status_script": "./scripts/ptp_status.sh",
"auto_sinks_update": true
}
where:
@ -269,6 +271,10 @@ where:
> JSON string specifying the unique node identifier used to identify mDNS, SAP and SDP services announced by the daemon.
> **_NOTE:_** This parameter is read-only and cannot be set. The server will determine the node id at startup time.
> **auto\_sinks\_update**
> JSON boolean specifying whether to enable or disable the automatic update of the configured Sinks.
> When enabled the daemon will automatically update the configured Sinks according to the discovered remote sources via SAP and mDNS/RTSP updates. The SDP Originator (o=) is used to match a Sink with the remote source/s.
> **custom\_node\_id**
> JSON string specifying a custom node identifier used to identify mDNS, SAP and SDP services announced by the daemon.
> When this parameter is empty the *node_id* is automatically generated by the daemon based on the current IP address.

View File

@ -19,7 +19,6 @@
#include <boost/algorithm/string.hpp>
#include "utils.hpp"
#include "browser.hpp"
using namespace boost::algorithm;
@ -73,22 +72,22 @@ bool Browser::worker() {
BOOST_LOG_TRIVIAL(debug) << "browser:: received SAP message for " << id;
std::unique_lock sources_lock(sources_mutex_);
last_update_ =
duration_cast<second_t>(steady_clock::now() - startup_).count();
auto it = sources_.get<id_tag>().find(id);
if (it == sources_.end()) {
// Source is not in the map
if (is_announce) {
// annoucement, add new source
sources_.insert(
{id,
sources_.insert({id,
"SAP",
ip::address_v4(ntohl(addr)).to_string(),
sdp_get_subject(sdp),
{},
sdp_get_origin(sdp),
sdp,
static_cast<uint32_t>(
duration_cast<second_t>(steady_clock::now() - startup_)
.count()),
last_update_,
config_->get_sap_interval()});
}
} else {
@ -97,12 +96,10 @@ bool Browser::worker() {
BOOST_LOG_TRIVIAL(debug)
<< "browser:: refreshing SAP source " << it->id;
// annoucement, update last seen and announce period
uint32_t last_seen =
duration_cast<second_t>(steady_clock::now() - startup_).count();
auto upd_source{*it};
if ((last_seen - upd_source.last_seen) != 0) {
upd_source.announce_period = last_seen - upd_source.last_seen;
upd_source.last_seen = last_seen;
if ((last_update_ - upd_source.last_seen) != 0) {
upd_source.announce_period = last_update_ - upd_source.last_seen;
upd_source.last_seen = last_update_;
sources_.replace(it, upd_source);
}
} else {
@ -130,6 +127,8 @@ bool Browser::worker() {
BOOST_LOG_TRIVIAL(info)
<< "browser:: SAP source " << it->id << " timeout";
it = sources_.erase(it);
last_update_ =
duration_cast<second_t>(steady_clock::now() - startup_).count();
} else {
it++;
}
@ -150,9 +149,9 @@ bool Browser::worker() {
void Browser::on_change_rtsp_source(const std::string& name,
const std::string& domain,
const RtspSource& s) {
uint32_t last_seen =
duration_cast<second_t>(steady_clock::now() - startup_).count();
std::unique_lock sources_lock(sources_mutex_);
last_update_ =
duration_cast<second_t>(steady_clock::now() - startup_).count();
/* search by name */
auto rng = sources_.get<name_tag>().equal_range(name);
while (rng.first != rng.second) {
@ -163,9 +162,10 @@ void Browser::on_change_rtsp_source(const std::string& name,
<< " name " << name << " domain " << domain;
auto upd_source{*it};
upd_source.id = s.id;
upd_source.sdp = s.sdp;
upd_source.address = s.address;
upd_source.last_seen = last_seen;
upd_source.origin = sdp_get_origin(s.sdp);
upd_source.sdp = s.sdp;
upd_source.last_seen = last_update_;
sources_.get<name_tag>().replace(it, upd_source);
return;
}
@ -174,8 +174,8 @@ void Browser::on_change_rtsp_source(const std::string& name,
/* entry not found -> add */
BOOST_LOG_TRIVIAL(info) << "browser:: adding RTSP source " << s.id << " name "
<< name << " domain " << domain;
sources_.insert(
{s.id, s.source, s.address, name, domain, s.sdp, last_seen, 0});
sources_.insert({s.id, s.source, s.address, name, domain,
sdp_get_origin(s.sdp), s.sdp, last_update_, 0});
}
void Browser::on_remove_rtsp_source(const std::string& name,
@ -190,6 +190,8 @@ void Browser::on_remove_rtsp_source(const std::string& name,
<< "browser:: removing RTSP source " << it->id << " name " << it->name
<< " domain " << it->domain;
name_idx.erase(it);
last_update_ =
duration_cast<second_t>(steady_clock::now() - startup_).count();
break;
}
++rng.first;
@ -205,6 +207,7 @@ bool Browser::init() {
running_ = true;
res_ = std::async(std::launch::async, &Browser::worker, this);
}
last_update_ = 0;
return true;
}

View File

@ -35,6 +35,7 @@
#include "igmp.hpp"
#include "mdns_client.hpp"
#include "sap.hpp"
#include "utils.hpp"
using namespace boost::multi_index;
@ -44,6 +45,7 @@ struct RemoteSource {
std::string address;
std::string name;
std::string domain; /* mDNS only */
SDPOrigin origin;
std::string sdp;
uint32_t last_seen{0}; /* seconds from daemon startup */
uint32_t announce_period{0}; /* period between annoucements */
@ -59,6 +61,7 @@ class Browser : public MDNSClient {
bool init() override;
bool terminate() override;
uint32_t get_last_update_ts() const { return last_update_; }
std::list<RemoteSource> get_remote_sources(
const std::string& source = "all") const;
@ -97,6 +100,7 @@ class Browser : public MDNSClient {
SAP sap_{config_->get_sap_mcast_addr()};
IGMP igmp_;
std::chrono::time_point<std::chrono::steady_clock> startup_;
uint32_t last_update_{0}; /* seconds from daemon startup */
};
#endif

View File

@ -54,6 +54,7 @@ class Config {
const std::string& get_config_filename() const { return config_filename_; };
const std::string& get_custom_node_id() const { return custom_node_id_; };
std::string get_node_id() const;
bool get_auto_sinks_update() const { return auto_sinks_update_; };
/* attributes set during init */
const std::array<uint8_t, 6>& get_mac_addr() const { return mac_addr_; };
@ -122,6 +123,9 @@ class Config {
void set_custom_node_id(const std::string& node_id) {
custom_node_id_ = node_id;
};
void set_auto_sinks_update(bool auto_sinks_update) {
auto_sinks_update_ = auto_sinks_update;
};
void set_driver_restart(bool restart) { driver_restart_ = restart; }
friend bool operator!=(const Config& lhs, const Config& rhs) {
@ -144,6 +148,7 @@ class Config {
lhs.get_status_file() != rhs.get_status_file() ||
lhs.get_interface_name() != rhs.get_interface_name() ||
lhs.get_mdns_enabled() != rhs.get_mdns_enabled() ||
lhs.get_auto_sinks_update() != rhs.get_auto_sinks_update() ||
lhs.get_custom_node_id() != rhs.get_custom_node_id();
};
friend bool operator==(const Config& lhs, const Config& rhs) {
@ -174,6 +179,7 @@ class Config {
std::string ptp_status_script_;
std::string custom_node_id_;
std::string node_id_;
bool auto_sinks_update_{true};
/* set during init */
std::array<uint8_t, 6> mac_addr_{0, 0, 0, 0, 0, 0};

View File

@ -19,5 +19,6 @@
"interface_name": "lo",
"mdns_enabled": true,
"custom_node_id": "",
"ptp_status_script": "./scripts/ptp_status.sh"
"ptp_status_script": "./scripts/ptp_status.sh",
"auto_sinks_update": true
}

View File

@ -29,7 +29,7 @@
#include "json.hpp"
static inline std::string remove_undesired_chars(const std::string& s) {
std::regex html_regex("[^ A-Za-z0-9:~._/=%\()\\r\\n\\t\?#-]?");
std::regex html_regex("[^ A-Za-z0-9:~.,_/=%\()\\r\\n\\t\?#-]?");
return std::regex_replace(s, html_regex, "");
}
@ -99,13 +99,16 @@ std::string config_to_json(const Config& config) {
<< ",\n \"interface_name\": \""
<< escape_json(config.get_interface_name()) << "\""
<< ",\n \"mdns_enabled\": " << std::boolalpha << config.get_mdns_enabled()
<< ",\n \"custom_node_id\": \"" << escape_json(config.get_custom_node_id()) << "\""
<< ",\n \"custom_node_id\": \""
<< escape_json(config.get_custom_node_id()) << "\""
<< ",\n \"node_id\": \"" << escape_json(config.get_node_id()) << "\""
<< ",\n \"ptp_status_script\": \""
<< escape_json(config.get_ptp_status_script()) << "\""
<< ",\n \"mac_addr\": \"" << escape_json(config.get_mac_addr_str()) << "\""
<< ",\n \"mac_addr\": \"" << escape_json(config.get_mac_addr_str())
<< "\""
<< ",\n \"ip_addr\": \"" << escape_json(config.get_ip_addr_str()) << "\""
<< "\n}\n";
<< ",\n \"auto_sinks_update\": " << std::boolalpha
<< config.get_auto_sinks_update() << "\n}\n";
return ss.str();
}
@ -327,6 +330,8 @@ Config json_to_config_(std::istream& js, Config& config) {
} else if (key == "custom_node_id") {
config.set_custom_node_id(
remove_undesired_chars(val.get_value<std::string>()));
} else if (key == "auto_sinks_update") {
config.set_auto_sinks_update(val.get_value<bool>());
} else if (key == "mac_addr" || key == "ip_addr" || key == "node_id") {
/* ignored */
} else {

View File

@ -31,11 +31,15 @@
#include "rtsp_server.hpp"
#include "session_manager.hpp"
#ifdef _USE_SYSTEMD_
#include <systemd/sd-daemon.h>
#endif
namespace po = boost::program_options;
namespace postyle = boost::program_options::command_line_style;
namespace logging = boost::log;
static std::string version("bondagit-1.5.3");
static std::string version("bondagit-1.6.1");
static std::atomic<bool> terminate = false;
void termination_handler(int signum) {
@ -64,6 +68,11 @@ int main(int argc, char* argv[]) {
int unix_style = postyle::unix_style | postyle::short_allow_next;
bool driver_restart(true);
#ifdef _USE_SYSTEMD_
// with which interval we should pet the dog
uint64_t current_watchdog_usec;
#endif
po::variables_map vm;
try {
po::store(po::command_line_parser(argc, argv)
@ -98,6 +107,17 @@ int main(int argc, char* argv[]) {
std::string filename = vm["config"].as<std::string>();
#ifdef _USE_SYSTEMD_
sd_watchdog_enabled(0, &current_watchdog_usec);
if (current_watchdog_usec > 0) {
// Inform systemd that if we're not petting the dog in 5s we're bust.
sd_notify(0, "WATCHDOG_USEC=5000000");
current_watchdog_usec = 5000000;
}
#endif
while (!is_terminated() && rc == EXIT_SUCCESS) {
/* load configuration from file */
auto config = Config::parse(filename, driver_restart);
@ -112,6 +132,11 @@ int main(int argc, char* argv[]) {
log_init(*config);
if (config->get_ip_addr_str().empty()) {
#ifdef _USE_SYSTEMD_
if (current_watchdog_usec > 0)
sd_notify(0, "WATCHDOG=1");
sd_notify(0, "STATUS=no IP address, waiting ...");
#endif
BOOST_LOG_TRIVIAL(info) << "main:: no IP address, waiting ...";
std::this_thread::sleep_for(std::chrono::seconds(1));
continue;
@ -125,8 +150,14 @@ int main(int argc, char* argv[]) {
throw std::runtime_error(std::string("DriverManager:: init failed"));
}
/* start browser */
auto browser = Browser::create(config);
if (browser == nullptr || !browser->init()) {
throw std::runtime_error(std::string("Browser:: init failed"));
}
/* start session manager */
auto session_manager = SessionManager::create(driver, config);
auto session_manager = SessionManager::create(driver, browser, config);
if (session_manager == nullptr || !session_manager->init()) {
throw std::runtime_error(std::string("SessionManager:: init failed"));
}
@ -143,12 +174,6 @@ int main(int argc, char* argv[]) {
throw std::runtime_error(std::string("RtspServer:: init failed"));
}
/* start browser */
auto browser = Browser::create(config);
if (browser == nullptr || !browser->init()) {
throw std::runtime_error(std::string("Browser:: init failed"));
}
/* start http server */
HttpServer http_server(session_manager, browser, config);
if (!http_server.init()) {
@ -159,7 +184,22 @@ int main(int argc, char* argv[]) {
session_manager->load_status();
BOOST_LOG_TRIVIAL(debug) << "main:: init done, entering loop...";
#ifdef _USE_SYSTEMD_
// To be able to use sd_notify at all have to set service NotifyAccess
// (e.g. to main)
sd_notify(0, "READY=1"); // If service Type=notify the service is only
// considered ready once we send this (this is
// independent of watchdog capability)
sd_notify(0, "STATUS=Working");
#endif
while (!is_terminated()) {
#ifdef _USE_SYSTEMD_
if (current_watchdog_usec > 0)
sd_notify(0, "WATCHDOG=1");
#endif
auto [ip_addr, ip_str] = get_interface_ip(config->get_interface_name());
if (config->get_ip_addr_str() != ip_str) {
BOOST_LOG_TRIVIAL(warning)
@ -175,6 +215,15 @@ int main(int argc, char* argv[]) {
std::this_thread::sleep_for(std::chrono::seconds(1));
}
#ifdef _USE_SYSTEMD_
if (is_terminated()) {
sd_notify(0, "STOPPING=1");
sd_notify(0, "STATUS=Stopping");
} else {
sd_notify(0, "RELOADING=1");
sd_notify(0, "STATUS=Restarting");
}
#endif
/* save session status to file */
session_manager->save_status();
@ -184,11 +233,6 @@ int main(int argc, char* argv[]) {
throw std::runtime_error(std::string("HttpServer:: terminate failed"));
}
/* stop browser */
if (!browser->terminate()) {
throw std::runtime_error(std::string("Browser:: terminate failed"));
}
/* stop rtsp server */
if (!rtsp_server.terminate()) {
throw std::runtime_error(std::string("RtspServer:: terminate failed"));
@ -207,6 +251,11 @@ int main(int argc, char* argv[]) {
std::string("SessionManager:: terminate failed"));
}
/* stop browser */
if (!browser->terminate()) {
throw std::runtime_error(std::string("Browser:: terminate failed"));
}
/* stop driver manager */
if (!driver->terminate(*config)) {
throw std::runtime_error(

View File

@ -134,9 +134,9 @@ std::pair<bool, RtspSource> RtspClient::process(RtspClient::Observer callback,
BOOST_LOG_TRIVIAL(debug) << "rtsp_client:: connecting to "
<< "rtsp://" << address << ":" << port << path;
#if BOOST_VERSION < 106600
s.expires_from_now(boost::posix_time::seconds(1));
s.expires_from_now(boost::posix_time::seconds(5));
#else
s.expires_after(boost::asio::chrono::seconds(1));
s.expires_after(boost::asio::chrono::seconds(5));
#endif
s.connect(address, port.length() ? port : dft_port);
if (!s || s.error()) {
@ -233,6 +233,11 @@ std::pair<bool, RtspSource> RtspClient::process(RtspClient::Observer callback,
}
if (wait_for_updates) {
#if BOOST_VERSION < 106600
s.expires_from_now(boost::posix_time::hours(24 * 365 * 10));
#else
s.expires_after(boost::asio::chrono::hours(24 * 365 * 10));
#endif
/* we start waiting for updates */
do {
std::getline(s, request);

View File

@ -38,7 +38,7 @@
#include "session_manager.hpp"
#include "interface.hpp"
static uint8_t get_codec_word_lenght(const std::string& codec) {
static uint8_t get_codec_word_length(const std::string& codec) {
if (codec == "L16") {
return 2;
}
@ -103,8 +103,21 @@ bool SessionManager::parse_sdp(const std::string sdp, StreamInfo& info) const {
return false;
}
break;
case 'o':
break;
case 'o': {
std::vector<std::string> fields;
boost::split(fields, val, [line](char c) { return c == ' '; });
if (fields.size() < 6) {
BOOST_LOG_TRIVIAL(warning)
<< "session_manager:: invalid origin at line " << num;
} else {
info.origin.username = fields[0];
info.origin.session_id = fields[1];
info.origin.session_version = std::stoull(fields[2]);
info.origin.network_type = fields[3];
info.origin.address_type = fields[4];
info.origin.unicast_address = fields[5];
}
} break;
case 't':
/* t=0 0 */
status = sdp_parser_status::time;
@ -150,7 +163,7 @@ bool SessionManager::parse_sdp(const std::string sdp, StreamInfo& info) const {
if (info.stream.m_byPayloadType == std::stoi(fields[0])) {
strncpy(info.stream.m_cCodec, fields[1].c_str(),
sizeof(info.stream.m_cCodec) - 1);
info.stream.m_byWordLength = get_codec_word_lenght(fields[1]);
info.stream.m_byWordLength = get_codec_word_length(fields[1]);
info.stream.m_ui32SamplingRate = std::stoul(fields[2]);
if (info.stream.m_byNbOfChannels != std::stoi(fields[3])) {
BOOST_LOG_TRIVIAL(warning)
@ -275,14 +288,15 @@ bool SessionManager::parse_sdp(const std::string sdp, StreamInfo& info) const {
std::shared_ptr<SessionManager> SessionManager::create(
std::shared_ptr<DriverManager> driver,
std::shared_ptr<Browser> browser,
std::shared_ptr<Config> config) {
// no need to be thread-safe here
static std::weak_ptr<SessionManager> instance;
if (auto ptr = instance.lock()) {
return ptr;
}
auto ptr =
std::shared_ptr<SessionManager>(new SessionManager(driver, config));
auto ptr = std::shared_ptr<SessionManager>(
new SessionManager(driver, browser, config));
instance = ptr;
return ptr;
}
@ -481,7 +495,7 @@ std::error_code SessionManager::add_source(const StreamSource& source) {
sizeof(info.stream.m_cName) - 1);
info.stream.m_ucDSCP = source.dscp; // IPv4 DSCP
info.stream.m_byPayloadType = source.payload_type;
info.stream.m_byWordLength = get_codec_word_lenght(source.codec);
info.stream.m_byWordLength = get_codec_word_length(source.codec);
info.stream.m_byNbOfChannels = source.map.size();
strncpy(info.stream.m_cCodec, source.codec.c_str(),
sizeof(info.stream.m_cCodec) - 1);
@ -613,8 +627,7 @@ std::string SessionManager::get_source_sdp_(uint32_t id,
ss << "v=0\n"
<< "o=- " << info.session_id << " " << info.session_version << " IN IP4 "
<< ip::address_v4(info.stream.m_ui32SrcIP).to_string() << "\n"
<< "s=" << config_->get_node_id() << " "
<< info.stream.m_cName << "\n"
<< "s=" << config_->get_node_id() << " " << info.stream.m_cName << "\n"
<< "c=IN IP4 " << ip::address_v4(info.stream.m_ui32DestIP).to_string();
if (IN_MULTICAST(info.stream.m_ui32DestIP)) {
ss << "/" << static_cast<unsigned>(info.stream.m_byTTL);
@ -642,8 +655,8 @@ std::string SessionManager::get_source_sdp_(uint32_t id,
if (info.refclk_ptp_traceable) {
ss << "traceable\n";
} else {
ss << ptp_status_.gmid << ":"
<< static_cast<unsigned>(ptp_config_.domain) << "\n";
ss << ptp_status_.gmid << ":" << static_cast<unsigned>(ptp_config_.domain)
<< "\n";
}
ss << "a=recvonly\n";
@ -1009,6 +1022,56 @@ size_t SessionManager::process_sap() {
return sdp_len_sum;
}
std::list<StreamSink> SessionManager::get_updated_sinks(
const std::list<RemoteSource>& sources_list) {
std::list<StreamSink> sinks_list;
std::shared_lock sinks_lock(sinks_mutex_);
for (auto const& [id, info] : sinks_) {
uint64_t newVersion{0};
StreamSink sink{get_sink_(id, info)};
for (auto& source : sources_list) {
// if no remote source origin specified, skip
if (source.origin.session_id == "")
continue;
// search for the largest corresponding remote source version
if (sinks_[sink.id].origin == source.origin && sink.sdp != source.sdp &&
sinks_[sink.id].origin.session_version <
source.origin.session_version &&
newVersion < source.origin.session_version) {
newVersion = source.origin.session_version;
sink.sdp = source.sdp;
}
}
if (newVersion) {
BOOST_LOG_TRIVIAL(info)
<< "session_manager:: sink " << std::to_string(sink.id)
<< " SDP change detected version " << newVersion << " updating";
sinks_list.emplace_back(std::move(sink));
}
}
return sinks_list;
}
void SessionManager::update_sinks() {
if (config_->get_auto_sinks_update()) {
uint32_t last_update = browser_->get_last_update_ts();
// check remote sources only if an update arrived
if (last_update && last_sink_update_ != last_update) {
BOOST_LOG_TRIVIAL(debug) << "Updating sinks ...";
std::list<RemoteSource> remote_sources = browser_->get_remote_sources();
auto sinks_list = get_updated_sinks(remote_sources);
for (auto& sink : sinks_list) {
// Re-add sink with new SDP, since the sink.id is the same there will be
// an update
add_sink(sink);
}
last_sink_update_ = last_update;
}
}
}
void SessionManager::on_update_sources() {
// trigger sources SDP file update
sources_mutex_.lock();
@ -1039,7 +1102,8 @@ void SessionManager::on_ptp_status_changed(const std::string& status) const {
for (int i = STDERR_FILENO + 1; i < fdlimit; i++)
close(i);
char* argv_list[] = {const_cast<char*>(config_->get_ptp_status_script().c_str()),
char* argv_list[] = {
const_cast<char*>(config_->get_ptp_status_script().c_str()),
const_cast<char*>(status.c_str()), NULL};
execv(config_->get_ptp_status_script().c_str(), argv_list);
@ -1157,6 +1221,8 @@ bool SessionManager::worker() {
<< sap_interval << " secs";
}
update_sinks();
std::this_thread::sleep_for(std::chrono::seconds(1));
}

View File

@ -25,9 +25,11 @@
#include <map>
#include <shared_mutex>
#include <thread>
#include <chrono>
#include "config.hpp"
#include "driver_interface.hpp"
#include "browser.hpp"
#include "igmp.hpp"
#include "sap.hpp"
@ -93,6 +95,7 @@ struct StreamInfo {
std::string sink_sdp;
uint32_t session_id{0};
uint32_t session_version{0};
SDPOrigin origin;
};
class SessionManager {
@ -101,6 +104,7 @@ class SessionManager {
static std::shared_ptr<SessionManager> create(
std::shared_ptr<DriverManager> driver,
std::shared_ptr<Browser> browser,
std::shared_ptr<Config> config);
SessionManager() = delete;
SessionManager(const SessionManager&) = delete;
@ -111,6 +115,9 @@ class SessionManager {
bool init() {
if (!running_) {
running_ = true;
g_session_version = std::chrono::system_clock::now().time_since_epoch() /
std::chrono::seconds(1);
// to have an increasing session versions between restarts
res_ = std::async(std::launch::async, &SessionManager::worker, this);
}
return true;
@ -165,6 +172,10 @@ class SessionManager {
constexpr static const char ptp_primary_mcast_addr[] = "224.0.1.129";
constexpr static const char ptp_pdelay_mcast_addr[] = "224.0.1.107";
std::list<StreamSink> get_updated_sinks(
const std::list<RemoteSource>& sources_list);
void update_sinks();
void on_add_source(const StreamSource& source, const StreamInfo& info);
void on_remove_source(const StreamInfo& info);
@ -183,16 +194,21 @@ class SessionManager {
StreamSource get_source_(uint8_t id, const StreamInfo& info) const;
StreamSink get_sink_(uint8_t id, const StreamInfo& info) const;
bool sink_is_still_valid(const std::string sdp,
const std::list<RemoteSource> sources_list) const;
bool parse_sdp(const std::string sdp, StreamInfo& info) const;
bool worker();
// singleton, use create() to build
SessionManager(std::shared_ptr<DriverManager> driver,
std::shared_ptr<Browser> browser,
std::shared_ptr<Config> config)
: driver_(driver), config_(config) {
: browser_(browser), driver_(driver), config_(config) {
ptp_config_.domain = config->get_ptp_domain();
ptp_config_.dscp = config->get_ptp_dscp();
};
std::shared_ptr<Browser> browser_;
std::shared_ptr<DriverManager> driver_;
std::shared_ptr<Config> config_;
std::future<bool> res_;
@ -229,9 +245,10 @@ class SessionManager {
SAP sap_{config_->get_sap_mcast_addr()};
IGMP igmp_;
uint32_t last_sink_update_{0};
/* used to handle session versioning */
inline static std::atomic<uint16_t> g_session_version{0};
inline static std::atomic<uint32_t> g_session_version{0};
};
#endif

View File

@ -18,9 +18,10 @@
"status_file": "",
"interface_name": "lo",
"mdns_enabled": true,
"mac_addr": "00:00:00:00:00:00",
"ip_addr": "127.0.0.1",
"custom_node_id": "test node",
"node_id": "test node",
"ptp_status_script": ""
"ptp_status_script": "",
"mac_addr": "00:00:00:00:00:00",
"ip_addr": "127.0.0.1",
"auto_sinks_update": true
}

View File

@ -398,6 +398,7 @@ BOOST_AUTO_TEST_CASE(get_config) {
auto interface_name = pt.get<std::string>("interface_name");
auto mac_addr = pt.get<std::string>("mac_addr");
auto ip_addr = pt.get<std::string>("ip_addr");
auto auto_sinks_update = pt.get<bool>("auto_sinks_update");
BOOST_CHECK_MESSAGE(http_port == 9999, "config as excepcted");
// BOOST_CHECK_MESSAGE(log_severity == 5, "config as excepcted");
BOOST_CHECK_MESSAGE(playout_delay == 0, "config as excepcted");
@ -419,6 +420,7 @@ BOOST_AUTO_TEST_CASE(get_config) {
BOOST_CHECK_MESSAGE(ptp_status_script == "", "config as excepcted");
BOOST_CHECK_MESSAGE(node_id == "test node", "config as excepcted");
BOOST_CHECK_MESSAGE(custom_node_id == "test node", "config as excepcted");
BOOST_CHECK_MESSAGE(auto_sinks_update == true, "config as excepcted");
}
BOOST_AUTO_TEST_CASE(get_ptp_status) {

View File

@ -19,6 +19,7 @@
//
#include "utils.hpp"
#include "log.hpp"
#include <boost/algorithm/string.hpp>
#include <boost/format.hpp>
@ -86,9 +87,9 @@ std::string get_host_node_id(uint32_t ip_addr) {
}
std::string sdp_get_subject(const std::string& sdp) {
std::stringstream ssstrem(sdp);
std::stringstream sstrem(sdp);
std::string line;
while (getline(ssstrem, line, '\n')) {
while (getline(sstrem, line, '\n')) {
if (line.substr(0, 2) == "s=") {
auto subject = line.substr(2);
boost::trim(subject);
@ -97,3 +98,40 @@ std::string sdp_get_subject(const std::string& sdp) {
}
return "";
}
SDPOrigin sdp_get_origin(const std::string sdp) {
SDPOrigin origin;
try {
std::stringstream sstream(sdp);
std::string line;
while (getline(sstream, line, '\n')) {
boost::trim(line);
if (line[1] != '=') {
BOOST_LOG_TRIVIAL(error) << "session_manager:: invalid SDP file";
break;
}
std::string val = line.substr(2);
if (line[0] == 'o') {
std::vector<std::string> fields;
boost::split(fields, val, [line](char c) { return c == ' '; });
if (fields.size() < 6) {
BOOST_LOG_TRIVIAL(error) << "session_manager:: invalid origin";
break;
}
origin.username = fields[0];
origin.session_id = fields[1];
origin.session_version = std::stoull(fields[2]);
origin.network_type = fields[3];
origin.address_type = fields[4];
origin.unicast_address = fields[5];
break;
}
}
} catch (...) {
BOOST_LOG_TRIVIAL(fatal) << "session_manager:: invalid SDP"
<< ", cannot extract SDP identifier";
}
return origin;
}

View File

@ -38,4 +38,23 @@ std::string get_host_node_id(uint32_t ip_addr);
std::string sdp_get_subject(const std::string& sdp);
struct SDPOrigin {
std::string username;
std::string session_id;
uint64_t session_version{0};
std::string network_type;
std::string address_type;
std::string unicast_address;
bool operator==(const SDPOrigin& rhs) const {
// session_version is not part of comparison, see RFC 4566
return username == rhs.username && session_id == rhs.session_id &&
network_type == rhs.network_type &&
address_type == rhs.address_type &&
unicast_address == rhs.unicast_address;
}
};
SDPOrigin sdp_get_origin(const std::string sdp);
#endif

View File

@ -0,0 +1,2 @@
#Type Name ID GECOS Home directory Shell
u aes67-daemon - "AES67 daemon user"

View File

@ -0,0 +1,60 @@
[Unit]
Description=AES67 daemon service
Before=multi-user.target
After=network.target
[Service]
Type=notify
# Will be adjusted by service during startup
WatchdogSec=10
# Run as separate user created via sysusers.d
User=aes67-daemon
ExecStart=/usr/local/bin/aes67-daemon
# Security filters.
CapabilityBoundingSet=
DevicePolicy=closed
LockPersonality=yes
MemoryDenyWriteExecute=yes
NoNewPrivileges=yes
PrivateDevices=yes
PrivateMounts=yes
PrivateTmp=yes
PrivateUsers=yes
# interface::get_mac_from_arp_cache() reads from /proc/net/arp
ProcSubset=all
ProtectClock=yes
ProtectControlGroups=yes
ProtectHome=yes
ProtectHostname=yes
ProtectKernelLogs=yes
ProtectKernelModules=yes
ProtectKernelTunables=yes
ProtectProc=invisible
ProtectSystem=strict
RemoveIPC=yes
RestrictAddressFamilies=AF_INET AF_NETLINK AF_UNIX
RestrictNamespaces=yes
RestrictRealtime=yes
RestrictSUIDSGID=yes
SystemCallArchitectures=native
SystemCallFilter=~@clock
SystemCallFilter=~@cpu-emulation
SystemCallFilter=~@debug
SystemCallFilter=~@module
SystemCallFilter=~@mount
SystemCallFilter=~@obsolete
SystemCallFilter=~@privileged
SystemCallFilter=~@raw-io
SystemCallFilter=~@reboot
SystemCallFilter=~@resources
SystemCallFilter=~@swap
UMask=077
# Paths matching daemon.conf
ReadWritePaths=/etc/daemon.conf
ReadWritePaths=/etc/status.json
[Install]
WantedBy=multi-user.target

View File

@ -22,5 +22,6 @@
"ip_addr": "127.0.0.1",
"node_id": "AES67 daemon 007f0100",
"custom_node_id": "",
"ptp_status_script": ""
"ptp_status_script": "",
"auto_sinks_update": true
}

View File

@ -59,7 +59,8 @@ class Config extends Component {
ipAddr: '',
errors: 0,
isConfigLoading: false,
isVersionLoading: false
isVersionLoading: false,
autoSinksUpdate: false
};
this.onSubmit = this.onSubmit.bind(this);
this.inputIsValid = this.inputIsValid.bind(this);
@ -104,6 +105,7 @@ class Config extends Component {
macAddr: data.mac_addr,
ipAddr: data.ip_addr,
nodeId: data.node_id,
autoSinksUpdate: data.auto_sinks_update,
isConfigLoading: false
}))
.catch(err => this.setState({isConfigLoading: false}));
@ -139,7 +141,8 @@ class Config extends Component {
this.state.sapMcastAddr,
this.state.sapInterval,
this.state.mdnsEnabled,
this.state.customNodeId)
this.state.customNodeId,
this.state.autoSinksUpdate)
.then(response => toast.success('Applying new configuration ...'));
}
@ -226,6 +229,10 @@ class Config extends Component {
<th align="left"> <label>mDNS enabled</label> </th>
<th align="left"> <input type="checkbox" onChange={e => this.setState({mdnsEnabled: e.target.checked})} checked={this.state.mdnsEnabled ? true : undefined}/> </th>
</tr>
<tr height="35">
<th align="left"> <label>Auto Sinks update</label> </th>
<th align="left"> <input type="checkbox" onChange={e => this.setState({autoSinksUpdate: e.target.checked})} checked={this.state.autoSinksUpdate ? true : undefined}/> </th>
</tr>
<tr>
<th align="left"> <label>Network Interface</label> </th>
<th align="left"> <input value={this.state.interfaceName} disabled/> </th>

View File

@ -84,7 +84,7 @@ export default class RestAPI {
});
}
static setConfig(log_severity, syslog_proto, syslog_server, rtp_mcast_base, rtp_port, rtsp_port, playout_delay, tic_frame_size_at_1fs, sample_rate, max_tic_frame_size, sap_mcast_addr, sap_interval, mdns_enabled, custom_node_id) {
static setConfig(log_severity, syslog_proto, syslog_server, rtp_mcast_base, rtp_port, rtsp_port, playout_delay, tic_frame_size_at_1fs, sample_rate, max_tic_frame_size, sap_mcast_addr, sap_interval, mdns_enabled, custom_node_id, auto_sinks_update) {
return this.doFetch(config, {
body: JSON.stringify({
log_severity: parseInt(log_severity, 10),
@ -100,7 +100,8 @@ export default class RestAPI {
sap_mcast_addr: sap_mcast_addr,
sap_interval: parseInt(sap_interval, 10),
custom_node_id: custom_node_id,
mdns_enabled: mdns_enabled
mdns_enabled: mdns_enabled,
auto_sinks_update: auto_sinks_update
}),
method: 'POST'
}).catch(err => {