Changes to SessionManager class to handle notifications for source updates triggered when the PTP GMID clock changes or the driver sample rate changes.

SessionManager allows the registration of source update observers and delivers via callbacks the above notifications.
RTSP server modified to register to source update notifications and to send updates to clients via ANNOUNCE method.
This commit is contained in:
Andrea Bondavalli 2020-06-04 09:14:17 -07:00
parent 3a60c0492f
commit 371090e7b9
4 changed files with 65 additions and 14 deletions

View File

@ -21,7 +21,7 @@
using boost::asio::ip::tcp; using boost::asio::ip::tcp;
bool RtspServer::add_source(uint8_t id, bool RtspServer::update_source(uint8_t id,
const std::string& name, const std::string& name,
const std::string& sdp) { const std::string& sdp) {
bool ret = false; bool ret = false;
@ -88,7 +88,7 @@ bool RtspSession::announce(uint8_t id,
<< sdp; << sdp;
BOOST_LOG_TRIVIAL(info) BOOST_LOG_TRIVIAL(info)
<< "rtsp_server:: " << "ANNOUNCE for source " << id << " sent to " << "rtsp_server:: " << "ANNOUNCE for source " << name << " sent to "
<< socket_.remote_endpoint(); << socket_.remote_endpoint();
send_response(ss.str()); send_response(ss.str());

View File

@ -101,7 +101,14 @@ class RtspServer {
session_manager_->add_source_observer( session_manager_->add_source_observer(
SessionManager::ObserverType::add_source, SessionManager::ObserverType::add_source,
std::bind(&RtspServer::add_source, this, std::bind(&RtspServer::update_source, this,
std::placeholders::_1,
std::placeholders::_2,
std::placeholders::_3));
session_manager_->add_source_observer(
SessionManager::ObserverType::update_source,
std::bind(&RtspServer::update_source, this,
std::placeholders::_1, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_2,
std::placeholders::_3)); std::placeholders::_3));
@ -117,8 +124,8 @@ class RtspServer {
} }
private: private:
/* a new source has been added or updated */ /* a source was updated */
bool add_source(uint8_t id, const std::string& name, const std::string& sdp); bool update_source(uint8_t id, const std::string& name, const std::string& sdp);
void accept(); void accept();
std::mutex mutex_; std::mutex mutex_;

View File

@ -428,6 +428,9 @@ void SessionManager::add_source_observer(ObserverType type, Observer cb) {
case ObserverType::remove_source: case ObserverType::remove_source:
remove_source_observers.push_back(cb); remove_source_observers.push_back(cb);
break; break;
case ObserverType::update_source:
update_source_observers.push_back(cb);
break;
} }
} }
@ -916,6 +919,20 @@ size_t SessionManager::process_sap() {
return sdp_len_sum; return sdp_len_sum;
} }
void SessionManager::on_update_sources() {
// trigger sources SDP file update
std::shared_lock sources_lock(sources_mutex_);
for (auto const& [id, info]: sources_) {
for (auto cb : update_source_observers) {
cb(id, info.stream.m_cName, get_source_sdp_(id, info));
}
}
}
void SessionManager::on_ptp_status_locked() {
// set sample rate, this may require seconds
(void)driver_->set_sample_rate(driver_->get_current_sample_rate());
}
using namespace std::chrono; using namespace std::chrono;
using second_t = duration<double, std::ratio<1> >; using second_t = duration<double, std::ratio<1> >;
@ -927,6 +944,7 @@ bool SessionManager::worker() {
auto ptp_timepoint = steady_clock::now(); auto ptp_timepoint = steady_clock::now();
int sap_interval = 1; int sap_interval = 1;
int ptp_interval = 0; int ptp_interval = 0;
uint32_t sample_rate = driver_->get_current_sample_rate();
sap_.set_multicast_interface(config_->get_ip_addr_str()); sap_.set_multicast_interface(config_->get_ip_addr_str());
@ -950,10 +968,16 @@ bool SessionManager::worker() {
"%02X-%02X-%02X-%02X-%02X-%02X-%02X-%02X", "%02X-%02X-%02X-%02X-%02X-%02X-%02X-%02X",
pui64GMID[0], pui64GMID[1], pui64GMID[2], pui64GMID[3], pui64GMID[0], pui64GMID[1], pui64GMID[2], pui64GMID[3],
pui64GMID[4], pui64GMID[5], pui64GMID[6], pui64GMID[7]); pui64GMID[4], pui64GMID[5], pui64GMID[6], pui64GMID[7]);
bool ptp_changed_gmid = false;
bool ptp_changed_to_locked = false;
// update PTP clock status // update PTP clock status
std::unique_lock ptp_lock(ptp_mutex_); ptp_mutex_.lock();
// update status // update status
if (ptp_status_.gmid != ptp_clock_id) {
ptp_status_.gmid = ptp_clock_id; ptp_status_.gmid = ptp_clock_id;
ptp_changed_gmid = true;
}
ptp_status_.jitter = ptp_status.i32Jitter; ptp_status_.jitter = ptp_status.i32Jitter;
std::string new_ptp_status; std::string new_ptp_status;
switch (ptp_status.nPTPLockStatus) { switch (ptp_status.nPTPLockStatus) {
@ -967,15 +991,30 @@ bool SessionManager::worker() {
new_ptp_status = "locked"; new_ptp_status = "locked";
break; break;
} }
if (ptp_status_.status != new_ptp_status) { if (ptp_status_.status != new_ptp_status) {
BOOST_LOG_TRIVIAL(info) BOOST_LOG_TRIVIAL(info)
<< "session_manager:: new PTP clock status " << new_ptp_status; << "session_manager:: new PTP clock status " << new_ptp_status;
ptp_status_.status = new_ptp_status; ptp_status_.status = new_ptp_status;
if (new_ptp_status == "locked") { if (new_ptp_status == "locked") {
// set sample rate, this may require seconds ptp_changed_to_locked = true;
(void)driver_->set_sample_rate(driver_->get_current_sample_rate());
} }
} }
// end update PTP clock status
ptp_mutex_.unlock();
if (ptp_changed_to_locked) {
on_ptp_status_locked();
}
if (ptp_changed_gmid ||
sample_rate != driver_->get_current_sample_rate()) {
/* master clock id changed or sample rate changed
* we need to update all the sources */
sample_rate = driver_->get_current_sample_rate();
on_update_sources();
}
} }
ptp_interval = 10; ptp_interval = 10;
} }

