Commit d0f58fef authored by Sergio Ammirata's avatar Sergio Ammirata
Browse files

Fix peer and flow timeout checks. They were incorrectly being marked as dead...

Fix peer and flow timeout checks. They were incorrectly being marked as dead when  the configured buffer was smaller than the RTCP intervals.
parent 9604b753
......@@ -217,6 +217,7 @@ static struct rist_flow *create_flow(struct rist_receiver *ctx, uint32_t flow_id
atomic_init(&f->dataout_fifo_queue_read_index, 0);
f->session_timeout = RIST_DEFAULT_SESSION_TIMEOUT * RIST_CLOCK;
f->flow_timeout = 250 * RIST_CLOCK;
/* Append flow to list */
rist_flow_append(&ctx->common.FLOWS, f);
......@@ -301,6 +302,11 @@ int rist_receiver_associate_flow(struct rist_peer *p, uint32_t flow_id)
if (f->recovery_buffer_ticks > f->session_timeout)
f->session_timeout = 2ULL * f->recovery_buffer_ticks;
}
// Set the flow timeout as the buffer size for the flow
// However, we start with 250 ms as the minimum/default
// to make sure it is larger than the RTCP interval
if (f->recovery_buffer_ticks > f->flow_timeout)
f->flow_timeout = f->recovery_buffer_ticks;
uint64_t stats_report_time = get_cctx(p)->stats_report_time;
if (stats_report_time != 0 && stats_report_time != f->stats_report_time)
f->stats_report_time = stats_report_time;
......
......@@ -583,7 +583,6 @@ static int receiver_enqueue(struct rist_peer *peer, uint64_t source_time, const
packet_time = next->packet_time;
}
}
f->last_recv_ts = now_monotonic;
// Now, get the new position and check what is there
/* We need to check if the reader queue has progressed passed this packet, if
......@@ -955,13 +954,16 @@ static void receiver_output(struct rist_receiver *ctx, struct rist_flow *f)
output_idx = (output_idx + 1)& (f->receiver_queue_max -1);
atomic_store_explicit(&f->receiver_queue_output_idx, output_idx, memory_order_release);
if (atomic_load_explicit(&f->receiver_queue_size, memory_order_acquire) == 0) {
if (f->last_output_time == 0)
f->last_output_time = now;
uint64_t delta = now - f->last_output_time;
rist_log_priv(&ctx->common, RIST_LOG_DEBUG, "Buffer is empty, it has been for %"PRIu64" < %"PRIu64" (ms)!\n",
delta / RIST_CLOCK, recovery_buffer_ticks / RIST_CLOCK);
// if the entire buffer is empty, something is very wrong, reset the queue ...
if (delta > recovery_buffer_ticks)
{
rist_log_priv(&ctx->common, RIST_LOG_ERROR, "stream is dead, re-initializing flow\n");
rist_log_priv(&ctx->common, RIST_LOG_ERROR, "stream is dead (%"PRIu64" ms), re-initializing flow\n",
delta/ RIST_CLOCK);
f->receiver_queue_has_items = false;
}
// exit the function and wait 5ms (max jitter time)
......@@ -1243,7 +1245,13 @@ struct rist_peer *rist_receiver_peer_insert_local(struct rist_receiver *ctx,
}
if (config->session_timeout > 0) {
p->session_timeout = config->session_timeout * RIST_CLOCK;
if (p->session_timeout < 250) {
rist_log_priv(&ctx->common, RIST_LOG_WARN, "The configured (%d ms) peer session timeout is too small, using %d ms instead\n",
config->session_timeout, 250);
p->session_timeout = 250 * RIST_CLOCK;
}
else
p->session_timeout = config->session_timeout * RIST_CLOCK;
}
else {
p->session_timeout = 250 * RIST_CLOCK;
......@@ -2508,6 +2516,8 @@ protocol_bypass:
dead_time / RIST_CLOCK, p->adv_peer_id);
}
p->last_rtcp_received = now;
if (p->flow)
p->flow->last_recv_ts = now;
payload.peer = p;
if (cctx->profile == RIST_PROFILE_SIMPLE)
{
......@@ -3568,9 +3578,11 @@ PTHREAD_START_FUNC(receiver_pthread_protocol, arg)
continue;
}
if (now > f->checks_next_time) {
if (f->last_recv_ts == 0)
f->last_recv_ts = now;
uint64_t flow_age = (now - f->last_recv_ts);
f->checks_next_time += f->recovery_buffer_ticks;
if (flow_age > f->recovery_buffer_ticks) {
if (flow_age > f->flow_timeout) {
if (f->dead != 1) {
f->dead = 1;
rist_log_priv(&ctx->common, RIST_LOG_WARN,
......
......@@ -243,6 +243,7 @@ struct rist_flow {
/* Session timeouts variables */
uint64_t session_timeout;
uint64_t flow_timeout;
uint64_t last_recv_ts;
/* Receiver timed async data output */
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment