From 5c1f4a06ab46e16e6bba42fe7381d8353020a1fd Mon Sep 17 00:00:00 2001 From: Gijs Peskens <gijs@in2ip.nl> Date: Thu, 22 Jul 2021 09:17:55 +0200 Subject: [PATCH] Move per packet message to debug level & release data earlier Per packet messages where flooding a system, creating a unrecoverable situation. Flushing data earlier when real time spend in buffer exceeds 1.1 * buffer duration ensure that it's more likely to be useful. We can then also safely drop any stale packets we find that are there longer than 2 buffer durations (there shouldn't be any). --- src/rist-common.c | 35 +++++++++++++++++++++-------------- 1 file changed, 21 insertions(+), 14 deletions(-) diff --git a/src/rist-common.c b/src/rist-common.c index 315e0aee..c4b14b29 100755 --- a/src/rist-common.c +++ b/src/rist-common.c @@ -641,8 +641,7 @@ static int receiver_enqueue(struct rist_peer *peer, uint64_t source_time, uint64 return 1; } else { - //This case should never occur with the check against the read index above - rist_log_priv(get_cctx(peer), RIST_LOG_ERROR, "Invalid Dupe (possible seq discontinuity)! %"PRIu32", freeing buffer ...\n", seq); + rist_log_priv(get_cctx(peer), RIST_LOG_DEBUG, "Invalid Dupe (possible seq discontinuity)! %"PRIu32", freeing buffer ...\n", seq); free_rist_buffer(get_cctx(peer), b); f->receiver_queue[idx] = NULL; } @@ -873,21 +872,28 @@ static void receiver_output(struct rist_receiver *ctx, struct rist_flow *f) now = timestampNTP_u64(); uint64_t delay_rtc = (now - b->time); - if (RIST_UNLIKELY(delay_rtc > (2LLU * recovery_buffer_ticks))) { + if (RIST_UNLIKELY(delay_rtc > (1.1 * recovery_buffer_ticks))) { // Double check the age of the packet within our receiver queue // Safety net for discontinuities in source timestamp, clock drift or improperly scaled timestamp uint64_t delay = now > b->packet_time ? (now - b->packet_time) : 0; - rist_log_priv(&ctx->common, RIST_LOG_WARN, - "Packet %"PRIu32" (%zu bytes) is too old %"PRIu64"/%"PRIu64" ms, deadline = %"PRIu64", offset = %"PRId64" ms, releasing data\n", - b->seq, b->size, delay_rtc / RIST_CLOCK, delay / RIST_CLOCK, recovery_buffer_ticks / RIST_CLOCK, f->time_offset / RIST_CLOCK); - //Reset the flow if we keep hitting packets that are too late, likely our time offset is wrong. - f->too_late_ctr++; - if (f->too_late_ctr > 100) { - rist_log_priv(&ctx->common, RIST_LOG_ERROR, "Too many old packets, resetting buffer\n"); - f->receiver_queue_has_items = false; - return; - } - + bool drop = false; + //This should be impossible as we should catch it with the normal case + if (RIST_UNLIKELY(delay_rtc > (2ULL * recovery_buffer_ticks))) { + f->too_late_ctr++; + drop = true; + goto next; + } + rist_log_priv(&ctx->common, RIST_LOG_DEBUG, + "Packet %"PRIu32" (%zu bytes) is too old %"PRIu64"/%"PRIu64" ms, deadline = %"PRIu64", offset = %"PRId64" ms, %s data\n", + b->seq, b->size, + delay_rtc / RIST_CLOCK, delay / RIST_CLOCK, + recovery_buffer_ticks / RIST_CLOCK, f->time_offset / RIST_CLOCK, + drop? "dropping" : "releasing"); + if (f->too_late_ctr > 100) { + rist_log_priv(&ctx->common, RIST_LOG_ERROR, "Too many old packets, resetting buffer\n"); + f->receiver_queue_has_items = false; + return; + } } else if (b->target_output_time >= now) { // This is how we keep the buffer at the correct level @@ -971,6 +977,7 @@ static void receiver_output(struct rist_receiver *ctx, struct rist_flow *f) // fprintf(stderr, "rtcp skip at %"PRIu32", just removing it from queue\n", b->seq); f->last_seq_output = b->seq; +next: atomic_fetch_sub_explicit(&f->receiver_queue_size, b->size, memory_order_relaxed); f->receiver_queue[output_idx] = NULL; free_rist_buffer(&ctx->common, b); -- GitLab