From 1af74dcbf0ccf32a39508a0afa67addcc8adb7e4 Mon Sep 17 00:00:00 2001 From: Andrea Bondavalli Date: Thu, 4 Mar 2021 18:31:34 +0100 Subject: [PATCH] Added support for unicast RTP stream address to daemon and WebUI, see #30 - added address field to Source struct definition to contain the destination address. If this field is left empty the default multicast address is used. - if a unicast address is provided when creating a new Source the daemon looks in the ARP cache to retrieve the corresponding MAC address. If this is not found the daemon tries to connect to echo TCP service of the destination host and looks into the ARP cache up to 3 times and then returns and error. - added RTP address field in the Source creating form of the WebUI. If this field is left empty the default multicast address is used instead. - updated documentation and regression test suite --- daemon/README.md | 6 ++ daemon/daemon.conf | 5 +- daemon/error_code.cpp | 2 + daemon/error_code.hpp | 1 + daemon/interface.cpp | 89 ++++++++++++++++++++++++++++++ daemon/interface.hpp | 5 ++ daemon/json.cpp | 4 ++ daemon/main.cpp | 2 +- daemon/sap.cpp | 2 +- daemon/session_manager.cpp | 103 ++++++++++++++++++++++++++--------- daemon/session_manager.hpp | 1 + daemon/tests/daemon_test.cpp | 2 + webui/src/Services.js | 3 +- webui/src/SourceEdit.js | 10 +++- 14 files changed, 205 insertions(+), 30 deletions(-) diff --git a/daemon/README.md b/daemon/README.md index 4dc9091..3943d2c 100644 --- a/daemon/README.md +++ b/daemon/README.md @@ -315,6 +315,7 @@ Example: "name": "ALSA Source 0", "io": "Audio Device", "codec": "L16", + "address": "", "max_samples_per_packet": 48, "ttl": 15, "payload_type": 98, @@ -338,6 +339,11 @@ where: > JSON string specifying codec in use. > Valid values are L16, L24 and AM824 (L32). +> **address** +> JSON string specifying the destination address to use. +> If this field contains a valid address it's used instead of the default multicast address for the source. +> In case an unicast address is provided this must be of an host currently active on the local network. + > **max\_sample\_per\_packet** > JSON number specifying the max number of samples contained in one RTP packet. > Valid values are 12, 16, 48, 96, 192. diff --git a/daemon/daemon.conf b/daemon/daemon.conf index 8204681..8afbf89 100644 --- a/daemon/daemon.conf +++ b/daemon/daemon.conf @@ -16,6 +16,9 @@ "syslog_proto": "none", "syslog_server": "255.255.255.254:1234", "status_file": "./status.json", + "interface_name": "enp2s0", "mdns_enabled": true, - "interface_name": "lo" + "mac_addr": "84:39:be:6d:dc:74", + "ip_addr": "10.0.0.8", + "node_id": "AES67 daemon 000a0800" } diff --git a/daemon/error_code.cpp b/daemon/error_code.cpp index ee8f7ac..95bcf7c 100644 --- a/daemon/error_code.cpp +++ b/daemon/error_code.cpp @@ -95,6 +95,8 @@ std::string DaemonErrCategory::message(int ev) const { return "stream id is in use"; case DaemonErrc::stream_name_in_use: return "stream name is in use"; + case DaemonErrc::cannot_retrieve_mac: + return "cannot retrieve MAC address for IP"; case DaemonErrc::stream_id_not_in_use: return "stream not in use"; case DaemonErrc::invalid_url: diff --git a/daemon/error_code.hpp b/daemon/error_code.hpp index df1ef16..e492a11 100644 --- a/daemon/error_code.hpp +++ b/daemon/error_code.hpp @@ -52,6 +52,7 @@ enum class DaemonErrc { cannot_retrieve_sdp = 44, // daemon cannot retrieve SDP cannot_parse_sdp = 45, // daemon cannot parse SDP stream_name_in_use = 46, // daemon source or sink name in use + cannot_retrieve_mac = 47, // daemon cannot retrieve MAC for IP send_invalid_size = 50, // daemon data size too big for buffer send_u2k_failed = 51, // daemon failed to send command to driver send_k2u_failed = 52, // daemon failed to send event response to driver diff --git a/daemon/interface.cpp b/daemon/interface.cpp index 998ac9f..fa23019 100644 --- a/daemon/interface.cpp +++ b/daemon/interface.cpp @@ -19,6 +19,10 @@ // #include +#include +#include +#include +#include #include #include "log.hpp" @@ -110,3 +114,88 @@ int get_interface_index(const std::string& interface_name) { return ifr.ifr_ifindex; } + + +std::pair, std::string> get_mac_from_arp_cache( + const std::string& interface_name, + const std::string& ip) { + const std::string arpProcPath("/proc/net/arp"); + std::array mac{0x00, 0x00, 0x00, 0x00, 0x00, 0x00}; + std::ifstream stream(arpProcPath); + + while (stream) { + std::string line; + std::vector tokens; + + std::getline(stream, line); + if (line.find(ip)) { + continue; + } + boost::split(tokens, line, boost::is_any_of(" "), boost::token_compress_on); + /* check that IP is on the correct interface */ + if (tokens.size() >= 6 && tokens[5] == interface_name) { + std::vector vec; + /* parse MAC */ + boost::split(vec, tokens[3], boost::is_any_of(":")); + int j = 0; + bool check = false; + for (auto const& item : vec) { + mac[j] = strtol(item.c_str(), NULL, 16); + check |= mac[j]; + j++; + } + if (check) { + return {mac, tokens[3]}; + } + } + } + BOOST_LOG_TRIVIAL(debug) + << "get_mac_from_arp_cache:: cannot retrieve MAC for IP " << ip + << " on interface " << interface_name; + return {mac, ""}; +} + + +bool ping(const std::string& ip) { + static uint16_t sequence_number(0); + uint16_t identifier(0xABAB); + uint8_t buffer[10]; + + // Create an ICMP header for an echo request. + buffer[0] = 0x8; // echo request + buffer[1] = 0x0; // code + memcpy(buffer + 2, &identifier, 2); // identifier + memcpy(buffer + 4, &sequence_number, 2); // sequence number + memcpy(buffer + 6, "ping", 4); // body + + // this requires root priv + try { + io_service io_service; + icmp::socket socket{io_service, icmp::v4()}; + ip::icmp::endpoint destination(ip::icmp::v4(), + ip::address_v4::from_string(ip).to_ulong()); + socket.send_to(boost::asio::buffer(buffer, sizeof buffer), destination); + } catch (...) { + BOOST_LOG_TRIVIAL(error) << "ping:: send_to() failed"; + return false; + } + return true; +} + + +bool echo_try_connect(const std::string& ip) { + ip::tcp::iostream s; + BOOST_LOG_TRIVIAL(debug) << "echo_connect:: connecting to " << ip; +#if BOOST_VERSION < 106600 + s.expires_from_now(boost::posix_time::seconds(1)); +#else + s.expires_after(boost::asio::chrono::seconds(1)); +#endif + s.connect(ip, "7"); + if (!s || s.error()) { + BOOST_LOG_TRIVIAL(debug) << "echo_connect:: unable to connect to " << ip; + return false; + } + s.close(); + return true; +} diff --git a/daemon/interface.hpp b/daemon/interface.hpp index 2a7fdb0..cca228e 100644 --- a/daemon/interface.hpp +++ b/daemon/interface.hpp @@ -25,5 +25,10 @@ std::pair get_interface_ip( std::pair, std::string> get_interface_mac( const std::string& interface_name); int get_interface_index(const std::string& interface_name); +std::pair, std::string> get_mac_from_arp_cache( + const std::string& interface_name, + const std::string& ip); +bool ping(const std::string& ip); +bool echo_try_connect(const std::string& ip); #endif diff --git a/daemon/json.cpp b/daemon/json.cpp index 3f7a38f..649ef29 100644 --- a/daemon/json.cpp +++ b/daemon/json.cpp @@ -117,6 +117,7 @@ std::string source_to_json(const StreamSource& source) { << ",\n \"io\": \"" << escape_json(source.io) << "\"" << ",\n \"max_samples_per_packet\": " << source.max_samples_per_packet << ",\n \"codec\": \"" << escape_json(source.codec) << "\"" + << ",\n \"address\": \"" << escape_json(source.address) << "\"" << ",\n \"ttl\": " << unsigned(source.ttl) << ",\n \"payload_type\": " << unsigned(source.payload_type) << ",\n \"dscp\": " << +unsigned(source.dscp) @@ -363,6 +364,7 @@ StreamSource json_to_source(const std::string& id, const std::string& json) { "map": [ 0, 1, 2, 3, 4, 5, 6, 7 ], "max_samples_per_packet": 48, "codec": "L24", + "address": "", "ttl": 15, "payload_type": 98, "dscp": 34, @@ -386,6 +388,7 @@ StreamSource json_to_source(const std::string& id, const std::string& json) { } source.max_samples_per_packet = pt.get("max_samples_per_packet"); source.codec = remove_undesired_chars(pt.get("codec")); + source.address = remove_undesired_chars(pt.get("address")); source.ttl = pt.get("ttl"); source.payload_type = pt.get("payload_type"); source.dscp = pt.get("dscp"); @@ -472,6 +475,7 @@ static void parse_json_sources(boost::property_tree::ptree& pt, source.max_samples_per_packet = v.second.get("max_samples_per_packet"); source.codec = v.second.get("codec"); + source.address = v.second.get("address"); source.ttl = v.second.get("ttl"); source.payload_type = v.second.get("payload_type"); source.dscp = v.second.get("dscp"); diff --git a/daemon/main.cpp b/daemon/main.cpp index 3c6fd75..a73dd4e 100644 --- a/daemon/main.cpp +++ b/daemon/main.cpp @@ -35,7 +35,7 @@ namespace po = boost::program_options; namespace postyle = boost::program_options::command_line_style; namespace logging = boost::log; -static std::string version("bondagit-1.0"); +static std::string version("bondagit-1.1-beta"); static std::atomic terminate = false; void termination_handler(int signum) { diff --git a/daemon/sap.cpp b/daemon/sap.cpp index 804940d..3149ef5 100644 --- a/daemon/sap.cpp +++ b/daemon/sap.cpp @@ -44,7 +44,7 @@ bool SAP::set_multicast_interface(const std::string& interface_ip) { << "sap::outbound_interface option " << ec.message(); return false; } - /* we don't want receive self announced sessions */ + /* we don't want to receive our sessions */ ip::multicast::enable_loopback el_option(false); socket_.set_option(el_option, ec); if (ec) { diff --git a/daemon/session_manager.cpp b/daemon/session_manager.cpp index e6fcb7d..2c00a0e 100644 --- a/daemon/session_manager.cpp +++ b/daemon/session_manager.cpp @@ -36,6 +36,7 @@ #include "rtsp_client.hpp" #include "utils.hpp" #include "session_manager.hpp" +#include "interface.hpp" static uint8_t get_codec_word_lenght(const std::string& codec) { if (codec == "L16") { @@ -219,6 +220,7 @@ bool SessionManager::parse_sdp(const std::string sdp, StreamInfo& info) const { } case 'c': /* c=IN IP4 239.1.0.12/15 */ + /* c=IN IP4 10.0.0.1 */ /* connection info of audio media */ if (status == sdp_parser_status::media || /* generic connection info */ @@ -226,7 +228,7 @@ bool SessionManager::parse_sdp(const std::string sdp, StreamInfo& info) const { std::vector fields; boost::split(fields, val, [line](char c) { return c == ' ' || c == '/'; }); - if (fields.size() != 4) { + if (fields.size() < 3) { BOOST_LOG_TRIVIAL(error) << "session_manager:: invalid connection in SDP at line " << num; @@ -246,10 +248,19 @@ bool SessionManager::parse_sdp(const std::string sdp, StreamInfo& info) const { << num; return false; } - info.stream.m_byTTL = std::stoi(fields[3]); + if (fields.size() > 3) { + info.stream.m_byTTL = std::stoi(fields[3]); + } else { + info.stream.m_byTTL = 64; + } } break; default: + if (line[0] < 'a' || line[0] > 'z') { + BOOST_LOG_TRIVIAL(fatal) + << "session_manager:: invalid SDP at line " << num; + return false; + } break; } } @@ -329,6 +340,7 @@ StreamSource SessionManager::get_source_(uint8_t id, info.io, info.stream.m_ui32MaxSamplesPerPacket, info.stream.m_cCodec, + ip::address_v4(info.stream.m_ui32DestIP).to_string(), info.stream.m_byTTL, info.stream.m_byPayloadType, info.stream.m_ucDSCP, @@ -436,8 +448,10 @@ void SessionManager::on_add_source(const StreamSource& source, for (auto cb : add_source_observers) { cb(source.id, source.name, get_source_sdp_(source.id, info)); } - igmp_.join(config_->get_ip_addr_str(), - ip::address_v4(info.stream.m_ui32DestIP).to_string()); + if (IN_MULTICAST(info.stream.m_ui32DestIP)) { + igmp_.join(config_->get_ip_addr_str(), + ip::address_v4(info.stream.m_ui32DestIP).to_string()); + } source_names_[source.name] = source.id; } @@ -445,8 +459,10 @@ void SessionManager::on_remove_source(const StreamInfo& info) { for (auto cb : remove_source_observers) { cb(info.stream.m_uiId, info.stream.m_cName, {}); } - igmp_.leave(config_->get_ip_addr_str(), - ip::address_v4(info.stream.m_ui32DestIP).to_string()); + if (IN_MULTICAST(info.stream.m_ui32DestIP)) { + igmp_.leave(config_->get_ip_addr_str(), + ip::address_v4(info.stream.m_ui32DestIP).to_string()); + } source_names_.erase(info.stream.m_cName); } @@ -463,8 +479,7 @@ std::error_code SessionManager::add_source(const StreamSource& source) { info.stream.m_ui32CRTP_stream_info_sizeof = sizeof(info.stream); strncpy(info.stream.m_cName, source.name.c_str(), sizeof(info.stream.m_cName) - 1); - info.stream.m_ucDSCP = source.dscp; // IPv4 DSCP - info.stream.m_byTTL = source.ttl; + info.stream.m_ucDSCP = source.dscp; // IPv4 DSCP info.stream.m_byPayloadType = source.payload_type; info.stream.m_byWordLength = get_codec_word_lenght(source.codec); info.stream.m_byNbOfChannels = source.map.size(); @@ -476,17 +491,47 @@ std::error_code SessionManager::add_source(const StreamSource& source) { info.stream.m_uiId = source.id; info.stream.m_ui32RTCPSrcIP = config_->get_ip_addr(); info.stream.m_ui32SrcIP = config_->get_ip_addr(); // only for Source - info.stream.m_ui32DestIP = - ip::address_v4::from_string(config_->get_rtp_mcast_base().c_str()) - .to_ulong() + - source.id; + boost::system::error_code ec; + ip::address_v4::from_string(source.address, ec); + if (!ec) { + info.stream.m_ui32DestIP = + ip::address_v4::from_string(source.address).to_ulong(); + } else { + info.stream.m_ui32DestIP = + ip::address_v4::from_string(config_->get_rtp_mcast_base().c_str()) + .to_ulong() + + source.id; + } info.stream.m_usSrcPort = config_->get_rtp_port(); info.stream.m_usDestPort = config_->get_rtp_port(); info.stream.m_ui32SSRC = rand() % 65536; // use random number std::copy(source.map.begin(), source.map.end(), info.stream.m_aui32Routing); - auto mcast_mac_addr = get_mcast_mac_addr(info.stream.m_ui32DestIP); - std::copy(std::begin(mcast_mac_addr), std::end(mcast_mac_addr), - info.stream.m_ui8DestMAC); + + if (IN_MULTICAST(info.stream.m_ui32DestIP)) { + auto mac_addr = get_mcast_mac_addr(info.stream.m_ui32DestIP); + std::copy(std::begin(mac_addr), std::end(mac_addr), + info.stream.m_ui8DestMAC); + info.stream.m_byTTL = source.ttl; + } else { + auto mac_addr = get_mac_from_arp_cache(config_->get_interface_name(), + ip::address_v4(info.stream.m_ui32DestIP).to_string()); + int retry = 3; + while (!mac_addr.second.length() && retry--) { + // if not in cache already try to populate the MAC cache + (void)echo_try_connect(ip::address_v4(info.stream.m_ui32DestIP).to_string()); + mac_addr = get_mac_from_arp_cache(config_->get_interface_name(), + ip::address_v4(info.stream.m_ui32DestIP).to_string()); + } + if (!mac_addr.second.length()) { + BOOST_LOG_TRIVIAL(error) + << "session_manager:: cannot retrieve MAC address for IP " + << config_->get_rtp_mcast_base(); + return DaemonErrc::cannot_retrieve_mac; + } + std::copy(std::begin(mac_addr.first), std::end(mac_addr.first), + info.stream.m_ui8DestMAC); + info.stream.m_byTTL = 64; + } info.refclk_ptp_traceable = source.refclk_ptp_traceable; info.enabled = source.enabled; @@ -567,16 +612,20 @@ std::string SessionManager::get_source_sdp_(uint32_t id, << ip::address_v4(info.stream.m_ui32SrcIP).to_string() << "\n" << "s=" << get_node_id(config_->get_ip_addr()) << " " << info.stream.m_cName << "\n" - << "c=IN IP4 " << ip::address_v4(info.stream.m_ui32DestIP).to_string() - << "/" << static_cast(info.stream.m_byTTL) << "\n" - << "t=0 0\n" + << "c=IN IP4 " << ip::address_v4(info.stream.m_ui32DestIP).to_string(); + if (IN_MULTICAST(info.stream.m_ui32DestIP)) { + ss << "/" << static_cast(info.stream.m_byTTL); + } + ss << "\nt=0 0\n" << "a=clock-domain:PTPv2 " << static_cast(ptp_config_.domain) << "\n" << "m=audio " << info.stream.m_usSrcPort << " RTP/AVP " << static_cast(info.stream.m_byPayloadType) << "\n" - << "c=IN IP4 " << ip::address_v4(info.stream.m_ui32DestIP).to_string() - << "/" << static_cast(info.stream.m_byTTL) << "\n" - << "a=rtpmap:" << static_cast(info.stream.m_byPayloadType) << " " + << "c=IN IP4 " << ip::address_v4(info.stream.m_ui32DestIP).to_string(); + if (IN_MULTICAST(info.stream.m_ui32DestIP)) { + ss << "/" << static_cast(info.stream.m_byTTL); + } + ss << "\na=rtpmap:" << static_cast(info.stream.m_byPayloadType) << " " << info.stream.m_cCodec << "/" << sample_rate << "/" << static_cast(info.stream.m_byNbOfChannels) << "\n" << "a=sync-time:0\n" @@ -645,14 +694,18 @@ uint8_t SessionManager::get_sink_id(const std::string& name) const { void SessionManager::on_add_sink(const StreamSink& sink, const StreamInfo& info) { - igmp_.join(config_->get_ip_addr_str(), - ip::address_v4(info.stream.m_ui32DestIP).to_string()); + if (IN_MULTICAST(info.stream.m_ui32DestIP)) { + igmp_.join(config_->get_ip_addr_str(), + ip::address_v4(info.stream.m_ui32DestIP).to_string()); + } sink_names_[sink.name] = sink.id; } void SessionManager::on_remove_sink(const StreamInfo& info) { - igmp_.leave(config_->get_ip_addr_str(), - ip::address_v4(info.stream.m_ui32DestIP).to_string()); + if (IN_MULTICAST(info.stream.m_ui32DestIP)) { + igmp_.leave(config_->get_ip_addr_str(), + ip::address_v4(info.stream.m_ui32DestIP).to_string()); + } sink_names_.erase(info.stream.m_cName); } diff --git a/daemon/session_manager.hpp b/daemon/session_manager.hpp index 5e219c8..7dedd34 100644 --- a/daemon/session_manager.hpp +++ b/daemon/session_manager.hpp @@ -38,6 +38,7 @@ struct StreamSource { std::string io; uint32_t max_samples_per_packet{0}; std::string codec; + std::string address; uint8_t ttl{0}; uint8_t payload_type{0}; uint8_t dscp{0}; diff --git a/daemon/tests/daemon_test.cpp b/daemon/tests/daemon_test.cpp index 9dd8c18..b13568a 100644 --- a/daemon/tests/daemon_test.cpp +++ b/daemon/tests/daemon_test.cpp @@ -144,6 +144,7 @@ struct Client { "map": [ 0, 1 ], "max_samples_per_packet": 48, "codec": "L16", + "address": "", "ttl": 15, "payload_type": 98, "dscp": 34, @@ -167,6 +168,7 @@ struct Client { "map": [ 0, 1 ], "max_samples_per_packet": 192, "codec": "L24", + "address": "", "ttl": 15, "payload_type": 98, "dscp": 34, diff --git a/webui/src/Services.js b/webui/src/Services.js index 9b81275..733801c 100644 --- a/webui/src/Services.js +++ b/webui/src/Services.js @@ -128,13 +128,14 @@ export default class RestAPI { }); } - static addSource(id, enabled, name, io, max_samples_per_packet, codec, ttl, payload_type, dscp, refclk_ptp_traceable, map, is_edit) { + static addSource(id, enabled, name, io, max_samples_per_packet, codec, address, ttl, payload_type, dscp, refclk_ptp_traceable, map, is_edit) { return this.doFetch(source + '/' + id, { body: JSON.stringify({ enabled: enabled, name: name, io: io, codec: codec, + address: address, map: map, max_samples_per_packet: parseInt(max_samples_per_packet, 10), ttl: parseInt(ttl, 10), diff --git a/webui/src/SourceEdit.js b/webui/src/SourceEdit.js index c1929ad..eb32cca 100644 --- a/webui/src/SourceEdit.js +++ b/webui/src/SourceEdit.js @@ -61,6 +61,8 @@ class SourceEdit extends Component { nameErr: false, io: this.props.source.io, codec: this.props.source.codec, + address: this.props.source.address, + addressErr: false, ttl: this.props.source.ttl, ttlErr: false, payloadType: this.props.source.payload_type, @@ -105,6 +107,7 @@ class SourceEdit extends Component { this.state.io, this.state.maxSamplesPerPacket, this.state.codec, + this.state.address ? this.state.address : "", this.state.ttl, this.state.payloadType, this.state.dscp, @@ -212,7 +215,8 @@ class SourceEdit extends Component { return !this.state.nameErr && !this.state.ttlErr && !this.state.channelsErr && - !this.state.payloadTypeErr; + !this.state.payloadTypeErr && + !this.state.addressErr; } render() { @@ -260,6 +264,10 @@ class SourceEdit extends Component { + + + this.setState({address: e.target.value, addressErr: !e.currentTarget.checkValidity()})} optional/> + this.setState({payloadType: e.target.value, payloadTypeErr: !e.currentTarget.checkValidity()})} required/>