View File

@ -134,7 +134,7 @@ class SessionManager {
std::error_code remove_source(uint32_t id); std::error_code remove_source(uint32_t id);
uint8_t get_source_id(const std::string& name) const; uint8_t get_source_id(const std::string& name) const;
enum class ObserverType { add_source, remove_source }; enum class ObserverType{ add_source, remove_source, update_source };
using Observer = std::function<bool(uint8_t id, const std::string& name, using Observer = std::function<bool(uint8_t id, const std::string& name,
const std::string& sdp)>; const std::string& sdp)>;
void add_source_observer(ObserverType type, Observer cb); void add_source_observer(ObserverType type, Observer cb);
@ -165,6 +165,10 @@ class SessionManager {
void on_add_sink(const StreamSink& sink, const StreamInfo& info); void on_add_sink(const StreamSink& sink, const StreamInfo& info);
void on_remove_sink(const StreamInfo& info); void on_remove_sink(const StreamInfo& info);
void on_ptp_status_locked();
void on_update_sources();
std::string get_removed_source_sdp_(uint32_t id, uint32_t src_addr) const; std::string get_removed_source_sdp_(uint32_t id, uint32_t src_addr) const;
std::string get_source_sdp_(uint32_t id, const StreamInfo& info) const; std::string get_source_sdp_(uint32_t id, const StreamInfo& info) const;
StreamSource get_source_(uint8_t id, const StreamInfo& info) const; StreamSource get_source_(uint8_t id, const StreamInfo& info) const;
@ -209,6 +213,7 @@ class SessionManager {
std::list<Observer> add_source_observers; std::list<Observer> add_source_observers;
std::list<Observer> remove_source_observers; std::list<Observer> remove_source_observers;
std::list<Observer> update_source_observers;
SAP sap_{config_->get_sap_mcast_addr()}; SAP sap_{config_->get_sap_mcast_addr()};
IGMP igmp_; IGMP igmp_;