Added support for unicast RTP stream address to daemon and WebUI, see #30

- added address field to Source struct definition to contain the destination address. If this field is left empty the default multicast address is used.
- if a unicast address is provided when creating a new Source the daemon looks in the ARP cache to retrieve the corresponding MAC address.
  If this is not found the daemon tries to connect to echo TCP service of the destination host and looks into the ARP cache up to 3 times and then returns and error.
- added RTP address field in the Source creating form of the WebUI. If this field is left empty the default multicast address is used instead.
- updated documentation and regression test suite
This commit is contained in:
Andrea Bondavalli 2021-03-04 18:31:34 +01:00
parent 399e22216a
commit 1af74dcbf0
14 changed files with 205 additions and 30 deletions

View File

@ -315,6 +315,7 @@ Example:
"name": "ALSA Source 0",
"io": "Audio Device",
"codec": "L16",
"address": "",
"max_samples_per_packet": 48,
"ttl": 15,
"payload_type": 98,
@ -338,6 +339,11 @@ where:
> JSON string specifying codec in use.
> Valid values are L16, L24 and AM824 (L32).
> **address**
> JSON string specifying the destination address to use.
> If this field contains a valid address it's used instead of the default multicast address for the source.
> In case an unicast address is provided this must be of an host currently active on the local network.
> **max\_sample\_per\_packet**
> JSON number specifying the max number of samples contained in one RTP packet.
> Valid values are 12, 16, 48, 96, 192.

View File

@ -16,6 +16,9 @@
"syslog_proto": "none",
"syslog_server": "255.255.255.254:1234",
"status_file": "./status.json",
"interface_name": "enp2s0",
"mdns_enabled": true,
"interface_name": "lo"
"mac_addr": "84:39:be:6d:dc:74",
"ip_addr": "10.0.0.8",
"node_id": "AES67 daemon 000a0800"
}

View File

