Commit 666f2728 authored by Christophe Massiot's avatar Christophe Massiot

aggregartp/reordertp: allow retx without dedicated connection

parent 5c92f2e6
/*****************************************************************************
* aggregartp.c: split an RTP stream for several contribution links
*****************************************************************************
* Copyright (C) 2009, 2011 VideoLAN
* Copyright (C) 2009, 2011, 2014 VideoLAN
* $Id$
*
* Authors: Christophe Massiot <massiot@via.ecp.fr>
......@@ -46,12 +46,6 @@
/*****************************************************************************
* Local declarations
*****************************************************************************/
typedef union
{
struct sockaddr_storage ss;
struct sockaddr so;
} sockaddr_t;
typedef struct block_t
{
uint8_t *p_data;
......@@ -152,6 +146,7 @@ static void RetxQueue( block_t *p_block, uint64_t i_current_date )
{
p_block->i_date = i_current_date;
p_block->p_next = NULL;
rtp_set_marker( p_block->p_data );
/* Queue block */
if ( p_retx_last != NULL )
......@@ -177,14 +172,14 @@ static void RetxQueue( block_t *p_block, uint64_t i_current_date )
/*****************************************************************************
* RetxHandle: handle a retx query
*****************************************************************************/
static void RetxHandle(void)
static void RetxHandle( int i_fd )
{
ssize_t i_size = RETX_HEADER_SIZE - p_retx_block->i_size;
uint8_t *p_buffer = p_retx_block->p_data + p_retx_block->i_size;
sockaddr_t sout;
socklen_t i_len = sizeof(sout);
i_size = recvfrom( i_retx_fd, p_buffer, i_size, 0, &sout.so, &i_len );
i_size = recvfrom( i_fd, p_buffer, i_size, 0, &sout.so, &i_len );
if ( i_size < 0 && errno != EAGAIN && errno != EINTR &&
errno != ECONNREFUSED )
{
......@@ -229,7 +224,15 @@ static void RetxHandle(void)
while ( i_num && p_block != NULL )
{
SendBlock( i_retx_fd, i_len ? &sout.so : NULL, i_len, p_block );
if ( i_retx_fd == -1 )
{
output_t *p_output = NextOutput();
SendBlock( p_output->i_fd, NULL, 0, p_block );
}
else
{
SendBlock( i_retx_fd, i_len ? &sout.so : NULL, i_len, p_block );
}
p_block = p_block->p_next;
i_num--;
}
......@@ -248,7 +251,14 @@ int main( int i_argc, char **pp_argv )
int i_priority = -1;
int i_ttl = 0;
bool b_udp = false;
struct pollfd pfd[2];
struct pollfd *pfd = malloc(sizeof(struct pollfd));
int i_nb_retx = 1;
int i_fd;
#define ADD_RETX \
pfd = realloc( pfd, ++i_nb_retx * sizeof(struct pollfd) ); \
pfd[i_nb_retx - 1].fd = i_fd; \
pfd[i_nb_retx - 1].events = POLLIN;
while ( (c = getopt( i_argc, pp_argv, "i:t:wo:x:X:Um:R:h" )) != -1 )
{
......@@ -278,21 +288,21 @@ int main( int i_argc, char **pp_argv )
case 'x':
i_retx_buffer = strtoll( optarg, NULL, 0 ) * 27000;
p_retx_block = malloc( sizeof(block_t) + RETX_HEADER_SIZE );
p_retx_block->p_data = (uint8_t *)p_retx_block + sizeof(block_t);
p_retx_block->i_size = 0;
break;
case 'X':
i_retx_fd = OpenSocket( optarg, 0, 0, 0, NULL, &b_retx_tcp, NULL );
if ( i_retx_fd == -1 )
i_retx_fd = i_fd = OpenSocket( optarg, 0, 0, 0, NULL, &b_retx_tcp, NULL );
if ( i_fd == -1 )
{
msg_Err( NULL, "unable to set up retx with %s\n", optarg );
exit(EXIT_FAILURE);
}
pfd[1].fd = i_retx_fd;
pfd[1].events = POLLIN | POLLERR | POLLRDHUP | POLLHUP;
p_retx_block = malloc( sizeof(block_t) + RETX_HEADER_SIZE );
p_retx_block->p_data = (uint8_t *)p_retx_block + sizeof(block_t);
p_retx_block->i_size = 0;
ADD_RETX
break;
case 'U':
......@@ -331,7 +341,7 @@ int main( int i_argc, char **pp_argv )
while ( optind < i_argc )
{
p_outputs = realloc( p_outputs, ++i_nb_outputs * sizeof(output_t) );
p_outputs[i_nb_outputs - 1].i_fd =
p_outputs[i_nb_outputs - 1].i_fd = i_fd =
OpenSocket( pp_argv[optind++], i_ttl, 0, DEFAULT_PORT,
&p_outputs[i_nb_outputs - 1].i_weight, NULL, NULL );
if ( p_outputs[i_nb_outputs - 1].i_fd == -1 )
......@@ -343,6 +353,11 @@ int main( int i_argc, char **pp_argv )
p_outputs[i_nb_outputs - 1].i_weighted_size =
p_outputs[i_nb_outputs - 1].i_remainder = 0;
i_max_weight += p_outputs[i_nb_outputs - 1].i_weight;
if ( i_retx_fd == -1 )
{
ADD_RETX
}
}
msg_Dbg( NULL, "%d outputs weight %u%s", i_nb_outputs, i_max_weight,
i_retx_fd != -1 ? ", with retx" : "" );
......@@ -365,7 +380,7 @@ int main( int i_argc, char **pp_argv )
for ( ; ; )
{
uint64_t i_current_date;
if ( poll( pfd, i_retx_fd == -1 ? 1 : 2, -1 ) < 0 )
if ( poll( pfd, i_nb_retx + 1, -1 ) < 0 )
{
int saved_errno = errno;
msg_Warn( NULL, "couldn't poll(): %s", strerror(errno) );
......@@ -456,11 +471,7 @@ int main( int i_argc, char **pp_argv )
p_output->i_remainder = (i_size + p_output->i_remainder)
% p_output->i_weight;
if ( i_retx_fd != -1 )
RetxQueue( p_input_block, i_current_date );
else
free( p_input_block );
RetxQueue( p_input_block, i_current_date );
p_input_block = NULL;
}
else if ( (pfd[0].revents & (POLLERR | POLLRDHUP | POLLHUP)) )
......@@ -469,14 +480,13 @@ int main( int i_argc, char **pp_argv )
exit(EXIT_FAILURE);
}
if ( i_retx_fd != -1 && (pfd[1].revents & POLLIN) )
RetxHandle();
else if ( i_retx_fd != -1 &&
(pfd[1].revents & (POLLERR | POLLRDHUP | POLLHUP)) )
int i;
for ( i = 0; i < i_nb_retx; i++ )
{
msg_Err( NULL, "poll error\n" );
exit(EXIT_FAILURE);
if ( pfd[i + 1].revents & POLLIN )
{
RetxHandle( pfd[i + 1].fd );
}
}
}
......
/*****************************************************************************
* reordertp.c: rebuild an RTP stream from several aggregated links
*****************************************************************************
* Copyright (C) 2009, 2011 VideoLAN
* Copyright (C) 2009, 2011, 2014 VideoLAN
* $Id$
*
* Authors: Christophe Massiot <massiot@via.ecp.fr>
......@@ -72,6 +72,7 @@ typedef struct input_t
int i_fd;
bool b_tcp;
block_t *p_block;
sockaddr_t peer;
} input_t;
static size_t i_asked_payload_size = DEFAULT_PAYLOAD_SIZE;
......@@ -108,6 +109,7 @@ static int i_cr_average = DEFAULT_CR_AVERAGE;
static uint64_t i_retx_delay = DEFAULT_RETX_DELAY * 27000;
static int i_retx_fd = -1;
static unsigned int i_max_retx_burst = DEFAULT_MAX_RETX_BURST;
static int i_last_retx_input = 0;
static void usage(void)
{
......@@ -166,8 +168,6 @@ void clock_NewRef( uint64_t i_clock, uint64_t i_wall )
return;
}
input_clock.last_cr = i_clock;
/* Smooth clock reference variations. */
i_extrapoled_clock = input_clock.cr_ref
+ i_wall - input_clock.wall_ref;
......@@ -191,6 +191,7 @@ void clock_NewRef( uint64_t i_clock, uint64_t i_wall )
return;
}
input_clock.i_nb_space_packets = 0;
input_clock.last_cr = i_clock;
/* Bresenham algorithm to smooth variations. */
/* Gives a lot of importance to the first samples, but we suppose the
......@@ -226,6 +227,30 @@ static void RetxDereference( block_t *p_block )
p_last = p_block->p_prev;
}
static int RetxGetFd(sockaddr_t **pp_sockaddr)
{
if ( i_retx_fd != -1 ) {
*pp_sockaddr = NULL;
return i_retx_fd;
}
int i_nb_tries = 0;
while ( i_nb_tries < i_nb_inputs )
{
i_nb_tries++;
i_last_retx_input++;
i_last_retx_input %= i_nb_inputs;
if ( p_inputs[i_last_retx_input].peer.so.sa_family != AF_UNSPEC )
break;
}
if ( i_nb_tries == i_nb_inputs + 1 )
return -1;
*pp_sockaddr = &p_inputs[i_last_retx_input].peer;
return p_inputs[i_last_retx_input].i_fd;
}
static void RetxCheck( uint64_t i_current_date )
{
int i;
......@@ -265,7 +290,10 @@ static void RetxCheck( uint64_t i_current_date )
{
unsigned int i_nb_packets = (POW2_16 + i_current_seqnum -
(i_prev_seqnum + 1)) % POW2_16;
if ( i_retx_fd != -1 && i_nb_packets <= i_max_retx_burst )
sockaddr_t *p_sockaddr;
int i_fd;
if ( i_nb_packets <= i_max_retx_burst &&
(i_fd = RetxGetFd(&p_sockaddr)) != -1 )
{
uint8_t p_buffer[RETX_HEADER_SIZE];
msg_Dbg( NULL, "missing RTP packets %hu to %hu, retx started",
......@@ -274,7 +302,11 @@ static void RetxCheck( uint64_t i_current_date )
retx_init(p_buffer);
retx_set_seqnum(p_buffer, (i_prev_seqnum + 1) % POW2_16);
retx_set_num(p_buffer, i_nb_packets);
send( i_retx_fd, p_buffer, RETX_HEADER_SIZE, 0 );
if ( p_sockaddr == NULL )
send( i_fd, p_buffer, RETX_HEADER_SIZE, 0 );
else
sendto( i_fd, p_buffer, RETX_HEADER_SIZE, 0,
&p_sockaddr->so, sizeof(sockaddr_t) );
}
else
{
......@@ -354,6 +386,12 @@ static void PacketRecv( block_t *p_block, uint64_t i_date )
break;
}
if ( rtp_check_marker( p_block->p_data ) )
{
i_date = 0;
rtp_clear_marker( p_block->p_data );
}
if ( i_date )
clock_NewRef( i_scaled_timestamp, i_date );
......@@ -370,9 +408,9 @@ static void PacketRecv( block_t *p_block, uint64_t i_date )
{
block_t *p_prev = p_last;
while ( p_prev != NULL &&
((UINT16_MAX * 3 / 2 + (int32_t)p_prev->i_seqnum -
(int32_t)p_block->i_seqnum)
% UINT16_MAX - UINT16_MAX / 2) > 0 )
(POW2_16 * 3 / 2 + (uint32_t)p_prev->i_seqnum -
(uint32_t)p_block->i_seqnum)
% POW2_16 > POW2_16 / 2 )
p_prev = p_prev->p_prev;
if ( p_prev == NULL )
{
......@@ -411,6 +449,7 @@ int main( int i_argc, char **pp_argv )
p_inputs[i_nb_inputs - 1].i_fd = i_fd; \
p_inputs[i_nb_inputs - 1].b_tcp = b_tcp; \
p_inputs[i_nb_inputs - 1].p_block = NULL; \
p_inputs[i_nb_inputs - 1].peer.so.sa_family = AF_UNSPEC; \
pfd = realloc( pfd, i_nb_inputs * sizeof(struct pollfd) ); \
pfd[i_nb_inputs - 1].fd = i_fd; \
pfd[i_nb_inputs - 1].events = POLLIN | POLLERR | POLLRDHUP | POLLHUP;
......@@ -577,7 +616,14 @@ int main( int i_argc, char **pp_argv )
i_size -= p_input->p_block->i_size;
}
i_size = read( p_input->i_fd, p_buffer, i_size );
if ( p_input->b_tcp )
i_size = read( p_input->i_fd, p_buffer, i_size );
else
{
socklen_t len = sizeof(sockaddr_t);
i_size = recvfrom( p_input->i_fd, p_buffer, i_size, 0,
&p_input->peer.so, &len );
}
if ( i_size < 0 && errno != EAGAIN && errno != EINTR &&
errno != ECONNREFUSED )
{
......
......@@ -56,17 +56,6 @@
int i_verbose = VERB_DBG;
/*****************************************************************************
* sockaddr_t: wrapper to avoid strict-aliasing issues
*****************************************************************************/
typedef union
{
struct sockaddr_storage ss;
struct sockaddr so;
struct sockaddr_in sin;
struct sockaddr_in6 sin6;
} sockaddr_t;
/*****************************************************************************
* msg_Info
*****************************************************************************/
......
/*****************************************************************************
* util.h: Utils for the multicat suite
*****************************************************************************
* Copyright (C) 2009, 2011 VideoLAN
* Copyright (C) 2009, 2011, 2014 VideoLAN
* $Id$
*
* Authors: Christophe Massiot <massiot@via.ecp.fr>
......@@ -40,6 +40,17 @@
#define VERB_INFO 2
#define VERB_WARN 1
/*****************************************************************************
* sockaddr_t: wrapper to avoid strict-aliasing issues
*****************************************************************************/
typedef union
{
struct sockaddr_storage ss;
struct sockaddr so;
struct sockaddr_in sin;
struct sockaddr_in6 sin6;
} sockaddr_t;
/*****************************************************************************
* Raw udp packet structure with flexible-array payload
*****************************************************************************/
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment