diff --git a/daemon/rtsp_client.cpp b/daemon/rtsp_client.cpp index 9777563..d08b302 100644 --- a/daemon/rtsp_client.cpp +++ b/daemon/rtsp_client.cpp @@ -87,6 +87,39 @@ RtspResponse read_response(tcp::iostream& s, uint16_t max_length) { return res; } +struct RtspActiveClientRemover { + RtspActiveClientRemover() = delete; + RtspActiveClientRemover(ip::tcp::iostream* s, + const std::string& name, + const std::string& domain, + bool wait_for_updates) + : stream_(s), + name_(name), + domain_(domain), + wait_for_updates_(wait_for_updates) { + if (stream_ != nullptr && wait_for_updates_) { + RtspClient::g_mutex.lock(); + RtspClient::g_active_clients[{name_, domain_}] = stream_; + RtspClient::g_mutex.unlock(); + } + } + ~RtspActiveClientRemover() { + if (stream_ != nullptr && wait_for_updates_) { + std::lock_guard lock(RtspClient::g_mutex); + auto it = RtspClient::g_active_clients.find({name_, domain_}); + if (it != RtspClient::g_active_clients.end() && it->second == stream_) { + RtspClient::g_active_clients.erase(it); + } + } + } + + private: + ip::tcp::iostream* stream_{nullptr}; + const std::string& name_; + const std::string& domain_; + bool wait_for_updates_{false}; +}; + std::pair RtspClient::process(RtspClient::Observer callback, const std::string& name, const std::string& domain, @@ -96,11 +129,12 @@ std::pair RtspClient::process(RtspClient::Observer callback, bool wait_for_updates) { RtspSource rtsp_source; ip::tcp::iostream s; + RtspActiveClientRemover clientRemover(&s, name, domain, wait_for_updates); try { BOOST_LOG_TRIVIAL(debug) << "rtsp_client:: connecting to " << "rtsp://" << address << ":" << port << path; s.connect(address, port.length() ? port : dft_port); - if (!s) { + if (!s || s.error()) { BOOST_LOG_TRIVIAL(warning) << "rtsp_client:: unable to connect to " << address << ":" << port; return {false, rtsp_source}; @@ -125,7 +159,7 @@ std::pair RtspClient::process(RtspClient::Observer callback, std::string request; std::getline(s, request); - if (!s || rtsp_version.substr(0, 5) != "RTSP/") { + if (!s || s.error() || rtsp_version.substr(0, 5) != "RTSP/") { BOOST_LOG_TRIVIAL(error) << "rtsp_client:: invalid response from " << "rtsp://" << address << ":" << port << path; return {false, rtsp_source}; @@ -169,6 +203,17 @@ std::pair RtspClient::process(RtspClient::Observer callback, rtsp_source.source = "mDNS"; rtsp_source.address = address; rtsp_source.sdp = std::move(res.body); + + if (is_announce) { + s << "RTSP/1.0 200 OK\r\n"; + s << "CSeq: " << res.cseq << "\r\n"; + s << "\r\n"; + } else if (!is_describe) { + s << "RTSP/1.0 405 Method Not Allowed\r\n"; + s << "CSeq: " << res.cseq << "\r\n"; + s << "\r\n"; + } + BOOST_LOG_TRIVIAL(info) << "rtsp_client:: completed " << "rtsp://" << address << ":" << port << path; @@ -180,28 +225,14 @@ std::pair RtspClient::process(RtspClient::Observer callback, callback(announced_name.empty() ? name : announced_name, domain, rtsp_source); } - - if (is_announce) { - s << "RTSP/1.0 200 OK\r\n"; - s << "CSeq: " << res.cseq << "\r\n"; - s << "\r\n"; - } else if (!is_describe) { - s << "RTSP/1.0 405 Method Not Allowed\r\n"; - s << "CSeq: " << res.cseq << "\r\n"; - s << "\r\n"; - } } if (wait_for_updates) { - g_mutex.lock(); - g_active_clients[{name, domain}] = &s; - g_mutex.unlock(); - /* we start waiting for updates */ do { std::getline(s, request); - } while (request.empty() && !s.error()); - if (s.error()) { + } while (request.empty() && !s.error() && is_active(name, domain)); + if (s.error() || !is_active(name, domain)) { BOOST_LOG_TRIVIAL(info) << "rtsp_client:: end: " << s.error().message(); break; @@ -227,21 +258,13 @@ std::pair RtspClient::process(RtspClient::Observer callback, is_announce = true; } } - } while (wait_for_updates); + } while (wait_for_updates && is_active(name, domain)); } catch (std::exception& e) { BOOST_LOG_TRIVIAL(warning) << "rtsp_client:: error with " << "rtsp://" << address << ":" << port << path << ": " << e.what(); } - if (wait_for_updates) { - std::lock_guard lock(g_mutex); - auto it = g_active_clients.find({name, domain}); - if (it != g_active_clients.end() && it->second == &s) { - g_active_clients.erase(it); - } - } - return {true, rtsp_source}; } @@ -260,6 +283,11 @@ void RtspClient::stop(const std::string& name, const std::string& domain) { } } +bool RtspClient::is_active(const std::string& name, const std::string& domain) { + std::lock_guard lock(g_mutex); + return g_active_clients.find({name, domain}) != g_active_clients.end(); +} + void RtspClient::stop_all() { std::lock_guard lock(g_mutex); auto it = g_active_clients.begin(); diff --git a/daemon/rtsp_client.hpp b/daemon/rtsp_client.hpp index ff36d67..66dd4a1 100644 --- a/daemon/rtsp_client.hpp +++ b/daemon/rtsp_client.hpp @@ -48,9 +48,9 @@ class RtspClient { const std::string& port = dft_port, bool wait_for_updates = true); + static bool is_active(const std::string& name, const std::string& domain); static void stop(const std::string& name, const std::string& domain); static void stop_all(); - static std::pair describe( const std::string& path, const std::string& address, diff --git a/daemon/tests/daemon_test.cpp b/daemon/tests/daemon_test.cpp index accb133..9dd8c18 100644 --- a/daemon/tests/daemon_test.cpp +++ b/daemon/tests/daemon_test.cpp @@ -854,10 +854,6 @@ BOOST_AUTO_TEST_CASE(add_remove_check_mdns_browser_update_all) { BOOST_REQUIRE_MESSAGE(cli.add_source(id), std::string("added source ") + std::to_string(id)); } - for (int id = 0; id < g_stream_num_max; id++) { - BOOST_REQUIRE_MESSAGE(cli.update_source(id), - std::string("updated source ") + std::to_string(id)); - } std::vector sdps{g_stream_num_max}; for (int id = 0; id < g_stream_num_max; id++) { auto sdp = cli.get_source_sdp(id);