set of small enhancements and fixes

This commit is contained in:
Andrea Bondavalli 2020-02-07 10:10:15 -08:00
parent fa7b64038e
commit a343123d81
4 changed files with 34 additions and 42 deletions

View File

@ -38,11 +38,12 @@ class IGMP {
};
bool join(const std::string& interface_ip, const std::string& mcast_ip) {
uint32_t mcast_ip_addr = ip::address_v4::from_string(mcast_ip.c_str()).to_ulong();
std::lock_guard<std::mutex> lock(mutex);
auto it = mcast_ref.find(mcast_ip);
auto it = mcast_ref.find(mcast_ip_addr);
if (it != mcast_ref.end() && (*it).second > 0) {
mcast_ref[mcast_ip]++;
mcast_ref[mcast_ip_addr]++;
return true;
}
@ -66,19 +67,20 @@ class IGMP {
BOOST_LOG_TRIVIAL(info) << "igmp:: joined multicast group "
<< mcast_ip << " on " << interface_ip;
mcast_ref[mcast_ip] = 1;
mcast_ref[mcast_ip_addr] = 1;
return true;
}
bool leave(const std::string& interface_ip, const std::string& mcast_ip) {
uint32_t mcast_ip_addr = ip::address_v4::from_string(mcast_ip.c_str()).to_ulong();
std::lock_guard<std::mutex> lock(mutex);
auto it = mcast_ref.find(mcast_ip);
auto it = mcast_ref.find(mcast_ip_addr);
if (it == mcast_ref.end() || (*it).second == 0) {
return false;
}
if (--mcast_ref[mcast_ip] > 0) {
if (--mcast_ref[mcast_ip_addr] > 0) {
return true;
}
@ -102,7 +104,7 @@ class IGMP {
io_service io_service_;
ip::udp::socket socket_{io_service_};
udp::endpoint listen_endpoint_{udp::endpoint(address_v4::any(), 0)};
std::map<std::string, int> mcast_ref;
std::unordered_map<uint32_t, int> mcast_ref;
std::mutex mutex;
};

View File

@ -536,7 +536,8 @@ std::string SessionManager::get_source_sdp_(uint32_t id,
<< static_cast<double>(info.stream.m_ui32MaxSamplesPerPacket) * 1000 /
static_cast<double>(sample_rate);
std::string ptime = ss_ptime.str();
ptime.erase(ptime.find_last_not_of('0') + 1, std::string::npos); // remove trailing zeros
// remove trailing zeros or dot
ptime.erase(ptime.find_last_not_of("0.") + 1, std::string::npos);
// build SDP
std::stringstream ss;
@ -855,11 +856,10 @@ size_t SessionManager::process_sap() {
// compute source hash
uint32_t msg_id_hash = (static_cast<uint32_t>(id) << 16) + msg_crc;
// add/update this source in the announced sources
announced_sources_[msg_id_hash] =
std::make_pair(id, info.stream.m_ui32RTCPSrcIP);
// add this source to the active sources
announced_sources_[msg_id_hash] = info.stream.m_ui32RTCPSrcIP;
// add this source to the currently active sources
active_sources.insert(msg_id_hash);
// remove this source from deleted sources
// remove this source from deleted sources (if present)
deleted_sources_count_.erase(msg_id_hash);
// send announcement for this source
sap_.announcement(msg_crc, info.stream.m_ui32RTCPSrcIP, sdp);
@ -869,15 +869,12 @@ size_t SessionManager::process_sap() {
}
// check for sources that are no longer announced and send deletion/s
for (auto const& [msg_id_hash, pair] : announced_sources_) {
const auto &id = pair.first;
const auto &src_addr = pair.second;
for (auto const& [msg_id_hash, src_addr] : announced_sources_) {
// check if this source is no longer announced
if (active_sources.find(msg_id_hash) ==
active_sources.end()) {
// retrieve deleted source SDP
std::string sdp = get_removed_source_sdp_(id, src_addr);
std::string sdp = get_removed_source_sdp_(msg_id_hash >> 16, src_addr);
// send deletion for this source
sap_.deletion(static_cast<uint16_t>(msg_id_hash), src_addr, sdp);
// update amount of byte sent
@ -943,10 +940,8 @@ bool SessionManager::worker() {
(reinterpret_cast<uint8_t*>(&ptp_status.ui64GMID)[5]),
(reinterpret_cast<uint8_t*>(&ptp_status.ui64GMID)[6]),
(reinterpret_cast<uint8_t*>(&ptp_status.ui64GMID)[7]));
// update PTP clock status
std::unique_lock ptp_lock(ptp_mutex_);
// update status
ptp_status_.gmid = ptp_clock_id;
ptp_status_.jitter = ptp_status.i32Jitter;
@ -1001,15 +996,11 @@ bool SessionManager::worker() {
}
// at end, send deletion for all announced sources
for (auto const& [msg_id_hash, pair] : announced_sources_) {
const auto &id = pair.first;
const auto &addr = pair.second;
for (auto const& [msg_id_hash, src_addr] : announced_sources_) {
// retrieve deleted source SDP
std::string sdp = get_removed_source_sdp_(id, addr);
std::string sdp = get_removed_source_sdp_(msg_id_hash >> 16, src_addr);
// send deletion for this source
sap_.deletion(static_cast<uint16_t>(msg_id_hash), addr, sdp);
sap_.deletion(static_cast<uint16_t>(msg_id_hash), src_addr, sdp);
}
// leave PTP multicast addresses

View File

@ -179,12 +179,11 @@ class SessionManager {
mutable std::shared_mutex sinks_mutex_;
/* current announced sources */
std::map<uint32_t /* msg_id_hash */,
std::pair<uint8_t /* id */, uint32_t /* src_addr */> >
std::map<uint32_t /* msg_id_hash */, uint32_t /* src_addr */>
announced_sources_;
/* number of deletions sent for a a deleted source */
std::map<uint32_t /* msg_id_hash */, int /* count */>
std::unordered_map<uint32_t /* msg_id_hash */, int /* count */>
deleted_sources_count_;
PTPConfig ptp_config_;

View File

@ -39,8 +39,8 @@ constexpr static const char g_daemon_address[] = "127.0.0.1";
constexpr static uint16_t g_daemon_port = 9999;
constexpr static const char g_sap_address[] = "224.2.127.254";
constexpr static uint16_t g_sap_port = 9875;
constexpr static uint16_t udp_size = 1024;
constexpr static uint16_t sap_header = 24;
constexpr static uint16_t g_udp_size = 1024;
constexpr static uint16_t g_sap_header_len = 24;
using namespace boost::process;
using namespace boost::asio::ip;
@ -232,17 +232,17 @@ struct Client {
}
bool sap_wait_announcement(int id, const std::string& sdp, int count = 1) {
char data[udp_size];
char data[g_udp_size];
while (count-- > 0) {
BOOST_TEST_MESSAGE("waiting announcement for source " +
std::to_string(id));
std::string sap_sdp;
do {
auto len = socket_.receive(boost::asio::buffer(data, udp_size));
if (len <= sap_header) {
auto len = socket_.receive(boost::asio::buffer(data, g_udp_size));
if (len <= g_sap_header_len) {
continue;
}
sap_sdp.assign(data + sap_header, data + len);
sap_sdp.assign(data + g_sap_header_len, data + len);
} while(data[0] != 0x20 || sap_sdp != sdp);
BOOST_CHECK_MESSAGE(true, "SAP announcement SDP and source SDP match");
}
@ -250,14 +250,14 @@ struct Client {
}
void sap_wait_all_deletions() {
char data[udp_size];
char data[g_udp_size];
std::set<uint8_t> ids;
while (ids.size() < 64) {
auto len = socket_.receive(boost::asio::buffer(data, udp_size));
if (len <= sap_header) {
auto len = socket_.receive(boost::asio::buffer(data, g_udp_size));
if (len <= g_sap_header_len) {
continue;
}
std::string sap_sdp_(data + sap_header, data + len);
std::string sap_sdp_(data + g_sap_header_len, data + len);
if (data[0] == 0x24 && sap_sdp_.length() > 3) {
//o=- 56 0 IN IP4 127.0.0.1
ids.insert(std::atoi(sap_sdp_.c_str() + 3));
@ -268,16 +268,16 @@ struct Client {
}
bool sap_wait_deletion(int id, const std::string& sdp, int count = 1) {
char data[udp_size];
char data[g_udp_size];
while (count-- > 0) {
BOOST_TEST_MESSAGE("waiting deletion for source " + std::to_string(id));
std::string sap_sdp;
do {
auto len = socket_.receive(boost::asio::buffer(data, udp_size));
if (len <= sap_header) {
auto len = socket_.receive(boost::asio::buffer(data, g_udp_size));
if (len <= g_sap_header_len) {
continue;
}
sap_sdp.assign(data + sap_header, data + len);
sap_sdp.assign(data + g_sap_header_len, data + len);
} while(data[0] != 0x24 || sdp.find(sap_sdp) == std::string::npos);
BOOST_CHECK_MESSAGE(true, "SAP deletion SDP matches");
}