Commit 2cb6e845 authored by Sergio Ammirata's avatar Sergio Ammirata Committed by Thomas Guillem

access: add RIST module

RIST for the Reliable Internet Stream Transport Protocol

The implementation follows the Video Services Forum (VSF) Technical
Recommendation TR-06 which defines an ARQ based UDP transmission protocol
for real-time streaming over lossy networks (internet, wifi, etc).
Signed-off-by: Thomas Guillem's avatarThomas Guillem <thomas@gllm.fr>
parent abf93d8f
......@@ -29,6 +29,7 @@ Codecs:
Access:
* Enable SMB2 / SMB3 support on mobile ports with libsmb2
* Added support for the RIST (Reliable Internet Stream Transport) Protocol
Video output:
* Remove aa plugin
......
......@@ -17,6 +17,7 @@ $Id$
* access_output_srt: SRT (Secure Reliable Transport) access_output module
* access_output_udp: UDP Network access_output module
* access_qtsound: Quicktime Audio Capture
* access_rist: RIST (Reliable Internet Stream Transport) access module
* access_srt: SRT(Secure Reliable Transport) access module
* access_wasapi: WASAPI audio input
* accesstweaks: access control tweaking module (dev tool)
......
......@@ -428,3 +428,11 @@ libaccess_srt_plugin_la_LIBADD = $(SRT_LIBS)
libaccess_srt_plugin_la_LDFLAGS = $(AM_LDFLAGS) -rpath '$(accessdir)'
access_LTLIBRARIES += $(LTLIBaccess_srt)
EXTRA_LTLIBRARIES += libaccess_srt_plugin.la
### RIST ###
librist_plugin_la_SOURCES = access/rist.c access/rist.h
librist_plugin_la_CFLAGS = $(AM_CFLAGS) $(BITSTREAM_CFLAGS)
if HAVE_BITSTREAM
access_LTLIBRARIES += librist_plugin.la
endif
/*****************************************************************************
* rist.c: RIST (Reliable Internet Stream Transport) input module
*****************************************************************************
* Copyright (C) 2018, DVEO, the Broadcast Division of Computer Modules, Inc.
* Copyright (C) 2018, SipRadius LLC
*
* Authors: Sergio Ammirata <sergio@ammirata.net>
* Daniele Lacamera <root@danielinux.net>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation; either version 2.1 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program; if not, write to the Free Software Foundation,
* Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
*****************************************************************************/
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include <vlc_common.h>
#include <vlc_interrupt.h>
#include <vlc_plugin.h>
#include <vlc_access.h>
#include <vlc_threads.h>
#include <vlc_network.h>
#include <vlc_block.h>
#include <vlc_url.h>
#ifdef HAVE_POLL
#include <poll.h>
#endif
#include <bitstream/ietf/rtcp_rr.h>
#include <bitstream/ietf/rtcp_sdes.h>
#include <bitstream/ietf/rtcp_fb.h>
#include <bitstream/ietf/rtp.h>
#include "rist.h"
/* The default latency is 1000 ms */
#define RIST_DEFAULT_LATENCY 1000
/* The default nack retry interval */
#define RIST_DEFAULT_RETRY_INTERVAL 132
/* The default packet re-ordering buffer */
#define RIST_DEFAULT_REORDER_BUFFER 70
/* The default max packet size */
#define RIST_MAX_PACKET_SIZE 1472
/* The default timeout is 5 ms */
#define RIST_DEFAULT_POLL_TIMEOUT 5
/* The max retry count for nacks */
#define RIST_MAX_RETRIES 10
/* The rate at which we process and send nack requests */
#define NACK_INTERVAL 5 /*ms*/
/* Calculate and print stats once per second */
#define STATS_INTERVAL 1000 /*ms*/
static const int nack_type[] = {
0, 1,
};
static const char *const nack_type_names[] = {
N_("Range"), N_("Bitmask"),
};
enum NACK_TYPE {
NACK_FMT_RANGE = 0,
NACK_FMT_BITMASK
};
typedef struct
{
struct rist_flow *flow;
char sender_name[MAX_CNAME];
enum NACK_TYPE nack_type;
uint64_t last_data_rx;
uint64_t last_nack_tx;
vlc_thread_t thread;
int i_max_packet_size;
int i_poll_timeout;
int i_poll_timeout_current;
bool eof_on_reset;
block_fifo_t *p_fifo;
vlc_mutex_t lock;
uint64_t last_message;
uint64_t last_reset;
/* stat variables */
uint32_t i_poll_timeout_zero_count;
uint32_t i_poll_timeout_nonzero_count;
uint64_t i_last_stat;
float vbr_ratio;
uint16_t vbr_ratio_count;
uint32_t i_lost_packets;
uint32_t i_recovered_packets;
uint32_t i_reordered_packets;
uint32_t i_total_packets;
} stream_sys_t;
static int Control(stream_t *p_access, int i_query, va_list args)
{
switch( i_query )
{
case STREAM_CAN_SEEK:
case STREAM_CAN_FASTSEEK:
case STREAM_CAN_PAUSE:
case STREAM_CAN_CONTROL_PACE:
*va_arg( args, bool * ) = false;
break;
case STREAM_GET_PTS_DELAY:
*va_arg( args, vlc_tick_t * ) = VLC_TICK_FROM_MS(
var_InheritInteger(p_access, "network-caching") );
break;
default:
return VLC_EGENERIC;
}
return VLC_SUCCESS;
}
static struct rist_flow *rist_init_rx(void)
{
struct rist_flow *flow = calloc(1, sizeof(struct rist_flow));
if (!flow)
return NULL;
flow->reset = 2;
flow->buffer = calloc(RIST_QUEUE_SIZE, sizeof(struct rtp_pkt));
if ( unlikely( flow->buffer == NULL ) )
{
free(flow);
return NULL;
}
return flow;
}
static void rist_WriteTo_i11e_Locked(vlc_mutex_t lock, int fd, const void *buf, size_t len,
const struct sockaddr *peer, socklen_t slen)
{
vlc_mutex_lock( &lock );
rist_WriteTo_i11e(fd, buf, len, peer, slen);
vlc_mutex_unlock( &lock );
}
static struct rist_flow *rist_udp_receiver(stream_t *p_access, vlc_url_t *parsed_url)
{
stream_sys_t *p_sys = p_access->p_sys;
msg_Info( p_access, "Opening Rist Flow Receiver at %s:%d and %s:%d",
parsed_url->psz_host, parsed_url->i_port,
parsed_url->psz_host, parsed_url->i_port+1);
p_sys->flow = rist_init_rx();
if (!p_sys->flow)
return NULL;
p_sys->flow->fd_in = net_OpenDgram(p_access, parsed_url->psz_host, parsed_url->i_port, NULL,
0, IPPROTO_UDP);
if (p_sys->flow->fd_in < 0)
{
msg_Err( p_access, "cannot open input socket" );
return NULL;
}
p_sys->flow->fd_nack = net_OpenDgram(p_access, parsed_url->psz_host, parsed_url->i_port + 1,
NULL, 0, IPPROTO_UDP);
if (p_sys->flow->fd_nack < 0)
{
msg_Err( p_access, "cannot open nack socket" );
return NULL;
}
populate_cname(p_sys->flow->fd_nack, p_sys->flow->cname);
msg_Info(p_access, "our cname is %s", p_sys->flow->cname);
return p_sys->flow;
}
static int is_index_in_range(struct rist_flow *flow, uint16_t idx)
{
if (flow->ri <= flow->wi) {
return ((idx > flow->ri) && (idx <= flow->wi));
} else {
return ((idx > flow->ri) || (idx <= flow->wi));
}
}
static void send_rtcp_feedback(stream_t *p_access, struct rist_flow *flow)
{
stream_sys_t *p_sys = p_access->p_sys;
int namelen = strlen(flow->cname) + 1;
/* we need to make sure it is a multiple of 4, pad if necessary */
if ((namelen - 2) & 0x3)
namelen = ((((namelen - 2) >> 2) + 1) << 2) + 2;
int rtcp_feedback_size = RTCP_EMPTY_RR_SIZE + RTCP_SDES_SIZE + namelen;
uint8_t *buf = malloc(rtcp_feedback_size);
if ( unlikely( buf == NULL ) )
return;
/* Populate RR */
uint8_t *rr = buf;
rtp_set_hdr(rr);
rtcp_rr_set_pt(rr);
rtcp_set_length(rr, 1);
rtcp_fb_set_int_ssrc_pkt_sender(rr, 0);
/* Populate SDES */
uint8_t *p_sdes = (buf + RTCP_EMPTY_RR_SIZE);
rtp_set_hdr(p_sdes);
rtp_set_cc(p_sdes, 1); /* Actually it is source count in this case */
rtcp_sdes_set_pt(p_sdes);
rtcp_set_length(p_sdes, (namelen >> 2) + 2);
rtcp_sdes_set_cname(p_sdes, 1);
rtcp_sdes_set_name_length(p_sdes, strlen(flow->cname));
uint8_t *p_sdes_name = (buf + RTCP_EMPTY_RR_SIZE + RTCP_SDES_SIZE);
strlcpy((char *)p_sdes_name, flow->cname, namelen);
/* Write to Socket */
rist_WriteTo_i11e_Locked(p_sys->lock, flow->fd_nack, buf, rtcp_feedback_size,
(struct sockaddr *)&flow->peer_sockaddr, flow->peer_socklen);
free(buf);
buf = NULL;
}
static void send_bbnack(stream_t *p_access, int fd_nack, block_t *pkt_nacks, uint16_t nack_count)
{
stream_sys_t *p_sys = p_access->p_sys;
struct rist_flow *flow = p_sys->flow;
int len = 0;
int bbnack_bufsize = RTCP_FB_HEADER_SIZE +
RTCP_FB_FCI_GENERIC_NACK_SIZE * nack_count;
uint8_t *buf = malloc(bbnack_bufsize);
if ( unlikely( buf == NULL ) )
return;
/* Populate NACKS */
uint8_t *nack = buf;
rtp_set_hdr(nack);
rtcp_fb_set_fmt(nack, NACK_FMT_BITMASK);
rtcp_set_pt(nack, RTCP_PT_RTPFB);
rtcp_set_length(nack, 2 + nack_count);
/*uint8_t name[4] = "RIST";*/
/*rtcp_fb_set_ssrc_media_src(nack, name);*/
len += RTCP_FB_HEADER_SIZE;
/* TODO : group together */
uint16_t nacks[MAX_NACKS];
memcpy(nacks, pkt_nacks->p_buffer, pkt_nacks->i_buffer);
for (int i = 0; i < nack_count; i++) {
uint8_t *nack_record = buf + len + RTCP_FB_FCI_GENERIC_NACK_SIZE*i;
rtcp_fb_nack_set_packet_id(nack_record, nacks[i]);
rtcp_fb_nack_set_bitmask_lost(nack_record, 0);
}
len += RTCP_FB_FCI_GENERIC_NACK_SIZE * nack_count;
/* Write to Socket */
rist_WriteTo_i11e_Locked(p_sys->lock, fd_nack, buf, len,
(struct sockaddr *)&flow->peer_sockaddr, flow->peer_socklen);
free(buf);
buf = NULL;
}
static void send_rbnack(stream_t *p_access, int fd_nack, block_t *pkt_nacks, uint16_t nack_count)
{
stream_sys_t *p_sys = p_access->p_sys;
struct rist_flow *flow = p_sys->flow;
int len = 0;
int rbnack_bufsize = RTCP_FB_HEADER_SIZE +
RTCP_FB_FCI_GENERIC_NACK_SIZE * nack_count;
uint8_t *buf = malloc(rbnack_bufsize);
if ( unlikely( buf == NULL ) )
return;
/* Populate NACKS */
uint8_t *nack = buf;
rtp_set_hdr(nack);
rtcp_fb_set_fmt(nack, NACK_FMT_RANGE);
rtcp_set_pt(nack, RTCP_PT_RTPFR);
rtcp_set_length(nack, 2 + nack_count);
uint8_t name[4] = "RIST";
rtcp_fb_set_ssrc_media_src(nack, name);
len += RTCP_FB_HEADER_SIZE;
/* TODO : group together */
uint16_t nacks[MAX_NACKS];
memcpy(nacks, pkt_nacks->p_buffer, pkt_nacks->i_buffer);
for (int i = 0; i < nack_count; i++)
{
uint8_t *nack_record = buf + len + RTCP_FB_FCI_GENERIC_NACK_SIZE*i;
rtcp_fb_nack_set_range_start(nack_record, nacks[i]);
rtcp_fb_nack_set_range_extra(nack_record, 0);
}
len += RTCP_FB_FCI_GENERIC_NACK_SIZE * nack_count;
/* Write to Socket */
rist_WriteTo_i11e_Locked(p_sys->lock, fd_nack, buf, len,
(struct sockaddr *)&flow->peer_sockaddr, flow->peer_socklen);
free(buf);
buf = NULL;
}
static void send_nacks(stream_t *p_access, struct rist_flow *flow)
{
stream_sys_t *p_sys = p_access->p_sys;
struct rtp_pkt *pkt;
uint16_t idx;
uint64_t last_ts = 0;
uint16_t null_count = 0;
int nacks_len = 0;
uint16_t nacks[MAX_NACKS];
idx = flow->ri;
while(idx++ != flow->wi)
{
pkt = &(flow->buffer[idx]);
if (pkt->buffer == NULL)
{
if (nacks_len + 1 >= MAX_NACKS)
{
break;
}
else
{
null_count++;
/* TODO: after adding average spacing calculation, change this formula
to extrapolated_ts = last_ts + null_count * avg_delta_ts; */
uint64_t extrapolated_ts = last_ts;
/* Find out the age and add it only if necessary */
int retry_count = flow->nacks_retries[idx];
uint64_t age = flow->hi_timestamp - extrapolated_ts;
uint64_t expiration;
if (retry_count == 0){
expiration = flow->reorder_buffer;
} else {
expiration = flow->nacks_retries[idx] * flow->retry_interval;
}
if (age > expiration && retry_count <= flow->max_retries)
{
flow->nacks_retries[idx]++;
nacks[nacks_len++] = idx;
msg_Dbg(p_access, "Sending NACK for seq %d, age %"PRId64" ms, retry %u, " \
"expiration %"PRId64" ms", idx, ts_get_from_rtp(age)/1000,
flow->nacks_retries[idx], ts_get_from_rtp(expiration)/1000);
}
}
}
else
{
last_ts = pkt->rtp_ts;
null_count = 0;
}
}
if (nacks_len > 0)
{
block_t *pkt_nacks = block_Alloc(nacks_len * 2);
if (pkt_nacks)
{
memcpy(pkt_nacks->p_buffer, nacks, nacks_len * 2);
pkt_nacks->i_buffer = nacks_len * 2;
block_FifoPut( p_sys->p_fifo, pkt_nacks );
}
}
}
static int sockaddr_cmp(struct sockaddr *x, struct sockaddr *y)
{
#define CMP(a, b) if (a != b) return a < b ? -1 : 1
CMP(x->sa_family, y->sa_family);
if (x->sa_family == AF_INET)
{
struct sockaddr_in *xin = (void*)x, *yin = (void*)y;
CMP(ntohl(xin->sin_addr.s_addr), ntohl(yin->sin_addr.s_addr));
CMP(ntohs(xin->sin_port), ntohs(yin->sin_port));
}
else if (x->sa_family == AF_INET6)
{
struct sockaddr_in6 *xin6 = (void*)x, *yin6 = (void*)y;
int r = memcmp(xin6->sin6_addr.s6_addr, yin6->sin6_addr.s6_addr,
sizeof(xin6->sin6_addr.s6_addr));
if (r != 0)
return r;
CMP(ntohs(xin6->sin6_port), ntohs(yin6->sin6_port));
CMP(xin6->sin6_flowinfo, yin6->sin6_flowinfo);
CMP(xin6->sin6_scope_id, yin6->sin6_scope_id);
}
#undef CMP
return 0;
}
static void print_sockaddr_info_change(stream_t *p_access, struct sockaddr *x, struct sockaddr *y)
{
if (x->sa_family == AF_INET)
{
struct sockaddr_in *xin = (void*)x, *yin = (void*)y;
msg_Info(p_access, "Peer IP:Port change detected: old IP:Port %s:%d, new IP:Port %s:%d",
inet_ntoa(xin->sin_addr), ntohs(xin->sin_port), inet_ntoa(yin->sin_addr),
ntohs(yin->sin_port));
}
else if (x->sa_family == AF_INET6)
{
struct sockaddr_in6 *xin6 = (void*)x, *yin6 = (void*)y;
char oldstr[INET6_ADDRSTRLEN];
char newstr[INET6_ADDRSTRLEN];
inet_ntop(xin6->sin6_family, &xin6->sin6_addr, oldstr, sizeof(struct in6_addr));
inet_ntop(yin6->sin6_family, &yin6->sin6_addr, newstr, sizeof(struct in6_addr));
msg_Info(p_access, "Peer IP:Port change detected: old IP:Port %s:%d, new IP:Port %s:%d",
oldstr, ntohs(xin6->sin6_port), newstr, ntohs(yin6->sin6_port));
}
}
static void print_sockaddr_info(stream_t *p_access, struct sockaddr *x)
{
if (x->sa_family == AF_INET)
{
struct sockaddr_in *xin = (void*)x;
msg_Info(p_access, "Peer IP:Port %s:%d", inet_ntoa(xin->sin_addr), ntohs(xin->sin_port));
}
else if (x->sa_family == AF_INET6)
{
struct sockaddr_in6 *xin6 = (void*)x;
char str[INET6_ADDRSTRLEN];
inet_ntop(xin6->sin6_family, &xin6->sin6_addr, str, sizeof(struct in6_addr));
msg_Info(p_access, "Peer IP:Port %s:%d", str, ntohs(xin6->sin6_port));
}
}
static void rtcp_input(stream_t *p_access, struct rist_flow *flow, uint8_t *buf_in, size_t len,
struct sockaddr *peer, socklen_t slen)
{
stream_sys_t *p_sys = p_access->p_sys;
uint8_t ptype;
uint16_t processed_bytes = 0;
uint16_t records;
char new_sender_name[MAX_CNAME];
uint8_t *buf;
while (processed_bytes < len) {
buf = buf_in + processed_bytes;
/* safety checks */
uint16_t bytes_left = len - processed_bytes + 1;
if ( bytes_left < 4 )
{
/* we must have at least 4 bytes */
msg_Err(p_access, "Rist rtcp packet must have at least 4 bytes, we have %d",
bytes_left);
return;
}
else if (!rtp_check_hdr(buf))
{
/* check for a valid rtp header */
msg_Err(p_access, "Malformed rtcp packet starting with %02x, ignoring.", buf[0]);
return;
}
ptype = rtcp_get_pt(buf);
records = rtcp_get_length(buf);
uint16_t bytes = (uint16_t)(4 * (1 + records));
if (bytes > bytes_left)
{
/* check for a sane number of bytes */
msg_Err(p_access, "Malformed rtcp packet, wrong len %d, expecting %u bytes in the " \
"packet, got a buffer of %u bytes.", rtcp_get_length(buf), bytes, bytes_left);
return;
}
switch(ptype) {
case RTCP_PT_RTPFR:
case RTCP_PT_RTPFB:
break;
case RTCP_PT_RR:
break;
case RTCP_PT_SDES:
{
/* Check for changes in source IP address or port */
int8_t name_length = rtcp_sdes_get_name_length(buf);
if (name_length > bytes_left)
{
/* check for a sane number of bytes */
msg_Err(p_access, "Malformed SDES packet, wrong cname len %u, got a " \
"buffer of %u bytes.", name_length, bytes_left);
return;
}
bool ip_port_changed = false;
if (sockaddr_cmp((struct sockaddr *)&flow->peer_sockaddr, peer) != 0)
{
ip_port_changed = true;
if(flow->peer_socklen > 0)
print_sockaddr_info_change(p_access,
(struct sockaddr *)&flow->peer_sockaddr, peer);
else
print_sockaddr_info(p_access, peer);
vlc_mutex_lock( &p_sys->lock );
memcpy(&flow->peer_sockaddr, peer, sizeof(struct sockaddr_storage));
flow->peer_socklen = slen;
vlc_mutex_unlock( &p_sys->lock );
}
/* Check for changes in cname */
bool peer_name_changed = false;
memset(new_sender_name, 0, MAX_CNAME);
memcpy(new_sender_name, buf + RTCP_SDES_SIZE, name_length);
if (memcmp(new_sender_name, p_sys->sender_name, name_length) != 0)
{
peer_name_changed = true;
if (strcmp(p_sys->sender_name, "") == 0)
msg_Info(p_access, "Peer Name: %s", new_sender_name);
else
msg_Info(p_access, "Peer Name change detected: old Name: %s, new " \
"Name: %s", p_sys->sender_name, new_sender_name);
memset(p_sys->sender_name, 0, MAX_CNAME);
memcpy(p_sys->sender_name, buf + RTCP_SDES_SIZE, name_length);
}
/* Reset the buffer as the source must have been restarted */
if (peer_name_changed || ip_port_changed)
{
/* reset the buffer */
flow->reset = 1;
}
}
break;
case RTCP_PT_SR:
break;
default:
msg_Err(p_access, " Unrecognized RTCP packet with PTYPE=%02x!!", ptype);
}
processed_bytes += (4 * (1 + records));
}
}
static bool rist_input(stream_t *p_access, struct rist_flow *flow, uint8_t *buf, size_t len)
{
stream_sys_t *p_sys = p_access->p_sys;
/* safety checks */
if ( len < RTP_HEADER_SIZE )
{
/* check if packet size >= rtp header size */
msg_Err(p_access, "Rist rtp packet must have at least 12 bytes, we have %lu", len);
return false;
}
else if (!rtp_check_hdr(buf))
{
/* check for a valid rtp header */
msg_Err(p_access, "Malformed rtp packet header starting with %02x, ignoring.", buf[0]);
return false;
}
uint16_t idx = rtp_get_seqnum(buf);
uint32_t pkt_ts = rtp_get_timestamp(buf);
bool retrasnmitted = false;
bool success = true;
uint64_t now = vlc_tick_now();
if (flow->reset == 2)
{
if ((uint64_t)(now - p_sys->last_message) > (uint64_t)VLC_TICK_FROM_MS(flow->latency) ) {
msg_Info(p_access, "Waiting for Sender's Coordinates, i.e. rtcp handshake ...");
}
p_sys->last_message = now;
return success;
}
else if (flow->reset == 1)
{
msg_Info(p_access, "Traffic detected after buffer reset");
/* First packet in the queue */
flow->hi_timestamp = pkt_ts;
msg_Info(p_access, "ts@%u", flow->hi_timestamp);
flow->wi = idx;
flow->ri = idx;
flow->reset = 0;
}
/* Check to see if this is a retransmission or a regular packet */
if (buf[11] & (1 << 0))
{
msg_Dbg(p_access, "Packet %d RECOVERED, Window: [%d:%d-->%d]", idx, flow->ri, flow->wi,
flow->wi-flow->ri);
p_sys->i_recovered_packets++;
retrasnmitted = true;
}
else if (flow->wi != flow->ri)
{
/* Reset counter to 0 on incoming holes */
/* Regular packets only as retransmits are expected to come in out of order */
uint16_t idxnext = (uint16_t)(flow->wi + 1);
if (idx != idxnext)
{
if (idx > idxnext) {
msg_Dbg(p_access, "Gap, got %d, expected %d, %d packet gap, Window: [%d:%d-->%d]",
idx, idxnext, idx - idxnext, flow->ri, flow->wi, (uint16_t)(flow->wi-flow->ri));
} else {
p_sys->i_reordered_packets++;
msg_Dbg(p_access, "Out of order, got %d, expected %d, Window: [%d:%d-->%d]", idx,
idxnext, flow->ri, flow->wi, (uint16_t)(flow->wi-flow->ri));
}
uint16_t zero_counter = (uint16_t)(flow->wi + 1);
while(zero_counter++ != idx) {
flow->nacks_retries[zero_counter] = 0;
}
/*msg_Dbg(p_access, "Gap, reseting %d packets as zero nacks %d to %d",
idx - flow->wi - 1, (uint16_t)(flow->wi + 1), idx);*/
}
}
/* Always replace the existing one with the new one */
struct rtp_pkt *pkt;
pkt = &(flow->buffer[idx]);
if (pkt->buffer && pkt->buffer->i_buffer > 0)
{
block_Release(pkt->buffer);
pkt->buffer = NULL;
}
pkt->buffer = block_Alloc(len);
if (!pkt->buffer)
return false;
pkt->buffer->i_buffer = len;
memcpy(pkt->buffer->p_buffer, buf, len);
pkt->rtp_ts = pkt_ts;
p_sys->last_data_rx = vlc_tick_now();
/* Reset the try counter regardless of wether it was a retransmit or not */
flow->nacks_retries[idx] = 0;
if (retrasnmitted)
return success;
p_sys->i_total_packets++;
/* Perform discontinuity checks and udpdate counters */
if (!is_index_in_range(flow, idx) && pkt_ts >= flow->hi_timestamp)
{
if ((pkt_ts - flow->hi_timestamp) > flow->hi_timestamp/10)
{
msg_Info(p_access, "Forward stream discontinuity idx@%d/%d/%d ts@%u/%u", flow->ri, idx,
flow->wi, pkt_ts, flow->hi_timestamp);
flow->reset = 1;
success = false;
}
else
{
flow->wi = idx;
flow->hi_timestamp = pkt_ts;
}
}
else if (!is_index_in_range(flow, idx))
{
/* incoming timestamp just jumped back in time or index is outside of scope */
msg_Info(p_access, "Backwards stream discontinuity idx@%d/%d/%d ts@%u/%u", flow->ri, idx,
flow->wi, pkt_ts, flow->hi_timestamp);
flow->reset = 1;
success = false;
}
return success;
}
static block_t *rist_dequeue(stream_t *p_access, struct rist_flow *flow)
{
stream_sys_t *p_sys = p_access->p_sys;
block_t *pktout = NULL;
struct rtp_pkt *pkt;
uint16_t idx;
if (flow->ri == flow->wi || flow->reset > 0)
return NULL;
idx = flow->ri;
bool found_data = false;
uint16_t loss_amount = 0;
while(idx++ != flow->wi) {
pkt = &(flow->buffer[idx]);
if (!pkt->buffer)