Fix to RtspClient::process() method to avoid ending up into an infinite loop in certain cases when the client gets stopped via RtspClient::stop() or RtspClient::stop_all() methods.

The bug was also preventing the daemon from terminating. It was possible to reproduce the problem using the add_remove_check_mdns_browser_update_all regression test.
This commit is contained in:
Andrea Bondavalli 2020-07-29 21:21:58 +02:00
parent fe08f3c3bd
commit 7d6a8289d5
3 changed files with 56 additions and 32 deletions

View File

@ -87,6 +87,39 @@ RtspResponse read_response(tcp::iostream& s, uint16_t max_length) {
return res;
}
struct RtspActiveClientRemover {
RtspActiveClientRemover() = delete;
RtspActiveClientRemover(ip::tcp::iostream* s,
const std::string& name,
const std::string& domain,
bool wait_for_updates)
: stream_(s),
name_(name),
domain_(domain),
wait_for_updates_(wait_for_updates) {
if (stream_ != nullptr && wait_for_updates_) {
RtspClient::g_mutex.lock();
RtspClient::g_active_clients[{name_, domain_}] = stream_;
RtspClient::g_mutex.unlock();
}
}
~RtspActiveClientRemover() {
if (stream_ != nullptr && wait_for_updates_) {
std::lock_guard<std::mutex> lock(RtspClient::g_mutex);
auto it = RtspClient::g_active_clients.find({name_, domain_});
if (it != RtspClient::g_active_clients.end() && it->second == stream_) {
RtspClient::g_active_clients.erase(it);
}
}
}
private:
ip::tcp::iostream* stream_{nullptr};
const std::string& name_;
const std::string& domain_;
bool wait_for_updates_{false};
};
std::pair<bool, RtspSource> RtspClient::process(RtspClient::Observer callback,
const std::string& name,
const std::string& domain,
@ -96,11 +129,12 @@ std::pair<bool, RtspSource> RtspClient::process(RtspClient::Observer callback,
bool wait_for_updates) {
RtspSource rtsp_source;
ip::tcp::iostream s;
RtspActiveClientRemover clientRemover(&s, name, domain, wait_for_updates);
try {
BOOST_LOG_TRIVIAL(debug) << "rtsp_client:: connecting to "
<< "rtsp://" << address << ":" << port << path;
s.connect(address, port.length() ? port : dft_port);
if (!s) {
if (!s || s.error()) {
BOOST_LOG_TRIVIAL(warning)
<< "rtsp_client:: unable to connect to " << address << ":" << port;
return {false, rtsp_source};
@ -125,7 +159,7 @@ std::pair<bool, RtspSource> RtspClient::process(RtspClient::Observer callback,
std::string request;
std::getline(s, request);
if (!s || rtsp_version.substr(0, 5) != "RTSP/") {
if (!s || s.error() || rtsp_version.substr(0, 5) != "RTSP/") {
BOOST_LOG_TRIVIAL(error) << "rtsp_client:: invalid response from "
<< "rtsp://" << address << ":" << port << path;
return {false, rtsp_source};
@ -169,6 +203,17 @@ std::pair<bool, RtspSource> RtspClient::process(RtspClient::Observer callback,
rtsp_source.source = "mDNS";
rtsp_source.address = address;
rtsp_source.sdp = std::move(res.body);
if (is_announce) {
s << "RTSP/1.0 200 OK\r\n";
s << "CSeq: " << res.cseq << "\r\n";
s << "\r\n";
} else if (!is_describe) {
s << "RTSP/1.0 405 Method Not Allowed\r\n";
s << "CSeq: " << res.cseq << "\r\n";
s << "\r\n";
}
BOOST_LOG_TRIVIAL(info) << "rtsp_client:: completed "
<< "rtsp://" << address << ":" << port << path;
@ -180,28 +225,14 @@ std::pair<bool, RtspSource> RtspClient::process(RtspClient::Observer callback,
callback(announced_name.empty() ? name : announced_name, domain,
rtsp_source);
}
if (is_announce) {
s << "RTSP/1.0 200 OK\r\n";
s << "CSeq: " << res.cseq << "\r\n";
s << "\r\n";
} else if (!is_describe) {
s << "RTSP/1.0 405 Method Not Allowed\r\n";
s << "CSeq: " << res.cseq << "\r\n";
s << "\r\n";
}
}
if (wait_for_updates) {
g_mutex.lock();
g_active_clients[{name, domain}] = &s;
g_mutex.unlock();
/* we start waiting for updates */
do {
std::getline(s, request);
} while (request.empty() && !s.error());
if (s.error()) {
} while (request.empty() && !s.error() && is_active(name, domain));
if (s.error() || !is_active(name, domain)) {
BOOST_LOG_TRIVIAL(info)
<< "rtsp_client:: end: " << s.error().message();
break;
@ -227,21 +258,13 @@ std::pair<bool, RtspSource> RtspClient::process(RtspClient::Observer callback,
is_announce = true;
}
}
} while (wait_for_updates);
} while (wait_for_updates && is_active(name, domain));
} catch (std::exception& e) {
BOOST_LOG_TRIVIAL(warning)
<< "rtsp_client:: error with "
<< "rtsp://" << address << ":" << port << path << ": " << e.what();
}
if (wait_for_updates) {
std::lock_guard<std::mutex> lock(g_mutex);
auto it = g_active_clients.find({name, domain});
if (it != g_active_clients.end() && it->second == &s) {
g_active_clients.erase(it);
}
}
return {true, rtsp_source};
}
@ -260,6 +283,11 @@ void RtspClient::stop(const std::string& name, const std::string& domain) {
}
}
bool RtspClient::is_active(const std::string& name, const std::string& domain) {
std::lock_guard<std::mutex> lock(g_mutex);
return g_active_clients.find({name, domain}) != g_active_clients.end();
}
void RtspClient::stop_all() {
std::lock_guard<std::mutex> lock(g_mutex);
auto it = g_active_clients.begin();

View File

@ -48,9 +48,9 @@ class RtspClient {
const std::string& port = dft_port,
bool wait_for_updates = true);
static bool is_active(const std::string& name, const std::string& domain);
static void stop(const std::string& name, const std::string& domain);
static void stop_all();
static std::pair<bool, RtspSource> describe(
const std::string& path,
const std::string& address,

View File

@ -854,10 +854,6 @@ BOOST_AUTO_TEST_CASE(add_remove_check_mdns_browser_update_all) {
BOOST_REQUIRE_MESSAGE(cli.add_source(id),
std::string("added source ") + std::to_string(id));
}
for (int id = 0; id < g_stream_num_max; id++) {
BOOST_REQUIRE_MESSAGE(cli.update_source(id),
std::string("updated source ") + std::to_string(id));
}
std::vector<std::string> sdps{g_stream_num_max};
for (int id = 0; id < g_stream_num_max; id++) {
auto sdp = cli.get_source_sdp(id);