Additional set of changes to complete support for mDNS sources update via RTSP ANNOUNCE method as described by Ravenna.
The following changes enable support for RTCP client persistent connections used to receive RTSP server ANNOUNCE messages. These in turn update the remote sources in the Browser. - added new process static method to RtspClient class to receive and update source SDP files via DESCRIBE and ANNOUNCE methods. This method can handle persistent connections with the RTSP server and triggers an observer callback when a source SDP file is received or updated. A map is used to track all active RTSP clients. - added new stop and stop_all static methods to RtspClient class respectively to stop a specific RTCP client and to stop all clients. - modified MDNSClient class to call RtspClient::process method when a new Ravenna service is added and the RtspClient::stop method when a service is removed. The RtspClient::stop_all method is called when the daemon terminates. - modified Browser class to receive and handle remote RTSP sources SDP file add and change notifications. This class receives the updates via the RtspClient observer callback - updated documentation
This commit is contained in:
parent
aeb4e8b0a0
commit
e05253d444
@ -46,7 +46,7 @@ The daemon can be cross-compiled for multiple platforms and implements the follo
|
||||
* HTTP REST API for the daemon control and configuration
|
||||
* SAP sources discovery and advertisement compatible with AES67 standard
|
||||
* mDNS sources discovery and advertisement (using Linux Avahi) compatible with Ravenna standard
|
||||
* RTSP client and server to retrieve or return SDP files via DESCRIBE method compatible with Ravenna standard
|
||||
* RTSP client and server to retrieve, return and update SDP files via DESCRIBE and ANNOUNCE methods according to Ravenna standard
|
||||
* IGMP handling for SAP, PTP and RTP sessions
|
||||
|
||||
The directory also contains the daemon regression tests in the [tests](daemon/tests) subdirectory.
|
||||
|
@ -10,7 +10,7 @@ The daemon is responsible for:
|
||||
* HTTP REST API for the daemon control and configuration
|
||||
* SAP sources discovery and advertisement compatible with AES67 standard
|
||||
* mDNS sources discovery and advertisement (using Linux Avahi) compatible with Ravenna standard
|
||||
* RTSP client and server to retrieve or return SDP files via DESCRIBE method compatible with Ravenna standard
|
||||
* RTSP client and server to retrieve, return and update SDP files via DESCRIBE and ANNOUNCE methods according to Ravenna standard
|
||||
* IGMP handling for SAP, PTP and RTP sessions
|
||||
|
||||
|
||||
|
@ -107,9 +107,9 @@ bool Browser::worker() {
|
||||
BOOST_LOG_TRIVIAL(debug)
|
||||
<< "browser:: refreshing SAP source " << it->id;
|
||||
// annoucement, update last seen and announce period
|
||||
auto upd_source{*it};
|
||||
uint32_t last_seen =
|
||||
duration_cast<second_t>(steady_clock::now() - startup_).count();
|
||||
auto upd_source{*it};
|
||||
upd_source.announce_period = last_seen - upd_source.last_seen;
|
||||
upd_source.last_seen = last_seen;
|
||||
sources_.replace(it, upd_source);
|
||||
@ -155,26 +155,32 @@ bool Browser::worker() {
|
||||
return true;
|
||||
}
|
||||
|
||||
void Browser::on_new_rtsp_source(const std::string& name,
|
||||
const std::string& domain,
|
||||
const RtspSource& s) {
|
||||
void Browser::on_change_rtsp_source(const std::string& name,
|
||||
const std::string& domain,
|
||||
const RtspSource& s) {
|
||||
uint32_t last_seen = duration_cast<second_t>(steady_clock::now() -
|
||||
startup_).count();
|
||||
std::unique_lock sources_lock(sources_mutex_);
|
||||
/* search by name */
|
||||
auto rng = sources_.get<name_tag>().equal_range(name);
|
||||
while(rng.first != rng.second){
|
||||
const auto& it = rng.first;
|
||||
if (it->source == "mDNS" && it->domain == domain) {
|
||||
/* conflict ? */
|
||||
BOOST_LOG_TRIVIAL(warning) << "browser:: mDNS source conflict on"
|
||||
<< " name " << it->name
|
||||
<< " domain " << it->domain
|
||||
<< ", skipping ... ";
|
||||
/* mDNS source with same name and domain -> update */
|
||||
BOOST_LOG_TRIVIAL(info) << "browser:: updating RTSP source " << s.id
|
||||
<< " name " << name
|
||||
<< " domain " << domain;
|
||||
auto upd_source{*it};
|
||||
upd_source.id = s.id;
|
||||
upd_source.sdp = s.sdp;
|
||||
upd_source.address = s.address;
|
||||
upd_source.last_seen = last_seen;
|
||||
sources_.get<name_tag>().replace(it, upd_source);
|
||||
return;
|
||||
}
|
||||
++rng.first;
|
||||
}
|
||||
|
||||
/* entry not found -> add */
|
||||
BOOST_LOG_TRIVIAL(info) << "browser:: adding RTSP source " << s.id
|
||||
<< " name " << name
|
||||
<< " domain " << domain;
|
||||
|
@ -73,7 +73,7 @@ class Browser : public MDNSClient {
|
||||
|
||||
bool worker();
|
||||
|
||||
virtual void on_new_rtsp_source(
|
||||
virtual void on_change_rtsp_source(
|
||||
const std::string& name,
|
||||
const std::string& domain,
|
||||
const RtspSource& source) override;
|
||||
|
@ -17,14 +17,13 @@
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
//
|
||||
|
||||
#include "mdns_client.hpp"
|
||||
|
||||
#include <boost/asio.hpp>
|
||||
|
||||
#include "config.hpp"
|
||||
#include "interface.hpp"
|
||||
#include "log.hpp"
|
||||
#include "rtsp_client.hpp"
|
||||
#include "mdns_client.hpp"
|
||||
|
||||
#ifdef _USE_AVAHI_
|
||||
void MDNSClient::resolve_callback(AvahiServiceResolver* r,
|
||||
@ -89,6 +88,7 @@ void MDNSClient::resolve_callback(AvahiServiceResolver* r,
|
||||
(mdns.config_->get_interface_name() == "lo"))) {
|
||||
std::lock_guard<std::mutex> lock(mdns.sources_res_mutex_);
|
||||
|
||||
/* process RTSP client in async task */
|
||||
mdns.sources_res_.emplace_back(std::async(
|
||||
std::launch::async,
|
||||
[&mdns,
|
||||
@ -96,11 +96,12 @@ void MDNSClient::resolve_callback(AvahiServiceResolver* r,
|
||||
domain_ = std::forward<std::string>(domain),
|
||||
addr_ = std::forward<std::string>(addr),
|
||||
port_ = std::forward<std::string>(std::to_string(port))] {
|
||||
auto res = RtspClient::describe(std::string("/by-name/") + name_,
|
||||
addr_, port_);
|
||||
if (res.first) {
|
||||
mdns.on_new_rtsp_source(name_, domain_, res.second);
|
||||
}
|
||||
RtspClient::process(
|
||||
std::bind(&MDNSClient::on_change_rtsp_source, &mdns,
|
||||
std::placeholders::_1, std::placeholders::_2,
|
||||
std::placeholders::_3),
|
||||
name_, domain_, std::string("/by-name/") + name_,
|
||||
addr_, port_);
|
||||
}));
|
||||
}
|
||||
|
||||
@ -153,6 +154,7 @@ void MDNSClient::browse_callback(AvahiServiceBrowser* b,
|
||||
BOOST_LOG_TRIVIAL(info) << "mdns_client:: (Browser) REMOVE: "
|
||||
<< "service " << name << " of type " << type
|
||||
<< " in domain " << domain;
|
||||
RtspClient::stop(name, domain);
|
||||
mdns.on_remove_rtsp_source(name, domain);
|
||||
break;
|
||||
|
||||
@ -259,6 +261,7 @@ void MDNSClient::process_results() {
|
||||
bool MDNSClient::terminate() {
|
||||
if (running_) {
|
||||
running_ = false;
|
||||
RtspClient::stop_all();
|
||||
#ifdef _USE_AVAHI_
|
||||
/* wait for all pending results and remove from list */
|
||||
std::lock_guard<std::mutex> lock(sources_res_mutex_);
|
||||
|
@ -48,11 +48,11 @@ class MDNSClient {
|
||||
virtual bool terminate();
|
||||
|
||||
protected:
|
||||
virtual void on_new_rtsp_source(const std::string& name,
|
||||
const std::string& domain,
|
||||
const RtspSource& source) = 0;
|
||||
virtual void on_change_rtsp_source(const std::string& name,
|
||||
const std::string& domain,
|
||||
const RtspSource& source){};
|
||||
virtual void on_remove_rtsp_source(const std::string& name,
|
||||
const std::string& domain) = 0;
|
||||
const std::string& domain){};
|
||||
|
||||
void process_results();
|
||||
std::list<std::future<void> > sources_res_;
|
||||
|
@ -27,6 +27,7 @@
|
||||
#include <ostream>
|
||||
#include <string>
|
||||
#include <chrono>
|
||||
#include <map>
|
||||
|
||||
#include "log.hpp"
|
||||
#include "utils.hpp"
|
||||
@ -36,13 +37,27 @@ using namespace boost::asio;
|
||||
using namespace boost::asio::ip;
|
||||
using namespace boost::algorithm;
|
||||
|
||||
|
||||
struct RtspResponse {
|
||||
uint32_t cseq;
|
||||
int32_t cseq{-1};
|
||||
std::string content_type;
|
||||
uint64_t content_length;
|
||||
uint64_t content_length{0};
|
||||
std::string body;
|
||||
};
|
||||
|
||||
static std::string sdp_get_subject(const std::string& sdp) {
|
||||
std::stringstream ssstrem(sdp);
|
||||
std::string line;
|
||||
while (getline(ssstrem, line, '\n')) {
|
||||
if (line.substr(0, 2) == "s=") {
|
||||
auto subject = line.substr(2);
|
||||
trim(subject);
|
||||
return subject;
|
||||
}
|
||||
}
|
||||
return "";
|
||||
}
|
||||
|
||||
RtspResponse read_response(tcp::iostream& s, uint16_t max_length) {
|
||||
RtspResponse res;
|
||||
std::string header;
|
||||
@ -73,6 +88,8 @@ RtspResponse read_response(tcp::iostream& s, uint16_t max_length) {
|
||||
<< "cannot perform number conversion";
|
||||
}
|
||||
|
||||
BOOST_LOG_TRIVIAL(debug) << "rtsp_client:: reading body length "
|
||||
<< res.content_length;
|
||||
// read up to max_length
|
||||
if (res.content_length > 0 && res.content_length < max_length) {
|
||||
res.body.reserve(res.content_length);
|
||||
@ -83,19 +100,17 @@ RtspResponse read_response(tcp::iostream& s, uint16_t max_length) {
|
||||
return res;
|
||||
}
|
||||
|
||||
std::pair<bool, RtspSource> RtspClient::describe(const std::string& path,
|
||||
const std::string& address,
|
||||
const std::string& port) {
|
||||
RtspSource rs;
|
||||
bool success{false};
|
||||
std::pair<bool, RtspSource> RtspClient::process(
|
||||
RtspClient::Observer callback,
|
||||
const std::string& name,
|
||||
const std::string& domain,
|
||||
const std::string& path,
|
||||
const std::string& address,
|
||||
const std::string& port,
|
||||
bool wait_for_updates) {
|
||||
RtspSource rtsp_source;
|
||||
try {
|
||||
tcp::iostream s;
|
||||
|
||||
#if BOOST_VERSION < 106700
|
||||
s.expires_from_now(boost::posix_time::seconds(client_timeout));
|
||||
#else
|
||||
s.expires_from_now(std::chrono::seconds(client_timeout));
|
||||
#endif
|
||||
ip::tcp::iostream s;
|
||||
|
||||
BOOST_LOG_TRIVIAL(debug) << "rtsp_client:: connecting to "
|
||||
<< "rtsp://" << address << ":" << port << path;
|
||||
@ -103,10 +118,10 @@ std::pair<bool, RtspSource> RtspClient::describe(const std::string& path,
|
||||
if (!s) {
|
||||
BOOST_LOG_TRIVIAL(warning)
|
||||
<< "rtsp_client:: unable to connect to " << address << ":" << port;
|
||||
return std::make_pair(success, rs);
|
||||
return std::make_pair(false, rtsp_source);
|
||||
}
|
||||
|
||||
uint16_t cseq = seq_number++;
|
||||
uint16_t cseq = g_seq_number++;
|
||||
s << "DESCRIBE rtsp://" << address << ":" << port
|
||||
<< httplib::detail::encode_url(path) << " RTSP/1.0\r\n";
|
||||
s << "CSeq: " << cseq << "\r\n";
|
||||
@ -122,56 +137,157 @@ std::pair<bool, RtspSource> RtspClient::describe(const std::string& path,
|
||||
s >> rtsp_version;
|
||||
unsigned int status_code;
|
||||
s >> status_code;
|
||||
std::string status_message;
|
||||
std::getline(s, status_message);
|
||||
std::string request;
|
||||
std::getline(s, request);
|
||||
|
||||
if (!s || rtsp_version.substr(0, 5) != "RTSP/") {
|
||||
BOOST_LOG_TRIVIAL(error) << "rtsp_client:: invalid response from "
|
||||
<< "rtsp://" << address << ":" << port << path;
|
||||
return std::make_pair(success, rs);
|
||||
return std::make_pair(false, rtsp_source);
|
||||
}
|
||||
|
||||
if (status_code != 200) {
|
||||
BOOST_LOG_TRIVIAL(error) << "rtsp_client:: response with status code "
|
||||
<< status_code << " from "
|
||||
<< "rtsp://" << address << ":" << port << path;
|
||||
return std::make_pair(success, rs);
|
||||
return std::make_pair(false, rtsp_source);
|
||||
}
|
||||
|
||||
auto res = read_response(s, max_body_length);
|
||||
if (res.content_type.rfind("application/sdp", 0) == std::string::npos) {
|
||||
BOOST_LOG_TRIVIAL(error) << "rtsp_client:: unsupported content-type "
|
||||
<< res.content_type << " from "
|
||||
<< "rtsp://" << address << ":" << port << path;
|
||||
return std::make_pair(success, rs);
|
||||
}
|
||||
if (res.cseq != cseq) {
|
||||
BOOST_LOG_TRIVIAL(error)
|
||||
<< "rtsp_client:: invalid response sequence " << res.cseq << " from "
|
||||
<< "rtsp://" << address << ":" << port << path;
|
||||
return std::make_pair(success, rs);
|
||||
}
|
||||
bool is_announce = false;
|
||||
bool is_describe = true;
|
||||
std::string announced_name;
|
||||
do {
|
||||
auto res = read_response(s, max_body_length);
|
||||
if (is_describe && res.cseq != cseq) {
|
||||
BOOST_LOG_TRIVIAL(error)
|
||||
<< "rtsp_client:: invalid response sequence " << res.cseq
|
||||
<< " from rtsp://" << address << ":" << port << path;
|
||||
return std::make_pair(false, rtsp_source);
|
||||
}
|
||||
|
||||
std::stringstream ss;
|
||||
ss << "rtsp:" << std::hex
|
||||
<< crc16(reinterpret_cast<const uint8_t*>(res.body.c_str()),
|
||||
res.body.length());
|
||||
/*<< std::hex << ip::address_v4::from_string(address.c_str()).to_ulong();*/
|
||||
if (res.content_type.rfind("application/sdp", 0) == std::string::npos) {
|
||||
BOOST_LOG_TRIVIAL(error) << "rtsp_client:: unsupported content-type "
|
||||
<< res.content_type << " from "
|
||||
<< "rtsp://" << address << ":" << port << path;
|
||||
if (is_describe) {
|
||||
return std::make_pair(false, rtsp_source);
|
||||
}
|
||||
} else {
|
||||
std::stringstream ss;
|
||||
ss << "rtsp:" << std::hex
|
||||
<< crc16(reinterpret_cast<const uint8_t*>(res.body.c_str()),
|
||||
res.body.length());
|
||||
/*<< std::hex << ip::address_v4::from_string(address.c_str()).to_ulong();*/
|
||||
rtsp_source.id = ss.str();
|
||||
rtsp_source.source = "mDNS";
|
||||
rtsp_source.address = address;
|
||||
rtsp_source.sdp = std::move(res.body);
|
||||
BOOST_LOG_TRIVIAL(info) << "rtsp_client:: completed "
|
||||
<< "rtsp://" << address << ":" << port << path;
|
||||
|
||||
rs.id = ss.str();
|
||||
rs.source = "mDNS";
|
||||
rs.address = address;
|
||||
rs.sdp = std::move(res.body);
|
||||
if (is_announce || is_describe) {
|
||||
if (is_announce && announced_name.empty()) {
|
||||
/* if no name from URL we try from SDP file */
|
||||
announced_name = sdp_get_subject(rtsp_source.sdp);
|
||||
}
|
||||
callback(announced_name.empty() ? name : announced_name, domain,
|
||||
rtsp_source);
|
||||
}
|
||||
|
||||
BOOST_LOG_TRIVIAL(info) << "rtsp_client:: describe completed "
|
||||
<< "rtsp://" << address << ":" << port << path;
|
||||
if (is_announce) {
|
||||
s << "RTSP/1.0 200 OK\r\n";
|
||||
s << "CSeq: " << res.cseq << "\r\n";
|
||||
s << "\r\n";
|
||||
} else if (!is_describe) {
|
||||
s << "RTSP/1.0 405 Method Not Allowed\r\n";
|
||||
s << "CSeq: " << res.cseq << "\r\n";
|
||||
s << "\r\n";
|
||||
}
|
||||
}
|
||||
|
||||
if (wait_for_updates) {
|
||||
auto name_domain = std::make_pair(name, domain);
|
||||
g_mutex.lock();
|
||||
g_active_clients[name_domain] = &s;
|
||||
g_mutex.unlock();
|
||||
|
||||
/* we start waiting for updates */
|
||||
do {
|
||||
std::getline(s, request);
|
||||
} while (request.empty() && !s.error());
|
||||
if (s.error()) {
|
||||
BOOST_LOG_TRIVIAL(info) << "rtsp_client:: end: "
|
||||
<< s.error().message();
|
||||
break;
|
||||
}
|
||||
BOOST_LOG_TRIVIAL(info) << "rtsp_client:: received " << request;
|
||||
boost::trim(request);
|
||||
is_describe = is_announce = false;
|
||||
announced_name = "";
|
||||
std::vector<std::string> fields;
|
||||
split(fields, request, boost::is_any_of(" "));
|
||||
if (fields.size() >= 2 && fields[0] == "ANNOUNCE") {
|
||||
auto const [ok, protocol, host, port, path] = parse_url(fields[1]);
|
||||
if (ok) {
|
||||
/* if we find a valid announced source name we use it
|
||||
* otherwise we try from SDP file or we use the mDNS name */
|
||||
if (path.rfind("/by-name/") != std::string::npos) {
|
||||
announced_name = path.substr(9);
|
||||
BOOST_LOG_TRIVIAL(debug) << "rtsp_client:: found announced name "
|
||||
<< announced_name;
|
||||
}
|
||||
}
|
||||
is_announce = true;
|
||||
}
|
||||
}
|
||||
} while (wait_for_updates);
|
||||
|
||||
success = true;
|
||||
} catch (std::exception& e) {
|
||||
BOOST_LOG_TRIVIAL(warning)
|
||||
<< "rtsp_client:: error with "
|
||||
<< "rtsp://" << address << ":" << port << path << ": " << e.what();
|
||||
return std::make_pair(false, rtsp_source);
|
||||
}
|
||||
|
||||
return std::make_pair(success, rs);
|
||||
return std::make_pair(true, rtsp_source);
|
||||
}
|
||||
|
||||
|
||||
void RtspClient::stop(const std::string& name, const std::string& domain) {
|
||||
std::lock_guard<std::mutex> lock(g_mutex);
|
||||
auto it = g_active_clients.find(std::make_pair(name, domain));
|
||||
if (it != g_active_clients.end()) {
|
||||
BOOST_LOG_TRIVIAL(info)
|
||||
<< "rtsp_client:: stopping client " << name << " " << domain;
|
||||
#if BOOST_VERSION < 106600
|
||||
it->second->close();
|
||||
#else
|
||||
it->second->socket().shutdown(tcp::socket::shutdown_both);
|
||||
#endif
|
||||
g_active_clients.erase(it);
|
||||
}
|
||||
}
|
||||
|
||||
void RtspClient::stop_all() {
|
||||
std::lock_guard<std::mutex> lock(g_mutex);
|
||||
auto it = g_active_clients.begin();
|
||||
while (it != g_active_clients.end()) {
|
||||
BOOST_LOG_TRIVIAL(info)
|
||||
<< "rtsp_client:: stopping client "
|
||||
<< it->first.first << " " << it->first.second;
|
||||
#if BOOST_VERSION < 106600
|
||||
it->second->close();
|
||||
#else
|
||||
it->second->socket().shutdown(tcp::socket::shutdown_both);
|
||||
#endif
|
||||
it = g_active_clients.erase(it);
|
||||
}
|
||||
}
|
||||
|
||||
std::pair<bool, RtspSource> RtspClient::describe(
|
||||
const std::string& path,
|
||||
const std::string& address,
|
||||
const std::string& port) {
|
||||
return RtspClient::process({}, {}, {}, path, address, port, false);
|
||||
}
|
||||
|
||||
|
@ -20,6 +20,9 @@
|
||||
#ifndef _RTSP_CLIENT_HPP_
|
||||
#define _RTSP_CLIENT_HPP_
|
||||
|
||||
#include <mutex>
|
||||
#include <boost/asio/ip/tcp.hpp>
|
||||
|
||||
struct RtspSource {
|
||||
std::string id;
|
||||
std::string source;
|
||||
@ -33,12 +36,33 @@ class RtspClient {
|
||||
constexpr static uint16_t client_timeout = 10; // sec
|
||||
constexpr static const char dft_port[] = "554";
|
||||
|
||||
static std::pair<bool, RtspSource> describe(
|
||||
const std::string& path,
|
||||
const std::string& address,
|
||||
const std::string& port = dft_port);
|
||||
using Observer = std::function<void(
|
||||
const std::string& name,
|
||||
const std::string& domain,
|
||||
const RtspSource& source)>;
|
||||
|
||||
static std::pair<bool, RtspSource> process(
|
||||
Observer callback,
|
||||
const std::string& name,
|
||||
const std::string& domain,
|
||||
const std::string& path,
|
||||
const std::string& address,
|
||||
const std::string& port = dft_port,
|
||||
bool wait_for_updates = true);
|
||||
|
||||
static void stop(const std::string& name, const std::string& domain);
|
||||
static void stop_all();
|
||||
|
||||
static std::pair<bool, RtspSource> describe(
|
||||
const std::string& path,
|
||||
const std::string& address,
|
||||
const std::string& port = dft_port);
|
||||
|
||||
inline static std::atomic<uint16_t> g_seq_number{0};
|
||||
inline static std::map<std::pair<std::string /*name*/, std::string /*domain*/>,
|
||||
boost::asio::ip::tcp::iostream* /*stream*/> g_active_clients;
|
||||
inline static std::mutex g_mutex;
|
||||
|
||||
inline static std::atomic<uint16_t> seq_number;
|
||||
};
|
||||
|
||||
#endif
|
||||
|
Loading…
x
Reference in New Issue
Block a user