diff --git a/daemon/main.cpp b/daemon/main.cpp index 2f4ca8f..e02dfb4 100644 --- a/daemon/main.cpp +++ b/daemon/main.cpp @@ -24,6 +24,7 @@ #include "config.hpp" #include "driver_manager.hpp" #include "http_server.hpp" +#include "mdns_server.hpp" #include "rtsp_server.hpp" #include "log.hpp" #include "session_manager.hpp" @@ -117,6 +118,12 @@ int main(int argc, char* argv[]) { std::string("SessionManager:: init failed")); } + /* start mDNS server */ + MDNSServer mdns_server(session_manager, config); + if (config->get_mdns_enabled() && !mdns_server.init()) { + throw std::runtime_error(std::string("MDNSServer:: init failed")); + } + /* start rtsp server */ RtspServer rtsp_server(session_manager, config); if (!rtsp_server.init()) { @@ -154,8 +161,6 @@ int main(int argc, char* argv[]) { break; } - rtsp_server.process(); - std::this_thread::sleep_for(std::chrono::seconds(1)); } @@ -180,6 +185,14 @@ int main(int argc, char* argv[]) { std::string("RtspServer:: terminate failed")); } + /* stop mDNS server */ + if (config->get_mdns_enabled()) { + if (!mdns_server.terminate()) { + throw std::runtime_error( + std::string("MDNServer:: terminate failed")); + } + } + /* stop session manager */ if (!session_manager->terminate()) { throw std::runtime_error( diff --git a/daemon/mdns_server.cpp b/daemon/mdns_server.cpp index 68c0fd6..aec2432 100644 --- a/daemon/mdns_server.cpp +++ b/daemon/mdns_server.cpp @@ -320,6 +320,16 @@ bool MDNSServer::init() { (void)avahi_threaded_poll_start(poll_.get()); #endif + + session_manager_->add_source_observer( + SessionManager::ObserverType::add_source, + std::bind(&MDNSServer::add_service, this, + std::placeholders::_2, std::placeholders::_3)); + + session_manager_->add_source_observer( + SessionManager::ObserverType::remove_source, + std::bind(&MDNSServer::remove_service, this, std::placeholders::_2)); + running_ = true; return true; } diff --git a/daemon/mdns_server.hpp b/daemon/mdns_server.hpp index 1c8139e..a056c60 100644 --- a/daemon/mdns_server.hpp +++ b/daemon/mdns_server.hpp @@ -33,12 +33,17 @@ #include #include +#include "session_manager.hpp" #include "config.hpp" #include "utils.hpp" class MDNSServer { public: - MDNSServer(std::shared_ptr config) : config_(config){}; + MDNSServer(std::shared_ptr session_manager, + std::shared_ptr config) + : session_manager_(session_manager), + config_(config){} + MDNSServer() = delete; MDNSServer(const MDNSServer&) = delete; MDNSServer& operator=(const MDNSServer&) = delete; @@ -52,6 +57,7 @@ class MDNSServer { protected: std::atomic_bool running_{false}; + std::shared_ptr session_manager_; std::shared_ptr config_; std::string node_id_{get_node_id()}; diff --git a/daemon/rtsp_server.cpp b/daemon/rtsp_server.cpp index fdb2dfa..a821f17 100644 --- a/daemon/rtsp_server.cpp +++ b/daemon/rtsp_server.cpp @@ -20,18 +20,22 @@ using boost::asio::ip::tcp; -void RtspServer::process() { - /* cleanup of expired sessions */ + +bool RtspServer::add_source(uint8_t id, + const std::string& name, + const std::string& sdp) { + BOOST_LOG_TRIVIAL(info) << "rtsp_server:: added source " << name; + bool ret = false; std::lock_guard lock(mutex_); for (unsigned int i = 0; i < sessions_.size(); i++) { - if (duration_cast(steady_clock::now() - sessions_start_point_[i]) - .count() > RtspSession::session_tout_secs) { auto session = sessions_[i].lock(); if (session != nullptr) { - session->stop(); + ret |= session->announce(id, name, sdp, + config_->get_ip_addr_str(), + config_->get_rtsp_port()); } - } } + return ret; } void RtspServer::accept() { @@ -63,6 +67,37 @@ void RtspServer::accept() { }); } +bool RtspSession::announce(uint8_t id, + const std::string& name, + const std::string& sdp, + const std::string& address, + uint16_t port) { + /* if a describe request is currently not beeing process + * and the specified source id has been described on this session send update */ + if (cseq_ < 0 && source_ids_.find(id) != source_ids_.end()) { + std::string path(std::string("/by-name/") + get_node_id() + " " + name); + std::stringstream ss; + ss << "ANNOUNCE rtsp://" << address << ":" << std::to_string(port) + << httplib::detail::encode_url(path) << " RTSP/1.0\r\n" + << "User-Agent: aes67-daemon\r\n" + << "connection: Keep-Alive" << "\r\n" + << "CSeq: " << announce_cseq_++ << "\r\n" + << "Content-Length: " << sdp.length() << "\r\n" + << "Content-Type: application/sdp\r\n" + << "\r\n" + << sdp; + + BOOST_LOG_TRIVIAL(info) + << "rtsp_server:: " << "ANNOUNCE sent to " + << socket_.remote_endpoint(); + + send_response(ss.str()); + return true; + } + return false; +} + + bool RtspSession::process_request() { /* DESCRIBE rtsp://127.0.0.1:8080/by-name/test RTSP/1.0 @@ -164,6 +199,7 @@ void RtspSession::build_response(const std::string& url) { << "rtsp_server:: " << request_ << " response 200 to " << socket_.remote_endpoint(); send_response(ss.str()); + source_ids_.insert(id); return; } } diff --git a/daemon/rtsp_server.hpp b/daemon/rtsp_server.hpp index 07c5b0f..0794e43 100644 --- a/daemon/rtsp_server.hpp +++ b/daemon/rtsp_server.hpp @@ -21,6 +21,7 @@ #include #include #include +#include #include "session_manager.hpp" @@ -52,13 +53,18 @@ class RtspSession : public std::enable_shared_from_this { void start(); void stop(); + bool announce(uint8_t source_id, + const std::string& name, + const std::string& sdp, + const std::string& address, + uint16_t port); + private: bool process_request(); void build_response(const std::string& url); void read_request(); void send_error(int status_code, const std::string& description); void send_response(const std::string& response); - std::shared_ptr session_manager_; tcp::socket socket_; char data_[max_length + 1]; @@ -66,6 +72,9 @@ class RtspSession : public std::enable_shared_from_this { size_t length_{0}; int32_t cseq_{-1}; size_t consumed_{0}; + int32_t announce_cseq_{0}; + /* set with the ids described on this session */ + std::unordered_set source_ids_; }; class RtspServer { @@ -87,6 +96,14 @@ class RtspServer { accept(); /* start rtsp server on a separate thread */ res_ = std::async([this](){ io_service_.run(); }); + + session_manager_->add_source_observer( + SessionManager::ObserverType::add_source, + std::bind(&RtspServer::add_source, this, + std::placeholders::_1, + std::placeholders::_2, + std::placeholders::_3)); + return true; } @@ -97,9 +114,9 @@ class RtspServer { return true; } - void process(); - private: + /* a new source has been added or updated */ + bool add_source(uint8_t id, const std::string& name, const std::string& sdp); void accept(); std::mutex mutex_; diff --git a/daemon/session_manager.cpp b/daemon/session_manager.cpp index 31ae8d6..56986ea 100644 --- a/daemon/session_manager.cpp +++ b/daemon/session_manager.cpp @@ -420,18 +420,33 @@ uint8_t SessionManager::get_source_id(const std::string& name) const { return it != source_names_.end() ? it->second : (stream_id_max + 1); } +void SessionManager::add_source_observer(ObserverType type, Observer cb) { + switch(type) { + case ObserverType::add_source: + add_source_observers.push_back(cb); + break; + case ObserverType::remove_source: + remove_source_observers.push_back(cb); + break; + } +} + void SessionManager::on_add_source(const StreamSource& source, const StreamInfo& info) { + for (auto cb : add_source_observers) { + cb(source.id, source.name, get_source_sdp_(source.id, info)); + } igmp_.join(config_->get_ip_addr_str(), ip::address_v4(info.stream.m_ui32DestIP).to_string()); - mdns_.add_service(source.name, get_source_sdp_(source.id, info)); source_names_[source.name] = source.id; } void SessionManager::on_remove_source(const StreamInfo& info) { + for (auto cb : remove_source_observers) { + cb(info.stream.m_uiId, info.stream.m_cName, {}); + } igmp_.leave(config_->get_ip_addr_str(), ip::address_v4(info.stream.m_ui32DestIP).to_string()); - mdns_.remove_service(info.stream.m_cName); source_names_.erase(info.stream.m_cName); } diff --git a/daemon/session_manager.hpp b/daemon/session_manager.hpp index 020d905..11c88c2 100644 --- a/daemon/session_manager.hpp +++ b/daemon/session_manager.hpp @@ -30,7 +30,6 @@ #include "driver_manager.hpp" #include "igmp.hpp" #include "sap.hpp" -#include "mdns_server.hpp" struct StreamSource { uint8_t id{0}; @@ -107,10 +106,6 @@ class SessionManager { // session manager interface bool init() { if (!running_) { - /* init mDNS server */ - if (config_->get_mdns_enabled() && !mdns_.init()) { - return false; - } running_ = true; res_ = std::async(std::launch::async, &SessionManager::worker, this); } @@ -127,9 +122,6 @@ class SessionManager { for (auto sink : get_sinks()) { remove_sink(sink.id); } - if (config_->get_mdns_enabled()) { - mdns_.terminate(); - } return ret; } return true; @@ -142,6 +134,11 @@ class SessionManager { std::error_code remove_source(uint32_t id); uint8_t get_source_id(const std::string& name) const; + enum class ObserverType { add_source, remove_source }; + using Observer = std::function; + void add_source_observer(ObserverType type, Observer cb); + std::error_code add_sink(const StreamSink& sink); std::error_code get_sink(uint8_t id, StreamSink& sink) const; std::list get_sinks() const; @@ -210,7 +207,9 @@ class SessionManager { PTPStatus ptp_status_; mutable std::shared_mutex ptp_mutex_; - MDNSServer mdns_{config_}; + std::list add_source_observers; + std::list remove_source_observers; + SAP sap_{config_->get_sap_mcast_addr()}; IGMP igmp_; };