diff --git a/README.md b/README.md index 78a291f..bb631c8 100644 --- a/README.md +++ b/README.md @@ -50,10 +50,11 @@ The daemon can be cross-compiled for multiple platforms and implements the follo * control and configuration of up to 64 multicast and unicast sources and sinks using the ALSA RAVENNA/AES67 driver via netlink * session handling and SDP parsing and creation * HTTP REST API for the daemon control and configuration -* SAP sources discovery and advertisement compatible with AES67 standard +* SAP sources discovery, update and advertisement compatible with AES67 standard * mDNS sources discovery and advertisement (using Linux Avahi) compatible with Ravenna standard * RTSP client and server to retrieve, return and update SDP files via DESCRIBE and ANNOUNCE methods according to Ravenna standard * IGMP handling for SAP, PTP and RTP sessions +* Integration with systemd watchdog monitoring (from daemon release v1.6) The directory also contains the daemon regression tests in the [tests](daemon/tests) subdirectory. See the [README](daemon/README.md) file in this directory for additional information about the AES67 daemon configuration and the HTTP REST API. @@ -113,6 +114,45 @@ The [aes67-daemon branch of ravenna-alsa-lkm repository](https://github.com/bond See [ALSA RAVENNA/AES67 Driver README](https://github.com/bondagit/aes67-linux-daemon/blob/master/README.md) for additional information about the Merging Technologies module and for proper Linux Kernel configuration and tuning. +### [systemd](systemd) directory ### + +This directory contains systemd configuration files for the daemon. + +The daemon integrates with systemd watchdog. To enable it recompile it with the option _-DWITH_SYSTEMD=ON_ + +You can install the daemon under systemd using the following commands: + + sudo useradd -M -l aes67-daemon -c "AES67 Linux daemon" + sudo cp daemon/aes67-daemon /usr/local/bin/aes67-daemon + sudo cp daemon/daemon.conf /etc + sudo cp systemd/aes67-daemon.service /etc/systemd/system + sudo systemctl enable aes67-daemon + systemctl daemon-reexec + +To start the daemon use: + + sudo systemctl start aes67-daemon + +To stop it use: + + sudo systemctl stop aes67-daemon + + +The daemon requires the RAVENNA module to be loaded. + +You can install the module on Ubuntu distro using the following commands: + + cd 3rdparty/ravenna-alsa-lkm/driver + sudo make modules_install + +If this doesn't work because you miss kernel certificate follow the instructions at: [No OpenSSL sign-file signing_key.pem](https://superuser.com/questions/1214116/no-openssl-sign-file-signing-key-pem-leads-to-error-while-loading-kernel-modules) + + +Finally use the command to probe the modules: + + sudo depmod -a + + ### [test](test) directory ### This directory contains the files used to run the daemon platform compatibility test on the network loopback interface. The [test](#test) is described below. diff --git a/build.sh b/build.sh index 538902f..1ead070 100755 --- a/build.sh +++ b/build.sh @@ -43,7 +43,7 @@ cd .. cd daemon echo "Building aes67-daemon ..." -cmake -DCPP_HTTPLIB_DIR="$TOPDIR"/3rdparty/cpp-httplib -DRAVENNA_ALSA_LKM_DIR="$TOPDIR"/3rdparty/ravenna-alsa-lkm -DENABLE_TESTS=ON -DWITH_AVAHI=ON -DFAKE_DRIVER=OFF . +cmake -DCPP_HTTPLIB_DIR="$TOPDIR"/3rdparty/cpp-httplib -DRAVENNA_ALSA_LKM_DIR="$TOPDIR"/3rdparty/ravenna-alsa-lkm -DENABLE_TESTS=ON -DWITH_AVAHI=ON -DFAKE_DRIVER=OFF -DWITH_SYSTEMD=OFF . make cd .. diff --git a/daemon/CMakeLists.txt b/daemon/CMakeLists.txt index 2390458..2b20e2e 100644 --- a/daemon/CMakeLists.txt +++ b/daemon/CMakeLists.txt @@ -11,6 +11,8 @@ option(WITH_AVAHI "Include mDNS support via Avahi" OFF) option(FAKE_DRIVER "Use fake driver instead of RAVENNA" OFF) set(CMAKE_CXX_STANDARD 17) +option(WITH_SYSTEMD "Include systemd notify and watchdog support" OFF) + # ravena lkm _should_ be provided by the CLI. Nonetheless, we should be able # to find it in system dirs too... if (NOT RAVENNNA_ALSA_LKM_DIR) @@ -54,3 +56,9 @@ if(WITH_AVAHI) include_directories(aes67-daemon ${AVAHI_INCLUDE_DIRS}) target_link_libraries(aes67-daemon ${AVAHI_LIBRARIES}) endif() + +if(WITH_SYSTEMD) + MESSAGE(STATUS "WITH_SYSTEMD") + add_definitions(-D_USE_SYSTEMD_) + target_link_libraries(aes67-daemon systemd) +endif() diff --git a/daemon/README.md b/daemon/README.md index b16a126..9ddc21e 100644 --- a/daemon/README.md +++ b/daemon/README.md @@ -12,6 +12,7 @@ The daemon is responsible for: * mDNS sources discovery and advertisement (using Linux Avahi) compatible with Ravenna standard * RTSP client and server to retrieve, return and update SDP files via DESCRIBE and ANNOUNCE methods according to Ravenna standard * IGMP handling for SAP, PTP and RTP sessions +* automatic update of Sinks based on discovered mDNS/SAP remote sources ## Configuration file ## @@ -187,7 +188,8 @@ Example "ip_addr": "127.0.0.1", "node_id": "AES67 daemon d9aca383", "custom_node_id": "", - "ptp_status_script": "./scripts/ptp_status.sh" + "ptp_status_script": "./scripts/ptp_status.sh", + "auto_sinks_update": true } where: @@ -269,6 +271,10 @@ where: > JSON string specifying the unique node identifier used to identify mDNS, SAP and SDP services announced by the daemon. > **_NOTE:_** This parameter is read-only and cannot be set. The server will determine the node id at startup time. +> **auto\_sinks\_update** +> JSON boolean specifying whether to enable or disable the automatic update of the configured Sinks. +> When enabled the daemon will automatically update the configured Sinks according to the discovered remote sources via SAP and mDNS/RTSP updates. The SDP Originator (o=) is used to match a Sink with the remote source/s. + > **custom\_node\_id** > JSON string specifying a custom node identifier used to identify mDNS, SAP and SDP services announced by the daemon. > When this parameter is empty the *node_id* is automatically generated by the daemon based on the current IP address. diff --git a/daemon/browser.cpp b/daemon/browser.cpp index 420e01d..7f6ba58 100644 --- a/daemon/browser.cpp +++ b/daemon/browser.cpp @@ -19,7 +19,6 @@ #include -#include "utils.hpp" #include "browser.hpp" using namespace boost::algorithm; @@ -73,23 +72,23 @@ bool Browser::worker() { BOOST_LOG_TRIVIAL(debug) << "browser:: received SAP message for " << id; std::unique_lock sources_lock(sources_mutex_); + last_update_ = + duration_cast(steady_clock::now() - startup_).count(); auto it = sources_.get().find(id); if (it == sources_.end()) { // Source is not in the map if (is_announce) { // annoucement, add new source - sources_.insert( - {id, - "SAP", - ip::address_v4(ntohl(addr)).to_string(), - sdp_get_subject(sdp), - {}, - sdp, - static_cast( - duration_cast(steady_clock::now() - startup_) - .count()), - config_->get_sap_interval()}); + sources_.insert({id, + "SAP", + ip::address_v4(ntohl(addr)).to_string(), + sdp_get_subject(sdp), + {}, + sdp_get_origin(sdp), + sdp, + last_update_, + config_->get_sap_interval()}); } } else { // Source is already in the map @@ -97,12 +96,10 @@ bool Browser::worker() { BOOST_LOG_TRIVIAL(debug) << "browser:: refreshing SAP source " << it->id; // annoucement, update last seen and announce period - uint32_t last_seen = - duration_cast(steady_clock::now() - startup_).count(); auto upd_source{*it}; - if ((last_seen - upd_source.last_seen) != 0) { - upd_source.announce_period = last_seen - upd_source.last_seen; - upd_source.last_seen = last_seen; + if ((last_update_ - upd_source.last_seen) != 0) { + upd_source.announce_period = last_update_ - upd_source.last_seen; + upd_source.last_seen = last_update_; sources_.replace(it, upd_source); } } else { @@ -130,6 +127,8 @@ bool Browser::worker() { BOOST_LOG_TRIVIAL(info) << "browser:: SAP source " << it->id << " timeout"; it = sources_.erase(it); + last_update_ = + duration_cast(steady_clock::now() - startup_).count(); } else { it++; } @@ -150,9 +149,9 @@ bool Browser::worker() { void Browser::on_change_rtsp_source(const std::string& name, const std::string& domain, const RtspSource& s) { - uint32_t last_seen = - duration_cast(steady_clock::now() - startup_).count(); std::unique_lock sources_lock(sources_mutex_); + last_update_ = + duration_cast(steady_clock::now() - startup_).count(); /* search by name */ auto rng = sources_.get().equal_range(name); while (rng.first != rng.second) { @@ -163,9 +162,10 @@ void Browser::on_change_rtsp_source(const std::string& name, << " name " << name << " domain " << domain; auto upd_source{*it}; upd_source.id = s.id; - upd_source.sdp = s.sdp; upd_source.address = s.address; - upd_source.last_seen = last_seen; + upd_source.origin = sdp_get_origin(s.sdp); + upd_source.sdp = s.sdp; + upd_source.last_seen = last_update_; sources_.get().replace(it, upd_source); return; } @@ -174,8 +174,8 @@ void Browser::on_change_rtsp_source(const std::string& name, /* entry not found -> add */ BOOST_LOG_TRIVIAL(info) << "browser:: adding RTSP source " << s.id << " name " << name << " domain " << domain; - sources_.insert( - {s.id, s.source, s.address, name, domain, s.sdp, last_seen, 0}); + sources_.insert({s.id, s.source, s.address, name, domain, + sdp_get_origin(s.sdp), s.sdp, last_update_, 0}); } void Browser::on_remove_rtsp_source(const std::string& name, @@ -190,6 +190,8 @@ void Browser::on_remove_rtsp_source(const std::string& name, << "browser:: removing RTSP source " << it->id << " name " << it->name << " domain " << it->domain; name_idx.erase(it); + last_update_ = + duration_cast(steady_clock::now() - startup_).count(); break; } ++rng.first; @@ -205,6 +207,7 @@ bool Browser::init() { running_ = true; res_ = std::async(std::launch::async, &Browser::worker, this); } + last_update_ = 0; return true; } diff --git a/daemon/browser.hpp b/daemon/browser.hpp index c76d467..716ca88 100644 --- a/daemon/browser.hpp +++ b/daemon/browser.hpp @@ -35,6 +35,7 @@ #include "igmp.hpp" #include "mdns_client.hpp" #include "sap.hpp" +#include "utils.hpp" using namespace boost::multi_index; @@ -44,6 +45,7 @@ struct RemoteSource { std::string address; std::string name; std::string domain; /* mDNS only */ + SDPOrigin origin; std::string sdp; uint32_t last_seen{0}; /* seconds from daemon startup */ uint32_t announce_period{0}; /* period between annoucements */ @@ -59,6 +61,7 @@ class Browser : public MDNSClient { bool init() override; bool terminate() override; + uint32_t get_last_update_ts() const { return last_update_; } std::list get_remote_sources( const std::string& source = "all") const; @@ -97,6 +100,7 @@ class Browser : public MDNSClient { SAP sap_{config_->get_sap_mcast_addr()}; IGMP igmp_; std::chrono::time_point startup_; + uint32_t last_update_{0}; /* seconds from daemon startup */ }; #endif diff --git a/daemon/config.hpp b/daemon/config.hpp index 003ee69..7ad5344 100644 --- a/daemon/config.hpp +++ b/daemon/config.hpp @@ -54,6 +54,7 @@ class Config { const std::string& get_config_filename() const { return config_filename_; }; const std::string& get_custom_node_id() const { return custom_node_id_; }; std::string get_node_id() const; + bool get_auto_sinks_update() const { return auto_sinks_update_; }; /* attributes set during init */ const std::array& get_mac_addr() const { return mac_addr_; }; @@ -122,6 +123,9 @@ class Config { void set_custom_node_id(const std::string& node_id) { custom_node_id_ = node_id; }; + void set_auto_sinks_update(bool auto_sinks_update) { + auto_sinks_update_ = auto_sinks_update; + }; void set_driver_restart(bool restart) { driver_restart_ = restart; } friend bool operator!=(const Config& lhs, const Config& rhs) { @@ -144,6 +148,7 @@ class Config { lhs.get_status_file() != rhs.get_status_file() || lhs.get_interface_name() != rhs.get_interface_name() || lhs.get_mdns_enabled() != rhs.get_mdns_enabled() || + lhs.get_auto_sinks_update() != rhs.get_auto_sinks_update() || lhs.get_custom_node_id() != rhs.get_custom_node_id(); }; friend bool operator==(const Config& lhs, const Config& rhs) { @@ -174,6 +179,7 @@ class Config { std::string ptp_status_script_; std::string custom_node_id_; std::string node_id_; + bool auto_sinks_update_{true}; /* set during init */ std::array mac_addr_{0, 0, 0, 0, 0, 0}; diff --git a/daemon/daemon.conf b/daemon/daemon.conf index b049a64..58e1cec 100644 --- a/daemon/daemon.conf +++ b/daemon/daemon.conf @@ -19,5 +19,6 @@ "interface_name": "lo", "mdns_enabled": true, "custom_node_id": "", - "ptp_status_script": "./scripts/ptp_status.sh" + "ptp_status_script": "./scripts/ptp_status.sh", + "auto_sinks_update": true } diff --git a/daemon/json.cpp b/daemon/json.cpp index a42bb74..a395976 100644 --- a/daemon/json.cpp +++ b/daemon/json.cpp @@ -29,7 +29,7 @@ #include "json.hpp" static inline std::string remove_undesired_chars(const std::string& s) { - std::regex html_regex("[^ A-Za-z0-9:~._/=%\()\\r\\n\\t\?#-]?"); + std::regex html_regex("[^ A-Za-z0-9:~.,_/=%\()\\r\\n\\t\?#-]?"); return std::regex_replace(s, html_regex, ""); } @@ -99,13 +99,16 @@ std::string config_to_json(const Config& config) { << ",\n \"interface_name\": \"" << escape_json(config.get_interface_name()) << "\"" << ",\n \"mdns_enabled\": " << std::boolalpha << config.get_mdns_enabled() - << ",\n \"custom_node_id\": \"" << escape_json(config.get_custom_node_id()) << "\"" + << ",\n \"custom_node_id\": \"" + << escape_json(config.get_custom_node_id()) << "\"" << ",\n \"node_id\": \"" << escape_json(config.get_node_id()) << "\"" << ",\n \"ptp_status_script\": \"" << escape_json(config.get_ptp_status_script()) << "\"" - << ",\n \"mac_addr\": \"" << escape_json(config.get_mac_addr_str()) << "\"" + << ",\n \"mac_addr\": \"" << escape_json(config.get_mac_addr_str()) + << "\"" << ",\n \"ip_addr\": \"" << escape_json(config.get_ip_addr_str()) << "\"" - << "\n}\n"; + << ",\n \"auto_sinks_update\": " << std::boolalpha + << config.get_auto_sinks_update() << "\n}\n"; return ss.str(); } @@ -327,6 +330,8 @@ Config json_to_config_(std::istream& js, Config& config) { } else if (key == "custom_node_id") { config.set_custom_node_id( remove_undesired_chars(val.get_value())); + } else if (key == "auto_sinks_update") { + config.set_auto_sinks_update(val.get_value()); } else if (key == "mac_addr" || key == "ip_addr" || key == "node_id") { /* ignored */ } else { diff --git a/daemon/main.cpp b/daemon/main.cpp index e65f7ba..98039ec 100644 --- a/daemon/main.cpp +++ b/daemon/main.cpp @@ -31,11 +31,15 @@ #include "rtsp_server.hpp" #include "session_manager.hpp" +#ifdef _USE_SYSTEMD_ +#include +#endif + namespace po = boost::program_options; namespace postyle = boost::program_options::command_line_style; namespace logging = boost::log; -static std::string version("bondagit-1.5.3"); +static std::string version("bondagit-1.6.1"); static std::atomic terminate = false; void termination_handler(int signum) { @@ -64,6 +68,11 @@ int main(int argc, char* argv[]) { int unix_style = postyle::unix_style | postyle::short_allow_next; bool driver_restart(true); +#ifdef _USE_SYSTEMD_ + // with which interval we should pet the dog + uint64_t current_watchdog_usec; +#endif + po::variables_map vm; try { po::store(po::command_line_parser(argc, argv) @@ -98,6 +107,17 @@ int main(int argc, char* argv[]) { std::string filename = vm["config"].as(); +#ifdef _USE_SYSTEMD_ + sd_watchdog_enabled(0, ¤t_watchdog_usec); + + if (current_watchdog_usec > 0) { + // Inform systemd that if we're not petting the dog in 5s we're bust. + sd_notify(0, "WATCHDOG_USEC=5000000"); + + current_watchdog_usec = 5000000; + } +#endif + while (!is_terminated() && rc == EXIT_SUCCESS) { /* load configuration from file */ auto config = Config::parse(filename, driver_restart); @@ -112,6 +132,11 @@ int main(int argc, char* argv[]) { log_init(*config); if (config->get_ip_addr_str().empty()) { +#ifdef _USE_SYSTEMD_ + if (current_watchdog_usec > 0) + sd_notify(0, "WATCHDOG=1"); + sd_notify(0, "STATUS=no IP address, waiting ..."); +#endif BOOST_LOG_TRIVIAL(info) << "main:: no IP address, waiting ..."; std::this_thread::sleep_for(std::chrono::seconds(1)); continue; @@ -125,8 +150,14 @@ int main(int argc, char* argv[]) { throw std::runtime_error(std::string("DriverManager:: init failed")); } + /* start browser */ + auto browser = Browser::create(config); + if (browser == nullptr || !browser->init()) { + throw std::runtime_error(std::string("Browser:: init failed")); + } + /* start session manager */ - auto session_manager = SessionManager::create(driver, config); + auto session_manager = SessionManager::create(driver, browser, config); if (session_manager == nullptr || !session_manager->init()) { throw std::runtime_error(std::string("SessionManager:: init failed")); } @@ -143,12 +174,6 @@ int main(int argc, char* argv[]) { throw std::runtime_error(std::string("RtspServer:: init failed")); } - /* start browser */ - auto browser = Browser::create(config); - if (browser == nullptr || !browser->init()) { - throw std::runtime_error(std::string("Browser:: init failed")); - } - /* start http server */ HttpServer http_server(session_manager, browser, config); if (!http_server.init()) { @@ -159,7 +184,22 @@ int main(int argc, char* argv[]) { session_manager->load_status(); BOOST_LOG_TRIVIAL(debug) << "main:: init done, entering loop..."; + +#ifdef _USE_SYSTEMD_ + // To be able to use sd_notify at all have to set service NotifyAccess + // (e.g. to main) + sd_notify(0, "READY=1"); // If service Type=notify the service is only + // considered ready once we send this (this is + // independent of watchdog capability) + sd_notify(0, "STATUS=Working"); +#endif + while (!is_terminated()) { +#ifdef _USE_SYSTEMD_ + if (current_watchdog_usec > 0) + sd_notify(0, "WATCHDOG=1"); +#endif + auto [ip_addr, ip_str] = get_interface_ip(config->get_interface_name()); if (config->get_ip_addr_str() != ip_str) { BOOST_LOG_TRIVIAL(warning) @@ -175,6 +215,15 @@ int main(int argc, char* argv[]) { std::this_thread::sleep_for(std::chrono::seconds(1)); } +#ifdef _USE_SYSTEMD_ + if (is_terminated()) { + sd_notify(0, "STOPPING=1"); + sd_notify(0, "STATUS=Stopping"); + } else { + sd_notify(0, "RELOADING=1"); + sd_notify(0, "STATUS=Restarting"); + } +#endif /* save session status to file */ session_manager->save_status(); @@ -184,11 +233,6 @@ int main(int argc, char* argv[]) { throw std::runtime_error(std::string("HttpServer:: terminate failed")); } - /* stop browser */ - if (!browser->terminate()) { - throw std::runtime_error(std::string("Browser:: terminate failed")); - } - /* stop rtsp server */ if (!rtsp_server.terminate()) { throw std::runtime_error(std::string("RtspServer:: terminate failed")); @@ -207,6 +251,11 @@ int main(int argc, char* argv[]) { std::string("SessionManager:: terminate failed")); } + /* stop browser */ + if (!browser->terminate()) { + throw std::runtime_error(std::string("Browser:: terminate failed")); + } + /* stop driver manager */ if (!driver->terminate(*config)) { throw std::runtime_error( diff --git a/daemon/rtsp_client.cpp b/daemon/rtsp_client.cpp index ae323fb..e1dea27 100644 --- a/daemon/rtsp_client.cpp +++ b/daemon/rtsp_client.cpp @@ -134,9 +134,9 @@ std::pair RtspClient::process(RtspClient::Observer callback, BOOST_LOG_TRIVIAL(debug) << "rtsp_client:: connecting to " << "rtsp://" << address << ":" << port << path; #if BOOST_VERSION < 106600 - s.expires_from_now(boost::posix_time::seconds(1)); + s.expires_from_now(boost::posix_time::seconds(5)); #else - s.expires_after(boost::asio::chrono::seconds(1)); + s.expires_after(boost::asio::chrono::seconds(5)); #endif s.connect(address, port.length() ? port : dft_port); if (!s || s.error()) { @@ -233,6 +233,11 @@ std::pair RtspClient::process(RtspClient::Observer callback, } if (wait_for_updates) { +#if BOOST_VERSION < 106600 + s.expires_from_now(boost::posix_time::hours(24 * 365 * 10)); +#else + s.expires_after(boost::asio::chrono::hours(24 * 365 * 10)); +#endif /* we start waiting for updates */ do { std::getline(s, request); diff --git a/daemon/session_manager.cpp b/daemon/session_manager.cpp index 770251e..c84a484 100644 --- a/daemon/session_manager.cpp +++ b/daemon/session_manager.cpp @@ -38,7 +38,7 @@ #include "session_manager.hpp" #include "interface.hpp" -static uint8_t get_codec_word_lenght(const std::string& codec) { +static uint8_t get_codec_word_length(const std::string& codec) { if (codec == "L16") { return 2; } @@ -103,8 +103,21 @@ bool SessionManager::parse_sdp(const std::string sdp, StreamInfo& info) const { return false; } break; - case 'o': - break; + case 'o': { + std::vector fields; + boost::split(fields, val, [line](char c) { return c == ' '; }); + if (fields.size() < 6) { + BOOST_LOG_TRIVIAL(warning) + << "session_manager:: invalid origin at line " << num; + } else { + info.origin.username = fields[0]; + info.origin.session_id = fields[1]; + info.origin.session_version = std::stoull(fields[2]); + info.origin.network_type = fields[3]; + info.origin.address_type = fields[4]; + info.origin.unicast_address = fields[5]; + } + } break; case 't': /* t=0 0 */ status = sdp_parser_status::time; @@ -150,7 +163,7 @@ bool SessionManager::parse_sdp(const std::string sdp, StreamInfo& info) const { if (info.stream.m_byPayloadType == std::stoi(fields[0])) { strncpy(info.stream.m_cCodec, fields[1].c_str(), sizeof(info.stream.m_cCodec) - 1); - info.stream.m_byWordLength = get_codec_word_lenght(fields[1]); + info.stream.m_byWordLength = get_codec_word_length(fields[1]); info.stream.m_ui32SamplingRate = std::stoul(fields[2]); if (info.stream.m_byNbOfChannels != std::stoi(fields[3])) { BOOST_LOG_TRIVIAL(warning) @@ -275,14 +288,15 @@ bool SessionManager::parse_sdp(const std::string sdp, StreamInfo& info) const { std::shared_ptr SessionManager::create( std::shared_ptr driver, + std::shared_ptr browser, std::shared_ptr config) { // no need to be thread-safe here static std::weak_ptr instance; if (auto ptr = instance.lock()) { return ptr; } - auto ptr = - std::shared_ptr(new SessionManager(driver, config)); + auto ptr = std::shared_ptr( + new SessionManager(driver, browser, config)); instance = ptr; return ptr; } @@ -481,7 +495,7 @@ std::error_code SessionManager::add_source(const StreamSource& source) { sizeof(info.stream.m_cName) - 1); info.stream.m_ucDSCP = source.dscp; // IPv4 DSCP info.stream.m_byPayloadType = source.payload_type; - info.stream.m_byWordLength = get_codec_word_lenght(source.codec); + info.stream.m_byWordLength = get_codec_word_length(source.codec); info.stream.m_byNbOfChannels = source.map.size(); strncpy(info.stream.m_cCodec, source.codec.c_str(), sizeof(info.stream.m_cCodec) - 1); @@ -613,8 +627,7 @@ std::string SessionManager::get_source_sdp_(uint32_t id, ss << "v=0\n" << "o=- " << info.session_id << " " << info.session_version << " IN IP4 " << ip::address_v4(info.stream.m_ui32SrcIP).to_string() << "\n" - << "s=" << config_->get_node_id() << " " - << info.stream.m_cName << "\n" + << "s=" << config_->get_node_id() << " " << info.stream.m_cName << "\n" << "c=IN IP4 " << ip::address_v4(info.stream.m_ui32DestIP).to_string(); if (IN_MULTICAST(info.stream.m_ui32DestIP)) { ss << "/" << static_cast(info.stream.m_byTTL); @@ -642,8 +655,8 @@ std::string SessionManager::get_source_sdp_(uint32_t id, if (info.refclk_ptp_traceable) { ss << "traceable\n"; } else { - ss << ptp_status_.gmid << ":" - << static_cast(ptp_config_.domain) << "\n"; + ss << ptp_status_.gmid << ":" << static_cast(ptp_config_.domain) + << "\n"; } ss << "a=recvonly\n"; @@ -1009,6 +1022,56 @@ size_t SessionManager::process_sap() { return sdp_len_sum; } +std::list SessionManager::get_updated_sinks( + const std::list& sources_list) { + std::list sinks_list; + std::shared_lock sinks_lock(sinks_mutex_); + for (auto const& [id, info] : sinks_) { + uint64_t newVersion{0}; + StreamSink sink{get_sink_(id, info)}; + for (auto& source : sources_list) { + // if no remote source origin specified, skip + if (source.origin.session_id == "") + continue; + + // search for the largest corresponding remote source version + if (sinks_[sink.id].origin == source.origin && sink.sdp != source.sdp && + sinks_[sink.id].origin.session_version < + source.origin.session_version && + newVersion < source.origin.session_version) { + newVersion = source.origin.session_version; + sink.sdp = source.sdp; + } + } + + if (newVersion) { + BOOST_LOG_TRIVIAL(info) + << "session_manager:: sink " << std::to_string(sink.id) + << " SDP change detected version " << newVersion << " updating"; + sinks_list.emplace_back(std::move(sink)); + } + } + return sinks_list; +} + +void SessionManager::update_sinks() { + if (config_->get_auto_sinks_update()) { + uint32_t last_update = browser_->get_last_update_ts(); + // check remote sources only if an update arrived + if (last_update && last_sink_update_ != last_update) { + BOOST_LOG_TRIVIAL(debug) << "Updating sinks ..."; + std::list remote_sources = browser_->get_remote_sources(); + auto sinks_list = get_updated_sinks(remote_sources); + for (auto& sink : sinks_list) { + // Re-add sink with new SDP, since the sink.id is the same there will be + // an update + add_sink(sink); + } + last_sink_update_ = last_update; + } + } +} + void SessionManager::on_update_sources() { // trigger sources SDP file update sources_mutex_.lock(); @@ -1039,8 +1102,9 @@ void SessionManager::on_ptp_status_changed(const std::string& status) const { for (int i = STDERR_FILENO + 1; i < fdlimit; i++) close(i); - char* argv_list[] = {const_cast(config_->get_ptp_status_script().c_str()), - const_cast(status.c_str()), NULL}; + char* argv_list[] = { + const_cast(config_->get_ptp_status_script().c_str()), + const_cast(status.c_str()), NULL}; execv(config_->get_ptp_status_script().c_str(), argv_list); exit(0); @@ -1157,6 +1221,8 @@ bool SessionManager::worker() { << sap_interval << " secs"; } + update_sinks(); + std::this_thread::sleep_for(std::chrono::seconds(1)); } diff --git a/daemon/session_manager.hpp b/daemon/session_manager.hpp index 3fcc16f..d7d4d66 100644 --- a/daemon/session_manager.hpp +++ b/daemon/session_manager.hpp @@ -25,9 +25,11 @@ #include #include #include +#include #include "config.hpp" #include "driver_interface.hpp" +#include "browser.hpp" #include "igmp.hpp" #include "sap.hpp" @@ -93,6 +95,7 @@ struct StreamInfo { std::string sink_sdp; uint32_t session_id{0}; uint32_t session_version{0}; + SDPOrigin origin; }; class SessionManager { @@ -101,6 +104,7 @@ class SessionManager { static std::shared_ptr create( std::shared_ptr driver, + std::shared_ptr browser, std::shared_ptr config); SessionManager() = delete; SessionManager(const SessionManager&) = delete; @@ -111,6 +115,9 @@ class SessionManager { bool init() { if (!running_) { running_ = true; + g_session_version = std::chrono::system_clock::now().time_since_epoch() / + std::chrono::seconds(1); + // to have an increasing session versions between restarts res_ = std::async(std::launch::async, &SessionManager::worker, this); } return true; @@ -165,6 +172,10 @@ class SessionManager { constexpr static const char ptp_primary_mcast_addr[] = "224.0.1.129"; constexpr static const char ptp_pdelay_mcast_addr[] = "224.0.1.107"; + std::list get_updated_sinks( + const std::list& sources_list); + void update_sinks(); + void on_add_source(const StreamSource& source, const StreamInfo& info); void on_remove_source(const StreamInfo& info); @@ -183,16 +194,21 @@ class SessionManager { StreamSource get_source_(uint8_t id, const StreamInfo& info) const; StreamSink get_sink_(uint8_t id, const StreamInfo& info) const; + bool sink_is_still_valid(const std::string sdp, + const std::list sources_list) const; + bool parse_sdp(const std::string sdp, StreamInfo& info) const; bool worker(); // singleton, use create() to build SessionManager(std::shared_ptr driver, + std::shared_ptr browser, std::shared_ptr config) - : driver_(driver), config_(config) { + : browser_(browser), driver_(driver), config_(config) { ptp_config_.domain = config->get_ptp_domain(); ptp_config_.dscp = config->get_ptp_dscp(); }; + std::shared_ptr browser_; std::shared_ptr driver_; std::shared_ptr config_; std::future res_; @@ -229,9 +245,10 @@ class SessionManager { SAP sap_{config_->get_sap_mcast_addr()}; IGMP igmp_; + uint32_t last_sink_update_{0}; /* used to handle session versioning */ - inline static std::atomic g_session_version{0}; + inline static std::atomic g_session_version{0}; }; #endif diff --git a/daemon/tests/daemon.conf b/daemon/tests/daemon.conf index 853fd97..7831362 100644 --- a/daemon/tests/daemon.conf +++ b/daemon/tests/daemon.conf @@ -18,9 +18,10 @@ "status_file": "", "interface_name": "lo", "mdns_enabled": true, - "mac_addr": "00:00:00:00:00:00", - "ip_addr": "127.0.0.1", "custom_node_id": "test node", "node_id": "test node", - "ptp_status_script": "" + "ptp_status_script": "", + "mac_addr": "00:00:00:00:00:00", + "ip_addr": "127.0.0.1", + "auto_sinks_update": true } diff --git a/daemon/tests/daemon_test.cpp b/daemon/tests/daemon_test.cpp index bb7501f..1b6ddad 100644 --- a/daemon/tests/daemon_test.cpp +++ b/daemon/tests/daemon_test.cpp @@ -398,6 +398,7 @@ BOOST_AUTO_TEST_CASE(get_config) { auto interface_name = pt.get("interface_name"); auto mac_addr = pt.get("mac_addr"); auto ip_addr = pt.get("ip_addr"); + auto auto_sinks_update = pt.get("auto_sinks_update"); BOOST_CHECK_MESSAGE(http_port == 9999, "config as excepcted"); // BOOST_CHECK_MESSAGE(log_severity == 5, "config as excepcted"); BOOST_CHECK_MESSAGE(playout_delay == 0, "config as excepcted"); @@ -419,6 +420,7 @@ BOOST_AUTO_TEST_CASE(get_config) { BOOST_CHECK_MESSAGE(ptp_status_script == "", "config as excepcted"); BOOST_CHECK_MESSAGE(node_id == "test node", "config as excepcted"); BOOST_CHECK_MESSAGE(custom_node_id == "test node", "config as excepcted"); + BOOST_CHECK_MESSAGE(auto_sinks_update == true, "config as excepcted"); } BOOST_AUTO_TEST_CASE(get_ptp_status) { diff --git a/daemon/utils.cpp b/daemon/utils.cpp index 1f3c753..9533a1c 100644 --- a/daemon/utils.cpp +++ b/daemon/utils.cpp @@ -19,6 +19,7 @@ // #include "utils.hpp" +#include "log.hpp" #include #include @@ -86,9 +87,9 @@ std::string get_host_node_id(uint32_t ip_addr) { } std::string sdp_get_subject(const std::string& sdp) { - std::stringstream ssstrem(sdp); + std::stringstream sstrem(sdp); std::string line; - while (getline(ssstrem, line, '\n')) { + while (getline(sstrem, line, '\n')) { if (line.substr(0, 2) == "s=") { auto subject = line.substr(2); boost::trim(subject); @@ -97,3 +98,40 @@ std::string sdp_get_subject(const std::string& sdp) { } return ""; } + +SDPOrigin sdp_get_origin(const std::string sdp) { + SDPOrigin origin; + try { + std::stringstream sstream(sdp); + std::string line; + while (getline(sstream, line, '\n')) { + boost::trim(line); + if (line[1] != '=') { + BOOST_LOG_TRIVIAL(error) << "session_manager:: invalid SDP file"; + break; + } + std::string val = line.substr(2); + if (line[0] == 'o') { + std::vector fields; + boost::split(fields, val, [line](char c) { return c == ' '; }); + if (fields.size() < 6) { + BOOST_LOG_TRIVIAL(error) << "session_manager:: invalid origin"; + break; + } + + origin.username = fields[0]; + origin.session_id = fields[1]; + origin.session_version = std::stoull(fields[2]); + origin.network_type = fields[3]; + origin.address_type = fields[4]; + origin.unicast_address = fields[5]; + break; + } + } + } catch (...) { + BOOST_LOG_TRIVIAL(fatal) << "session_manager:: invalid SDP" + << ", cannot extract SDP identifier"; + } + + return origin; +} diff --git a/daemon/utils.hpp b/daemon/utils.hpp index 343f75d..c0757db 100644 --- a/daemon/utils.hpp +++ b/daemon/utils.hpp @@ -38,4 +38,23 @@ std::string get_host_node_id(uint32_t ip_addr); std::string sdp_get_subject(const std::string& sdp); +struct SDPOrigin { + std::string username; + std::string session_id; + uint64_t session_version{0}; + std::string network_type; + std::string address_type; + std::string unicast_address; + + bool operator==(const SDPOrigin& rhs) const { + // session_version is not part of comparison, see RFC 4566 + return username == rhs.username && session_id == rhs.session_id && + network_type == rhs.network_type && + address_type == rhs.address_type && + unicast_address == rhs.unicast_address; + } +}; + +SDPOrigin sdp_get_origin(const std::string sdp); + #endif diff --git a/systemd/aes67-daemon.conf b/systemd/aes67-daemon.conf new file mode 100644 index 0000000..12ff58e --- /dev/null +++ b/systemd/aes67-daemon.conf @@ -0,0 +1,2 @@ +#Type Name ID GECOS Home directory Shell +u aes67-daemon - "AES67 daemon user" diff --git a/systemd/aes67-daemon.service b/systemd/aes67-daemon.service new file mode 100644 index 0000000..35c4128 --- /dev/null +++ b/systemd/aes67-daemon.service @@ -0,0 +1,60 @@ +[Unit] +Description=AES67 daemon service +Before=multi-user.target +After=network.target + +[Service] +Type=notify +# Will be adjusted by service during startup +WatchdogSec=10 + +# Run as separate user created via sysusers.d +User=aes67-daemon + +ExecStart=/usr/local/bin/aes67-daemon + +# Security filters. +CapabilityBoundingSet= +DevicePolicy=closed +LockPersonality=yes +MemoryDenyWriteExecute=yes +NoNewPrivileges=yes +PrivateDevices=yes +PrivateMounts=yes +PrivateTmp=yes +PrivateUsers=yes +# interface::get_mac_from_arp_cache() reads from /proc/net/arp +ProcSubset=all +ProtectClock=yes +ProtectControlGroups=yes +ProtectHome=yes +ProtectHostname=yes +ProtectKernelLogs=yes +ProtectKernelModules=yes +ProtectKernelTunables=yes +ProtectProc=invisible +ProtectSystem=strict +RemoveIPC=yes +RestrictAddressFamilies=AF_INET AF_NETLINK AF_UNIX +RestrictNamespaces=yes +RestrictRealtime=yes +RestrictSUIDSGID=yes +SystemCallArchitectures=native +SystemCallFilter=~@clock +SystemCallFilter=~@cpu-emulation +SystemCallFilter=~@debug +SystemCallFilter=~@module +SystemCallFilter=~@mount +SystemCallFilter=~@obsolete +SystemCallFilter=~@privileged +SystemCallFilter=~@raw-io +SystemCallFilter=~@reboot +SystemCallFilter=~@resources +SystemCallFilter=~@swap +UMask=077 +# Paths matching daemon.conf +ReadWritePaths=/etc/daemon.conf +ReadWritePaths=/etc/status.json + +[Install] +WantedBy=multi-user.target diff --git a/test/daemon.conf b/test/daemon.conf index 279bae0..bf88a92 100644 --- a/test/daemon.conf +++ b/test/daemon.conf @@ -22,5 +22,6 @@ "ip_addr": "127.0.0.1", "node_id": "AES67 daemon 007f0100", "custom_node_id": "", - "ptp_status_script": "" + "ptp_status_script": "", + "auto_sinks_update": true } diff --git a/webui/src/Config.jsx b/webui/src/Config.jsx index ecc6513..99af251 100644 --- a/webui/src/Config.jsx +++ b/webui/src/Config.jsx @@ -59,7 +59,8 @@ class Config extends Component { ipAddr: '', errors: 0, isConfigLoading: false, - isVersionLoading: false + isVersionLoading: false, + autoSinksUpdate: false }; this.onSubmit = this.onSubmit.bind(this); this.inputIsValid = this.inputIsValid.bind(this); @@ -104,6 +105,7 @@ class Config extends Component { macAddr: data.mac_addr, ipAddr: data.ip_addr, nodeId: data.node_id, + autoSinksUpdate: data.auto_sinks_update, isConfigLoading: false })) .catch(err => this.setState({isConfigLoading: false})); @@ -139,7 +141,8 @@ class Config extends Component { this.state.sapMcastAddr, this.state.sapInterval, this.state.mdnsEnabled, - this.state.customNodeId) + this.state.customNodeId, + this.state.autoSinksUpdate) .then(response => toast.success('Applying new configuration ...')); } @@ -226,6 +229,10 @@ class Config extends Component { this.setState({mdnsEnabled: e.target.checked})} checked={this.state.mdnsEnabled ? true : undefined}/> + + + this.setState({autoSinksUpdate: e.target.checked})} checked={this.state.autoSinksUpdate ? true : undefined}/> + diff --git a/webui/src/Services.js b/webui/src/Services.js index 35ffab6..d3a6174 100644 --- a/webui/src/Services.js +++ b/webui/src/Services.js @@ -84,7 +84,7 @@ export default class RestAPI { }); } - static setConfig(log_severity, syslog_proto, syslog_server, rtp_mcast_base, rtp_port, rtsp_port, playout_delay, tic_frame_size_at_1fs, sample_rate, max_tic_frame_size, sap_mcast_addr, sap_interval, mdns_enabled, custom_node_id) { + static setConfig(log_severity, syslog_proto, syslog_server, rtp_mcast_base, rtp_port, rtsp_port, playout_delay, tic_frame_size_at_1fs, sample_rate, max_tic_frame_size, sap_mcast_addr, sap_interval, mdns_enabled, custom_node_id, auto_sinks_update) { return this.doFetch(config, { body: JSON.stringify({ log_severity: parseInt(log_severity, 10), @@ -100,7 +100,8 @@ export default class RestAPI { sap_mcast_addr: sap_mcast_addr, sap_interval: parseInt(sap_interval, 10), custom_node_id: custom_node_id, - mdns_enabled: mdns_enabled + mdns_enabled: mdns_enabled, + auto_sinks_update: auto_sinks_update }), method: 'POST' }).catch(err => {