@ -95,6 +95,8 @@ std::string DaemonErrCategory::message(int ev) const {
return "stream id is in use";
case DaemonErrc::stream_name_in_use:
return "stream name is in use";
case DaemonErrc::cannot_retrieve_mac:
return "cannot retrieve MAC address for IP";
case DaemonErrc::stream_id_not_in_use:
return "stream not in use";
case DaemonErrc::invalid_url:

View File

@ -52,6 +52,7 @@ enum class DaemonErrc {
cannot_retrieve_sdp = 44, // daemon cannot retrieve SDP
cannot_parse_sdp = 45, // daemon cannot parse SDP
stream_name_in_use = 46, // daemon source or sink name in use
cannot_retrieve_mac = 47, // daemon cannot retrieve MAC for IP
send_invalid_size = 50, // daemon data size too big for buffer
send_u2k_failed = 51, // daemon failed to send command to driver
send_k2u_failed = 52, // daemon failed to send event response to driver

View File

@ -19,6 +19,10 @@
//
#include <boost/asio.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/algorithm/string.hpp>
#include <boost/lexical_cast.hpp>
#include <fstream>
#include <utility>
#include "log.hpp"
@ -110,3 +114,88 @@ int get_interface_index(const std::string& interface_name) {
return ifr.ifr_ifindex;
}
std::pair<std::array<uint8_t, 6>, std::string> get_mac_from_arp_cache(
const std::string& interface_name,
const std::string& ip) {
const std::string arpProcPath("/proc/net/arp");
std::array<uint8_t, 6> mac{0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
std::ifstream stream(arpProcPath);
while (stream) {
std::string line;
std::vector<std::string> tokens;
std::getline(stream, line);
if (line.find(ip)) {
continue;
}
boost::split(tokens, line, boost::is_any_of(" "), boost::token_compress_on);
/* check that IP is on the correct interface */
if (tokens.size() >= 6 && tokens[5] == interface_name) {
std::vector<std::string> vec;
/* parse MAC */
boost::split(vec, tokens[3], boost::is_any_of(":"));
int j = 0;
bool check = false;
for (auto const& item : vec) {
mac[j] = strtol(item.c_str(), NULL, 16);
check |= mac[j];
j++;
}
if (check) {
return {mac, tokens[3]};
}
}
}
BOOST_LOG_TRIVIAL(debug)
<< "get_mac_from_arp_cache:: cannot retrieve MAC for IP " << ip
<< " on interface " << interface_name;
return {mac, ""};
}
bool ping(const std::string& ip) {
static uint16_t sequence_number(0);
uint16_t identifier(0xABAB);
uint8_t buffer[10];
// Create an ICMP header for an echo request.
buffer[0] = 0x8; // echo request
buffer[1] = 0x0; // code
memcpy(buffer + 2, &identifier, 2); // identifier
memcpy(buffer + 4, &sequence_number, 2); // sequence number
memcpy(buffer + 6, "ping", 4); // body
// this requires root priv
try {
io_service io_service;
icmp::socket socket{io_service, icmp::v4()};
ip::icmp::endpoint destination(ip::icmp::v4(),
ip::address_v4::from_string(ip).to_ulong());
socket.send_to(boost::asio::buffer(buffer, sizeof buffer), destination);
} catch (...) {
BOOST_LOG_TRIVIAL(error) << "ping:: send_to() failed";
return false;
}
return true;
}
bool echo_try_connect(const std::string& ip) {
ip::tcp::iostream s;
BOOST_LOG_TRIVIAL(debug) << "echo_connect:: connecting to " << ip;
#if BOOST_VERSION < 106600
s.expires_from_now(boost::posix_time::seconds(1));
#else
s.expires_after(boost::asio::chrono::seconds(1));
#endif
s.connect(ip, "7");
if (!s || s.error()) {
BOOST_LOG_TRIVIAL(debug) << "echo_connect:: unable to connect to " << ip;
return false;
}
s.close();
return true;
}

View File

@ -25,5 +25,10 @@ std::pair<uint32_t, std::string> get_interface_ip(
std::pair<std::array<uint8_t, 6>, std::string> get_interface_mac(
const std::string& interface_name);
int get_interface_index(const std::string& interface_name);
std::pair<std::array<uint8_t, 6>, std::string> get_mac_from_arp_cache(
const std::string& interface_name,
const std::string& ip);
bool ping(const std::string& ip);
bool echo_try_connect(const std::string& ip);
#endif

View File

@ -117,6 +117,7 @@ std::string source_to_json(const StreamSource& source) {
<< ",\n \"io\": \"" << escape_json(source.io) << "\""
<< ",\n \"max_samples_per_packet\": " << source.max_samples_per_packet
<< ",\n \"codec\": \"" << escape_json(source.codec) << "\""
<< ",\n \"address\": \"" << escape_json(source.address) << "\""
<< ",\n \"ttl\": " << unsigned(source.ttl)
<< ",\n \"payload_type\": " << unsigned(source.payload_type)
<< ",\n \"dscp\": " << +unsigned(source.dscp)
@ -363,6 +364,7 @@ StreamSource json_to_source(const std::string& id, const std::string& json) {
"map": [ 0, 1, 2, 3, 4, 5, 6, 7 ],
"max_samples_per_packet": 48,
"codec": "L24",
"address": "",
"ttl": 15,
"payload_type": 98,
"dscp": 34,
@ -386,6 +388,7 @@ StreamSource json_to_source(const std::string& id, const std::string& json) {
}
source.max_samples_per_packet = pt.get<uint32_t>("max_samples_per_packet");
source.codec = remove_undesired_chars(pt.get<std::string>("codec"));
source.address = remove_undesired_chars(pt.get<std::string>("address"));
source.ttl = pt.get<uint8_t>("ttl");
source.payload_type = pt.get<uint8_t>("payload_type");
source.dscp = pt.get<uint8_t>("dscp");
@ -472,6 +475,7 @@ static void parse_json_sources(boost::property_tree::ptree& pt,
source.max_samples_per_packet =
v.second.get<uint32_t>("max_samples_per_packet");
source.codec = v.second.get<std::string>("codec");
source.address = v.second.get<std::string>("address");
source.ttl = v.second.get<uint8_t>("ttl");
source.payload_type = v.second.get<uint8_t>("payload_type");
source.dscp = v.second.get<uint8_t>("dscp");

View File

@ -35,7 +35,7 @@ namespace po = boost::program_options;
namespace postyle = boost::program_options::command_line_style;
namespace logging = boost::log;
static std::string version("bondagit-1.0");
static std::string version("bondagit-1.1-beta");
static std::atomic<bool> terminate = false;
void termination_handler(int signum) {

View File

@ -44,7 +44,7 @@ bool SAP::set_multicast_interface(const std::string& interface_ip) {
<< "sap::outbound_interface option " << ec.message();
return false;
}
/* we don't want receive self announced sessions */
/* we don't want to receive our sessions */
ip::multicast::enable_loopback el_option(false);
socket_.set_option(el_option, ec);
if (ec) {

View File

@ -36,6 +36,7 @@
#include "rtsp_client.hpp"
#include "utils.hpp"
#include "session_manager.hpp"
#include "interface.hpp"
static uint8_t get_codec_word_lenght(const std::string& codec) {
if (codec == "L16") {
@ -219,6 +220,7 @@ bool SessionManager::parse_sdp(const std::string sdp, StreamInfo& info) const {
}
case 'c':
/* c=IN IP4 239.1.0.12/15 */
/* c=IN IP4 10.0.0.1 */
/* connection info of audio media */
if (status == sdp_parser_status::media ||
/* generic connection info */
@ -226,7 +228,7 @@ bool SessionManager::parse_sdp(const std::string sdp, StreamInfo& info) const {
std::vector<std::string> fields;
boost::split(fields, val,
[line](char c) { return c == ' ' || c == '/'; });
if (fields.size() != 4) {
if (fields.size() < 3) {
BOOST_LOG_TRIVIAL(error)
<< "session_manager:: invalid connection in SDP at line "
<< num;
@ -246,10 +248,19 @@ bool SessionManager::parse_sdp(const std::string sdp, StreamInfo& info) const {
<< num;
return false;
}
info.stream.m_byTTL = std::stoi(fields[3]);
if (fields.size() > 3) {
info.stream.m_byTTL = std::stoi(fields[3]);
} else {
info.stream.m_byTTL = 64;
}
}
break;
default:
if (line[0] < 'a' || line[0] > 'z') {
BOOST_LOG_TRIVIAL(fatal)
<< "session_manager:: invalid SDP at line " << num;
return false;
}
break;
}
}
@ -329,6 +340,7 @@ StreamSource SessionManager::get_source_(uint8_t id,
info.io,
info.stream.m_ui32MaxSamplesPerPacket,
info.stream.m_cCodec,
ip::address_v4(info.stream.m_ui32DestIP).to_string(),
info.stream.m_byTTL,
info.stream.m_byPayloadType,
info.stream.m_ucDSCP,
@ -436,8 +448,10 @@ void SessionManager::on_add_source(const StreamSource& source,
for (auto cb : add_source_observers) {
cb(source.id, source.name, get_source_sdp_(source.id, info));
}
igmp_.join(config_->get_ip_addr_str(),
ip::address_v4(info.stream.m_ui32DestIP).to_string());
if (IN_MULTICAST(info.stream.m_ui32DestIP)) {
igmp_.join(config_->get_ip_addr_str(),
ip::address_v4(info.stream.m_ui32DestIP).to_string());
}
source_names_[source.name] = source.id;
}
@ -445,8 +459,10 @@ void SessionManager::on_remove_source(const StreamInfo& info) {
for (auto cb : remove_source_observers) {
cb(info.stream.m_uiId, info.stream.m_cName, {});
}
igmp_.leave(config_->get_ip_addr_str(),
ip::address_v4(info.stream.m_ui32DestIP).to_string());
if (IN_MULTICAST(info.stream.m_ui32DestIP)) {
igmp_.leave(config_->get_ip_addr_str(),
ip::address_v4(info.stream.m_ui32DestIP).to_string());
}
source_names_.erase(info.stream.m_cName);
}
@ -463,8 +479,7 @@ std::error_code SessionManager::add_source(const StreamSource& source) {
info.stream.m_ui32CRTP_stream_info_sizeof = sizeof(info.stream);
strncpy(info.stream.m_cName, source.name.c_str(),
sizeof(info.stream.m_cName) - 1);
info.stream.m_ucDSCP = source.dscp; // IPv4 DSCP
info.stream.m_byTTL = source.ttl;
info.stream.m_ucDSCP = source.dscp; // IPv4 DSCP
info.stream.m_byPayloadType = source.payload_type;
info.stream.m_byWordLength = get_codec_word_lenght(source.codec);
info.stream.m_byNbOfChannels = source.map.size();
@ -476,17 +491,47 @@ std::error_code SessionManager::add_source(const StreamSource& source) {
info.stream.m_uiId = source.id;
info.stream.m_ui32RTCPSrcIP = config_->get_ip_addr();
info.stream.m_ui32SrcIP = config_->get_ip_addr(); // only for Source
info.stream.m_ui32DestIP =
ip::address_v4::from_string(config_->get_rtp_mcast_base().c_str())
.to_ulong() +
source.id;
boost::system::error_code ec;
ip::address_v4::from_string(source.address, ec);
if (!ec) {
info.stream.m_ui32DestIP =
ip::address_v4::from_string(source.address).to_ulong();
} else {
info.stream.m_ui32DestIP =
ip::address_v4::from_string(config_->get_rtp_mcast_base().c_str())
.to_ulong() +
source.id;
}
info.stream.m_usSrcPort = config_->get_rtp_port();
info.stream.m_usDestPort = config_->get_rtp_port();
info.stream.m_ui32SSRC = rand() % 65536; // use random number
std::copy(source.map.begin(), source.map.end(), info.stream.m_aui32Routing);
auto mcast_mac_addr = get_mcast_mac_addr(info.stream.m_ui32DestIP);
std::copy(std::begin(mcast_mac_addr), std::end(mcast_mac_addr),
info.stream.m_ui8DestMAC);
if (IN_MULTICAST(info.stream.m_ui32DestIP)) {
auto mac_addr = get_mcast_mac_addr(info.stream.m_ui32DestIP);
std::copy(std::begin(mac_addr), std::end(mac_addr),
info.stream.m_ui8DestMAC);
info.stream.m_byTTL = source.ttl;
} else {
auto mac_addr = get_mac_from_arp_cache(config_->get_interface_name(),
ip::address_v4(info.stream.m_ui32DestIP).to_string());
int retry = 3;
while (!mac_addr.second.length() && retry--) {
// if not in cache already try to populate the MAC cache
(void)echo_try_connect(ip::address_v4(info.stream.m_ui32DestIP).to_string());
mac_addr = get_mac_from_arp_cache(config_->get_interface_name(),
ip::address_v4(info.stream.m_ui32DestIP).to_string());
}
if (!mac_addr.second.length()) {
BOOST_LOG_TRIVIAL(error)
<< "session_manager:: cannot retrieve MAC address for IP "
<< config_->get_rtp_mcast_base();
return DaemonErrc::cannot_retrieve_mac;
}
std::copy(std::begin(mac_addr.first), std::end(mac_addr.first),
info.stream.m_ui8DestMAC);
info.stream.m_byTTL = 64;
}
info.refclk_ptp_traceable = source.refclk_ptp_traceable;
info.enabled = source.enabled;
@ -567,16 +612,20 @@ std::string SessionManager::get_source_sdp_(uint32_t id,
<< ip::address_v4(info.stream.m_ui32SrcIP).to_string() << "\n"
<< "s=" << get_node_id(config_->get_ip_addr()) << " "
<< info.stream.m_cName << "\n"
<< "c=IN IP4 " << ip::address_v4(info.stream.m_ui32DestIP).to_string()
<< "/" << static_cast<unsigned>(info.stream.m_byTTL) << "\n"
<< "t=0 0\n"
<< "c=IN IP4 " << ip::address_v4(info.stream.m_ui32DestIP).to_string();
if (IN_MULTICAST(info.stream.m_ui32DestIP)) {
ss << "/" << static_cast<unsigned>(info.stream.m_byTTL);
}
ss << "\nt=0 0\n"
<< "a=clock-domain:PTPv2 " << static_cast<unsigned>(ptp_config_.domain)
<< "\n"
<< "m=audio " << info.stream.m_usSrcPort << " RTP/AVP "
<< static_cast<unsigned>(info.stream.m_byPayloadType) << "\n"
<< "c=IN IP4 " << ip::address_v4(info.stream.m_ui32DestIP).to_string()
<< "/" << static_cast<unsigned>(info.stream.m_byTTL) << "\n"
<< "a=rtpmap:" << static_cast<unsigned>(info.stream.m_byPayloadType) << " "
<< "c=IN IP4 " << ip::address_v4(info.stream.m_ui32DestIP).to_string();
if (IN_MULTICAST(info.stream.m_ui32DestIP)) {
ss << "/" << static_cast<unsigned>(info.stream.m_byTTL);
}
ss << "\na=rtpmap:" << static_cast<unsigned>(info.stream.m_byPayloadType) << " "
<< info.stream.m_cCodec << "/" << sample_rate << "/"
<< static_cast<unsigned>(info.stream.m_byNbOfChannels) << "\n"
<< "a=sync-time:0\n"
@ -645,14 +694,18 @@ uint8_t SessionManager::get_sink_id(const std::string& name) const {
void SessionManager::on_add_sink(const StreamSink& sink,
const StreamInfo& info) {
igmp_.join(config_->get_ip_addr_str(),
ip::address_v4(info.stream.m_ui32DestIP).to_string());
if (IN_MULTICAST(info.stream.m_ui32DestIP)) {
igmp_.join(config_->get_ip_addr_str(),
ip::address_v4(info.stream.m_ui32DestIP).to_string());
}
sink_names_[sink.name] = sink.id;
}
void SessionManager::on_remove_sink(const StreamInfo& info) {
igmp_.leave(config_->get_ip_addr_str(),
ip::address_v4(info.stream.m_ui32DestIP).to_string());
if (IN_MULTICAST(info.stream.m_ui32DestIP)) {
igmp_.leave(config_->get_ip_addr_str(),
ip::address_v4(info.stream.m_ui32DestIP).to_string());
}
sink_names_.erase(info.stream.m_cName);
}

View File

@ -38,6 +38,7 @@ struct StreamSource {
std::string io;
uint32_t max_samples_per_packet{0};
std::string codec;
std::string address;
uint8_t ttl{0};
uint8_t payload_type{0};
uint8_t dscp{0};

View File

@ -144,6 +144,7 @@ struct Client {
"map": [ 0, 1 ],
"max_samples_per_packet": 48,
"codec": "L16",
"address": "",
"ttl": 15,
"payload_type": 98,
"dscp": 34,
@ -167,6 +168,7 @@ struct Client {
"map": [ 0, 1 ],
"max_samples_per_packet": 192,
"codec": "L24",
"address": "",
"ttl": 15,
"payload_type": 98,
"dscp": 34,

View File

@ -128,13 +128,14 @@ export default class RestAPI {
});
}
static addSource(id, enabled, name, io, max_samples_per_packet, codec, ttl, payload_type, dscp, refclk_ptp_traceable, map, is_edit) {
static addSource(id, enabled, name, io, max_samples_per_packet, codec, address, ttl, payload_type, dscp, refclk_ptp_traceable, map, is_edit) {
return this.doFetch(source + '/' + id, {
body: JSON.stringify({
enabled: enabled,
name: name,
io: io,
codec: codec,
address: address,
map: map,
max_samples_per_packet: parseInt(max_samples_per_packet, 10),
ttl: parseInt(ttl, 10),

View File

@ -61,6 +61,8 @@ class SourceEdit extends Component {
nameErr: false,
io: this.props.source.io,
codec: this.props.source.codec,
address: this.props.source.address,
addressErr: false,
ttl: this.props.source.ttl,
ttlErr: false,
payloadType: this.props.source.payload_type,
@ -105,6 +107,7 @@ class SourceEdit extends Component {
this.state.io,
this.state.maxSamplesPerPacket,
this.state.codec,
this.state.address ? this.state.address : "",
this.state.ttl,
this.state.payloadType,
this.state.dscp,
@ -212,7 +215,8 @@ class SourceEdit extends Component {
return !this.state.nameErr &&
!this.state.ttlErr &&
!this.state.channelsErr &&
!this.state.payloadTypeErr;
!this.state.payloadTypeErr &&
!this.state.addressErr;
}
render() {
@ -260,6 +264,10 @@ class SourceEdit extends Component {
</select>
</th>
</tr>
<tr>
<th align="left"> <label>RTP address</label> </th>
<th align="left"> <input type="text" minLength="7" maxLength="15" size="15" pattern="^$|^(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$" value={this.state.address} onChange={e => this.setState({address: e.target.value, addressErr: !e.currentTarget.checkValidity()})} optional/> </th>
</tr>
<tr>
<th align="left"> <label>Payload Type</label> </th>
<th align="left"> <input type='number' min='77' max='127' className='input-number' value={this.state.payloadType} onChange={e => this.setState({payloadType: e.target.value, payloadTypeErr: !e.currentTarget.checkValidity()})} required/> </th>