Commit 4df8003c authored by Christophe Massiot's avatar Christophe Massiot

* reordertp.c: Renamed desaggregartp to reordertp

 * multicat.c, aggregartp.c, reordertp.c, util.c: Support for TCP sockets (/tcp)
 * aggregartp.c, reordertp.c: Support for packet retransmission over UDP or TCP
 * ALL: Use biTStream instead of util.h' parsing functions
 * aggregartp.c, reordertp.c: Remove limits on number of inputs/outputs
 * multicat.c: Fix segfault on non-existant aux file
 * reordertp.c: Packets with too much jitter are not taken into account for
   clock ("space packets")
 * reordertp.c: Add options to configure gap, jitter and clock recovery
   thresholds
 * ingests.c: Fix a major timestamping bug where first packets would be sent
   too fast, and the rest of the timestamps are off
 * util.c: Merge IPv4 and IPv6 functions
 * multicat.c, aggregartp.c, reordertp.c: Add an option to specify the RTP
   header size.
parent 447b8976
......@@ -7,19 +7,19 @@ LDLIBS += -lrt
OBJ_MULTICAT = multicat.o util.o
OBJ_INGESTS = ingests.o util.o
OBJ_AGGREGARTP = aggregartp.o util.o
OBJ_DESAGGREGARTP = desaggregartp.o util.o
OBJ_REORDERTP = reordertp.o util.o
OBJ_OFFSETS = offsets.o util.o
PREFIX ?= /usr/local
BIN = $(DESTDIR)/$(PREFIX)/bin
MAN = $(DESTDIR)/$(PREFIX)/share/man/man1
all: multicat ingests aggregartp desaggregartp offsets
all: multicat ingests aggregartp reordertp offsets
$(OBJ_MULTICAT): Makefile util.h
$(OBJ_INGESTS): Makefile util.h
$(OBJ_AGGREGARTP): Makefile util.h
$(OBJ_DESAGGREGARTP): Makefile util.h
$(OBJ_REORDERTP): Makefile util.h
$(OBJ_OFFSETS): Makefile util.h
multicat: $(OBJ_MULTICAT)
......@@ -31,21 +31,21 @@ ingests: $(OBJ_INGESTS)
aggregartp: $(OBJ_AGGREGARTP)
$(CC) -o $@ $(OBJ_AGGREGARTP) $(LDLIBS)
desaggregartp: $(OBJ_DESAGGREGARTP)
$(CC) -o $@ $(OBJ_DESAGGREGARTP) $(LDLIBS)
reordertp: $(OBJ_REORDERTP)
$(CC) -o $@ $(OBJ_REORDERTP) $(LDLIBS)
offsets: $(OBJ_OFFSETS)
$(CC) -o $@ $(OBJ_OFFSETS) $(LDLIBS)
clean:
-rm -f multicat $(OBJ_MULTICAT) ingests $(OBJ_INGESTS) aggregartp $(OBJ_AGGREGARTP) desaggregartp $(OBJ_DESAGGREGARTP) offsets $(OBJ_OFFSETS)
-rm -f multicat $(OBJ_MULTICAT) ingests $(OBJ_INGESTS) aggregartp $(OBJ_AGGREGARTP) reordertp $(OBJ_REORDERTP) offsets $(OBJ_OFFSETS)
install: all
@install -d $(BIN)
@install -d $(MAN)
@install multicat ingests aggregartp desaggregartp offsets $(BIN)
@install multicat.1 ingests.1 aggregartp.1 desaggregartp.1 offsets.1 $(MAN)
@install multicat ingests aggregartp reordertp offsets $(BIN)
@install multicat.1 ingests.1 aggregartp.1 reordertp.1 offsets.1 $(MAN)
uninstall:
@rm $(BIN)/multicat $(BIN)/ingests $(BIN)/aggregartp $(BIN)/desaggregartp $(BIN)/offsets
@rm $(MAN)/multicat.1 $(MAN)/ingests.1 $(MAN)/aggregartp.1 $(MAN)/desaggregartp.1 $(MAN)/offsets.1
@rm $(BIN)/multicat $(BIN)/ingests $(BIN)/aggregartp $(BIN)/reordertp $(BIN)/offsets
@rm $(MAN)/multicat.1 $(MAN)/ingests.1 $(MAN)/aggregartp.1 $(MAN)/reordertp.1 $(MAN)/offsets.1
......@@ -4,8 +4,16 @@ Changes between 1.0 and 2.0:
----------------------------
* Support for directory input/output, where the stream is stored into
"chunks" of fixed duration, accompanied with their auxiliary file
* Support for TCP sockets in multicat, aggregartp and reordertp (/tcp)
* Merge OffseTS functionality into multicat itself (-k and -d)
* Support for IPv6
* Add DVBlast-style options for host parsing (ifindex, ifaddr, ttl and tos)
* Add the ability to output a stream as fast as possible (-f)
* Renamed desaggregartp to reordeRTP
* Support for packet retransmission between aggregaRTP and reordeRTP
* Use biTStream instead of internal parsing functions
* Removed limits on number of inputs/outputs in aggregaRTP and reordeRTP
* Finer jitter control in reordeRTP
* Fix timestamping error in ingesTS with streams where the first packet
doesn't hold a PCR (re-ingest all existing streams)
* Fix miscellaneous bugs
......@@ -24,10 +24,11 @@ necessary for multicat.
OffseTS is another companion application to manipulate auxiliary files.
Given an offset in time from the beginning of the file, it returns the offset
of the position in number of packets.
of the position in number of packets. It is currently deprecated in favour of
using the -k and -d options of multicat.
Finally aggregaRTP and desaggregaRTP can be used to carry a high-bitrate
signal over several contribution links.
Finally aggregaRTP and reordeRTP can be used to carry a high-bitrate
signal over one or several contribution links.
The multicat suite of applications is very lightweight and designed to
operate in tight environments. Memory and CPU usages are kept to a minimum,
......@@ -63,6 +64,7 @@ Options include:
/ifaddr=XXX.XXX.XXX.XXX (binds to a specific network interface, by address)
/ttl=XX (time-to-live of the UDP packet)
/tos=XX (sets the IPv4 Type Of Service option)
/tcp (binds a TCP socket instead of UDP)
Example:
239.255.0.1:5004/ttl=64
......@@ -178,8 +180,8 @@ the multicat program. OffseTS is still distributed for compatibility, but
doesn't support the new directory input.
Using AggregaRTP and DesaggregaRTP
==================================
Using AggregaRTP and ReordeRTP
==============================
Splitting an RTP stream to two streams with different routing policies:
......@@ -187,4 +189,16 @@ aggregartp @239.255.0.1:5004 239.1.0.1:5004@192.168.0.1 239.2.0.1:5004@172.16.0.
At the other end, reassembling the two streams into one usable stream:
desaggregartp 192.168.0.1@239.1.0.1:5004 172.16.0.1@239.2.0.1:5004 239.254.0.1:5004
reordertp 192.168.0.1@239.1.0.1:5004 172.16.0.1@239.2.0.1:5004 239.254.0.1:5004
Transmit a signal over a lossy link:
aggregartp @239.255.0.1:5004 239.1.0.1:5004 -X @:5006
reordertp @239.1.0.1:5004 239.254.0.1:5004 -X 192.168.0.1:5006
The same, but using inverted TCP for retransmission (for NAT traversal for
instance):
reordertp @239.1.0.1:5004 239.254.0.1:5004 -X @:5006/tcp
aggregartp @239.255.0.1:5004 239.1.0.1:5004 -X 192.168.0.2:5006/tcp
(with TCP the listener must be started before the other)
/*****************************************************************************
* aggregartp.c: split an RTP stream for several contribution links
*****************************************************************************
* Copyright (C) 2009 VideoLAN
* Copyright (C) 2009, 2011 VideoLAN
* $Id: aggregartp.c 48 2007-11-30 14:08:21Z cmassiot $
*
* Authors: Christophe Massiot <massiot@via.ecp.fr>
......@@ -21,6 +21,9 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
*****************************************************************************/
/* POLLRDHUP */
#define _GNU_SOURCE 1
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
......@@ -32,15 +35,25 @@
#include <netinet/in.h>
#include <arpa/inet.h>
#include <pthread.h>
#include <poll.h>
#include <bitstream/ietf/rtp.h>
#include "util.h"
#define MAX_OUTPUTS 4
#define DEFAULT_MTU 1500
#define DEFAULT_RETX_BUFFER 500 /* ms */
/*****************************************************************************
* Local declarations
*****************************************************************************/
typedef struct block_t
{
uint8_t *p_data;
unsigned int i_size;
uint64_t i_date;
struct block_t *p_next;
} block_t;
typedef struct output_t
{
int i_fd;
......@@ -49,21 +62,37 @@ typedef struct output_t
unsigned int i_weighted_size, i_remainder;
} output_t;
static size_t i_asked_payload_size = DEFAULT_PAYLOAD_SIZE;
static size_t i_rtp_header_size = RTP_HEADER_SIZE;
static int i_input_fd;
static output_t p_outputs[MAX_OUTPUTS];
static bool b_input_tcp;
static block_t *p_input_block = NULL;
static output_t *p_outputs = NULL;
static int i_nb_outputs = 0;
static unsigned int i_max_weight = 0;
static bool b_overwrite_timestamps = false;
static bool b_overwrite_ssrc = false;
static in_addr_t i_ssrc = 0;
static uint16_t i_rtp_cc = 0;
static uint16_t i_rtp_seqnum = 0;
static int i_retx_fd = -1;
static bool b_retx_tcp;
static block_t *p_retx_block = NULL;
static block_t *p_retx_first = NULL, *p_retx_last = NULL;
static uint64_t i_retx_buffer = DEFAULT_RETX_BUFFER * 27000;
static void usage(void)
{
msg_Raw( NULL, "Usage: aggregartp [-i <RT priority>] [-t <ttl>] [-w] [-o <SSRC IP>] [-U] [-m <mtu>] @<src host> <dest host 1>[,<weight 1>] ... [<dest host N>,<weight N>]" );
msg_Raw( NULL, "Usage: aggregartp [-i <RT priority>] [-t <ttl>] [-w] [-o <SSRC IP>] [-U] [-x <retx buffer>] [-X <retx URL>] [-m <payload size>] [-R <RTP header>] @<src host> <dest host 1>[,<weight 1>] ... [<dest host N>,<weight N>]" );
msg_Raw( NULL, " host format: [<connect addr>[:<connect port>]][@[<bind addr][:<bind port>]]" );
msg_Raw( NULL, " -w: overwrite RTP timestamps" );
msg_Raw( NULL, " -o: overwrite RTP SSRC" );
msg_Raw( NULL, " -U: prepend RTP header" );
msg_Raw( NULL, " -x: length of the buffer for retransmission requests in ms [default 500]" );
msg_Raw( NULL, " -X: retransmission service @host:port[/tcp]" );
msg_Raw( NULL, " -m: size of the payload chunk, excluding optional RTP header (default 1316)" );
msg_Raw( NULL, " -R: size of the optional RTP header (default 12)" );
exit(EXIT_FAILURE);
}
......@@ -75,7 +104,7 @@ static output_t *NextOutput(void)
unsigned int i_min_size = p_outputs[0].i_weighted_size;
int i, i_output = 0;
for ( i = 1; i < MAX_OUTPUTS && p_outputs[i].i_weight; i++ )
for ( i = 1; i < i_nb_outputs && p_outputs[i].i_weight; i++ )
{
if ( p_outputs[i].i_weighted_size < i_min_size )
{
......@@ -84,25 +113,137 @@ static output_t *NextOutput(void)
}
}
for ( i = 0; i < MAX_OUTPUTS && p_outputs[i].i_weight; i++ )
for ( i = 0; i < i_nb_outputs && p_outputs[i].i_weight; i++ )
p_outputs[i].i_weighted_size -= i_min_size;
return &p_outputs[i_output];
}
/*****************************************************************************
* SendBlock: send a block to a file descriptor
*****************************************************************************/
static void SendBlock( int i_fd, struct sockaddr_storage *p_sout,
socklen_t i_len, block_t *p_block )
{
if ( sendto( i_fd, p_block->p_data, p_block->i_size, 0,
(struct sockaddr *)p_sout, i_len ) < 0 )
{
if ( errno == EBADF || errno == ECONNRESET || errno == EPIPE )
{
msg_Err( NULL, "write error (%s)", strerror(errno) );
exit(EXIT_FAILURE);
}
else
/* otherwise do not die because these errors can be transient */
msg_Warn( NULL, "write error (%s)", strerror(errno) );
}
}
/*****************************************************************************
* RetxQueue: store a packet in the retx queue
*****************************************************************************/
static void RetxQueue( block_t *p_block, uint64_t i_current_date )
{
p_block->i_date = i_current_date;
/* Queue block */
if ( p_retx_last != NULL )
{
p_retx_last->p_next = p_block;
p_retx_last = p_block;
}
else
p_retx_last = p_retx_first = p_block;
/* Purge old blocks */
while ( p_retx_first != NULL &&
p_retx_first->i_date < i_current_date - i_retx_buffer )
{
block_t *p_next = p_retx_first->p_next;
free(p_retx_first);
p_retx_first = p_next;
}
if ( p_retx_first == NULL )
p_retx_last = NULL;
}
/*****************************************************************************
* RetxHandle: handle a retx query
*****************************************************************************/
static void RetxHandle(void)
{
ssize_t i_size = RETX_HEADER_SIZE - p_retx_block->i_size;
uint8_t *p_buffer = p_retx_block->p_data + p_retx_block->i_size;
struct sockaddr_storage sout;
socklen_t i_len = sizeof(sout);
i_size = recvfrom( i_retx_fd, p_buffer, i_size, 0,
(struct sockaddr *)&sout, &i_len );
if ( i_size < 0 && errno != EAGAIN && errno != EINTR )
{
msg_Err( NULL, "unrecoverable read error, dying (%s)",
strerror(errno) );
exit(EXIT_FAILURE);
}
if ( i_size <= 0 ) return;
p_retx_block->i_size += i_size;
if ( p_retx_block->i_size != RETX_HEADER_SIZE )
{
if ( b_retx_tcp ) return;
msg_Err( NULL, "invalid retx packet received, dying" );
exit(EXIT_FAILURE);
}
if ( !retx_check(p_retx_block->p_data) )
{
msg_Err( NULL, "invalid retx packet, dying" );
exit(EXIT_FAILURE);
}
uint16_t i_seqnum = retx_get_seqnum(p_retx_block->p_data);
uint16_t i_num = retx_get_num(p_retx_block->p_data);
block_t *p_block = p_retx_first;
p_retx_block->i_size = 0;
while ( p_block != NULL )
{
if ( rtp_get_seqnum(p_block->p_data) == i_seqnum )
break;
p_block = p_block->p_next;
}
if ( p_block == NULL )
{
msg_Warn( NULL, "unable to find packet %hu for retx", i_seqnum );
return;
}
while ( i_num && p_block != NULL )
{
SendBlock( i_retx_fd, i_len ? &sout : NULL, i_len, p_block );
p_block = p_block->p_next;
i_num--;
}
if ( i_num )
msg_Warn( NULL, "unable to find %hu packets after %hu", i_num,
i_seqnum );
}
/*****************************************************************************
* Entry point
*****************************************************************************/
int main( int i_argc, char **pp_argv )
{
int i, c;
int c;
int i_priority = -1;
int i_ttl = 0;
bool b_udp = false;
int i_mtu = DEFAULT_MTU;
uint8_t *p_buffer, *p_read_buffer;
struct pollfd pfd[2];
while ( (c = getopt( i_argc, pp_argv, "i:t:wo:Um:h" )) != -1 )
while ( (c = getopt( i_argc, pp_argv, "i:t:wo:X:Um:R:h" )) != -1 )
{
switch ( c )
{
......@@ -128,12 +269,31 @@ int main( int i_argc, char **pp_argv )
break;
}
case 'X':
i_retx_fd = OpenSocket( optarg, 0, 0, 0, NULL, &b_retx_tcp );
if ( i_retx_fd == -1 )
{
msg_Err( NULL, "unable to set up retx with %s\n", optarg );
exit(EXIT_FAILURE);
}
pfd[1].fd = i_retx_fd;
pfd[1].events = POLLIN | POLLERR | POLLRDHUP | POLLHUP;
p_retx_block = malloc( sizeof(block_t) + RETX_HEADER_SIZE );
p_retx_block->p_data = (uint8_t *)p_retx_block + sizeof(block_t);
p_retx_block->i_size = 0;
break;
case 'U':
b_udp = true;
break;
case 'm':
i_mtu = strtol( optarg, NULL, 0 );
i_asked_payload_size = strtol( optarg, NULL, 0 );
break;
case 'R':
i_rtp_header_size = strtol( optarg, NULL, 0 );
break;
case 'h':
......@@ -145,34 +305,36 @@ int main( int i_argc, char **pp_argv )
if ( optind >= i_argc - 1 )
usage();
i_input_fd = OpenSocket( pp_argv[optind], 0, NULL );
optind++;
i = 0;
while ( optind < i_argc && i < MAX_OUTPUTS )
i_input_fd = OpenSocket( pp_argv[optind], 0, DEFAULT_PORT, 0, NULL,
&b_input_tcp );
if ( i_input_fd == -1 )
{
p_outputs[i].i_fd = OpenSocket( pp_argv[optind++], i_ttl,
&p_outputs[i].i_weight );
p_outputs[i].i_weighted_size = p_outputs[i].i_remainder = 0;
i_max_weight += p_outputs[i].i_weight;
i++;
}
if ( optind < i_argc )
{
msg_Err( NULL, "max number of outputs: %d (recompile)", MAX_OUTPUTS );
msg_Err( NULL, "unable to open input socket" );
exit(EXIT_FAILURE);
}
msg_Dbg( NULL, "%d outputs weight %u", i, i_max_weight );
for ( ; i < MAX_OUTPUTS; i++ )
p_outputs[i].i_weight = 0;
if ( b_udp )
optind++;
pfd[0].fd = i_input_fd;
pfd[0].events = POLLIN | POLLERR | POLLRDHUP | POLLHUP;
while ( optind < i_argc )
{
p_buffer = malloc( i_mtu + RTP_HEADER_SIZE );
p_read_buffer = p_buffer + RTP_HEADER_SIZE;
p_outputs = realloc( p_outputs, ++i_nb_outputs * sizeof(output_t) );
p_outputs[i_nb_outputs - 1].i_fd =
OpenSocket( pp_argv[optind++], i_ttl, 0, DEFAULT_PORT,
&p_outputs[i_nb_outputs - 1].i_weight, NULL );
if ( p_outputs[i_nb_outputs - 1].i_fd == -1 )
{
msg_Err( NULL, "unable to open output socket" );
exit(EXIT_FAILURE);
}
p_outputs[i_nb_outputs - 1].i_weighted_size =
p_outputs[i_nb_outputs - 1].i_remainder = 0;
i_max_weight += p_outputs[i_nb_outputs - 1].i_weight;
}
else
p_buffer = p_read_buffer = malloc( i_mtu );
msg_Dbg( NULL, "%d outputs weight %u%s", i_nb_outputs, i_max_weight,
i_retx_fd != -1 ? ", with retx" : "" );
if ( i_priority > 0 )
{
......@@ -191,43 +353,115 @@ int main( int i_argc, char **pp_argv )
for ( ; ; )
{
ssize_t i_size = read( i_input_fd, p_read_buffer, i_mtu );
output_t *p_output;
if ( i_size < 0 && errno != EAGAIN && errno != EINTR )
uint64_t i_current_date;
if ( poll( pfd, i_retx_fd == -1 ? 1 : 2, -1 ) < 0 )
{
msg_Err( NULL, "unrecoverable read error, dying (%s)",
strerror(errno) );
int saved_errno = errno;
msg_Warn( NULL, "couldn't poll(): %s", strerror(errno) );
if ( saved_errno == EINTR ) continue;
exit(EXIT_FAILURE);
}
if ( i_size <= 0 ) continue;
i_current_date = wall_Date();
if ( b_udp )
if ( (pfd[0].revents & (POLLERR | POLLRDHUP | POLLHUP)) ||
(i_retx_fd != -1 &&
(pfd[1].revents & (POLLERR | POLLRDHUP | POLLHUP))))
{
rtp_SetHdr( p_buffer, i_rtp_cc );
i_rtp_cc++;
i_size += RTP_HEADER_SIZE;
rtp_SetSSRC( p_buffer, (uint8_t *)&i_ssrc );
/* this isn't RFC-compliant, but we assume that at the other end,
* the RTP header will be stripped */
rtp_SetTimestamp( p_buffer, wall_Date() / 300 );
msg_Err( NULL, "poll error\n" );
exit(EXIT_FAILURE);
}
else
if ( pfd[0].revents & POLLIN )
{
if ( b_overwrite_ssrc )
rtp_SetSSRC( p_buffer, (uint8_t *)&i_ssrc );
if ( b_overwrite_timestamps )
rtp_SetTimestamp( p_buffer, wall_Date() / 300 );
}
/* Read input block */
ssize_t i_size, i_wanted_size;
uint8_t *p_read_buffer;
p_output = NextOutput();
if ( write( p_output->i_fd, p_buffer, i_size ) < 0 )
msg_Warn( NULL, "write error (%s)", strerror(errno) );
if ( b_udp )
i_wanted_size = i_asked_payload_size + RTP_HEADER_SIZE;
else
i_wanted_size = i_asked_payload_size + i_rtp_header_size;
if ( p_input_block == NULL )
{
if ( b_udp )
{
p_input_block = malloc( sizeof(block_t) +
i_asked_payload_size +
RTP_HEADER_SIZE );
p_input_block->i_size = RTP_HEADER_SIZE;
}
else
{
p_input_block = malloc( sizeof(block_t) +
i_asked_payload_size +
i_rtp_header_size );
p_input_block->p_data = (uint8_t *)p_input_block +
sizeof(block_t);
p_input_block->i_size = 0;
}
p_input_block->p_data = (uint8_t *)p_input_block +
sizeof(block_t);
}
p_read_buffer = p_input_block->p_data + p_input_block->i_size;
i_wanted_size -= p_input_block->i_size;
i_size = read( i_input_fd, p_read_buffer, i_wanted_size );
if ( i_size < 0 && errno != EAGAIN && errno != EINTR )
{
msg_Err( NULL, "unrecoverable read error, dying (%s)",
strerror(errno) );
exit(EXIT_FAILURE);
}
if ( i_size <= 0 ) continue;
p_input_block->i_size += i_size;
if ( b_input_tcp && i_size != i_wanted_size )
continue;
if ( b_udp )
{
rtp_set_hdr( p_input_block->p_data );
rtp_set_type( p_input_block->p_data, RTP_TYPE_TS );
rtp_set_seqnum( p_input_block->p_data, i_rtp_seqnum );
i_rtp_seqnum++;
rtp_set_ssrc( p_input_block->p_data, (uint8_t *)&i_ssrc );
/* this isn't RFC-compliant, but we assume that at the other
* end, the RTP header will be stripped */
rtp_set_timestamp( p_input_block->p_data,
i_current_date / 300 );
}
else
{
if ( b_overwrite_ssrc )
rtp_set_ssrc( p_input_block->p_data,
(uint8_t *)&i_ssrc );
if ( b_overwrite_timestamps )
rtp_set_timestamp( p_input_block->p_data,
i_current_date / 300 );
}
/* Output block */
output_t *p_output = NextOutput();
SendBlock( p_output->i_fd, NULL, 0, p_input_block );
p_output->i_weighted_size += (i_size + p_output->i_remainder)
/ p_output->i_weight;
p_output->i_remainder = (i_size + p_output->i_remainder)
% p_output->i_weight;
if ( i_retx_fd != -1 )
RetxQueue( p_input_block, i_current_date );
else
free( p_input_block );
p_input_block = NULL;
}
p_output->i_weighted_size += (i_size + p_output->i_remainder)
/ p_output->i_weight;
p_output->i_remainder = (i_size + p_output->i_remainder)
% p_output->i_weight;
if ( i_retx_fd != -1 && (pfd[1].revents & POLLIN) )
RetxHandle();
}
return EXIT_SUCCESS;
......
/*****************************************************************************
* ingests.c: create the aux file for a transport stream file
*****************************************************************************
* Copyright (C) 2009 VideoLAN
* Copyright (C) 2009, 2011 VideoLAN
* $Id: ingests.c 52 2009-10-06 16:48:00Z cmassiot $
*
* Authors: Christophe Massiot <massiot@via.ecp.fr>
......@@ -31,6 +31,8 @@
#include <string.h>
#include <errno.h>
#include <bitstream/mpeg/ts.h>
#include "util.h"
/*****************************************************************************
......@@ -119,7 +121,7 @@ static void OutputFirst(void)
{
i_last_nb_payloads = (i_ts_since_output + i_ts_in_payload - 1)
/ i_ts_in_payload;
i_ts_since_output -= i_last_nb_payloads;
i_ts_since_output -= i_last_nb_payloads * i_ts_in_payload;
}
/*****************************************************************************
......@@ -138,9 +140,9 @@ static void OutputLast(void)
*****************************************************************************/
static void TSHandle( uint8_t *p_ts )
{
uint16_t i_pid = ts_GetPID( p_ts );
uint16_t i_pid = ts_get_pid( p_ts );
if ( !ts_CheckSync( p_ts ) )
if ( !ts_validate( p_ts ) )
{
msg_Err( NULL, "lost TS synchro, go and fix your file (pos=%llu)",
(uint64_t)i_ts_read * TS_SIZE );
......@@ -149,9 +151,11 @@ static void TSHandle( uint8_t *p_ts )
i_ts_since_output++;
if ( (i_pid == i_pcr_pid || i_pcr_pid == 8192) && ts_HasPCR( p_ts ) )
if ( (i_pid == i_pcr_pid || i_pcr_pid == 8192)
&& ts_has_adaptation(p_ts) && ts_get_adaptation(p_ts)
&& tsaf_has_pcr(p_ts) )
{
uint64_t i_pcr = ts_GetPCR( p_ts ) * 300 + ts_GetPCRExt( p_ts );
uint64_t i_pcr = tsaf_get_pcr( p_ts ) * 300 + tsaf_get_pcrext( p_ts );
if ( i_last_pcr == POW2_33 * 300 ) /* init */
{
......
/*****************************************************************************
* multicat.c: netcat-equivalent for multicast
*****************************************************************************
* Copyright (C) 2009 VideoLAN
* Copyright (C) 2009, 2011 VideoLAN
* $Id: multicat.c 48 2007-11-30 14:08:21Z cmassiot $
*
* Authors: Christophe Massiot <massiot@via.ecp.fr>
......@@ -21,6 +21,9 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
*****************************************************************************/
/* POLLRDHUP */
#define _GNU_SOURCE 1
#include <stdlib.h>
#include <stdio.h>
#include <sys/types.h>
......@@ -38,11 +41,12 @@
#include <pthread.h>
#include <poll.h>
#include <bitstream/ietf/rtp.h>
#include <bitstream/mpeg/ts.h>
#include "util.h"
#define POLL_TIMEOUT 1000 /* 1 s */
#define RTP_HEADER_MAX_SIZE (RTP_HEADER_SIZE + 15 * 4)
#define RTP_TS_TYPE 33
/*****************************************************************************
* Local declarations
......@@ -56,10 +60,11 @@ static bool b_overwrite_ssrc = false;
static in_addr_t i_ssrc = 0;
static bool b_input_udp = false, b_output_udp = false;
static size_t i_asked_payload_size = DEFAULT_PAYLOAD_SIZE;
static size_t i_rtp_header_size = RTP_HEADER_SIZE;
static uint64_t i_rotate_size = DEFAULT_ROTATE_SIZE;
static volatile sig_atomic_t b_die = 0;
static uint16_t i_rtp_cc;
static uint16_t i_rtp_seqnum;
static uint64_t i_stc = 0; /* system time clock, used for date calculations */
static uint64_t i_first_stc = 0;
static uint64_t i_pcr = 0, i_pcr_stc = 0; /* for RTP/TS output */
......@@ -73,7 +78,7 @@ void (*pf_ExitWrite)(void);
static void usage(void)
{
msg_Raw( NULL, "Usage: multicat [-i <RT priority>] [-t <ttl>] [-X] [-f] [-p <PCR PID>] [-s <chunks>] [-n <chunks>] [-k <start time>] [-d <duration>] [-a] [-r <file duration>] [-S <SSRC IP>] [-u] [-U] [-m <payload size>] <input item> <output item>" );
msg_Raw( NULL, "Usage: multicat [-i <RT priority>] [-t <ttl>] [-X] [-f] [-p <PCR PID>] [-s <chunks>] [-n <chunks>] [-k <start time>] [-d <duration>] [-a] [-r <file duration>] [-S <SSRC IP>] [-u] [-U] [-m <payload size>] [-R <RTP header size>] <input item> <output item>" );
msg_Raw( NULL, " item format: <file path | device path | FIFO path | directory path | network host>" );
msg_Raw( NULL, " host format: [<connect addr>[:<connect port>]][@[<bind addr][:<bind port>]]" );
msg_Raw( NULL, " -X: also pass-through all packets to stdout" );
......@@ -89,6 +94,7 @@ static void usage(void)
msg_Raw( NULL, " -u: source has no RTP header" );
msg_Raw( NULL, " -U: destination has no RTP header" );
msg_Raw( NULL, " -m: size of the payload chunk, excluding optional RTP header (default 1316)" );
msg_Raw( NULL, " -R: size of the optional RTP header (default 12)" );
exit(EXIT_FAILURE);
}
......@@ -109,7 +115,7 @@ static bool Poll(void)
int i_ret;
pfd.fd = i_input_fd;
pfd.events = POLLIN;
pfd.events = POLLIN | POLLERR | POLLRDHUP | POLLHUP;
i_ret = poll( &pfd, 1, POLL_TIMEOUT );
if ( i_ret < 0 )
......@@ -118,15 +124,56 @@ static bool Poll(void)
b_die = 1;
return false;
}
if ( pfd.revents & (POLLERR | POLLRDHUP | POLLHUP) )
{
msg_Err( NULL, "poll error" );
b_die = 1;
return false;
}
if ( !i_ret ) return false;
return true;
}
/*****************************************************************************
* tcp_*: TCP socket handlers (only what differs from UDP)
*****************************************************************************/
static uint8_t *p_tcp_buffer = NULL;
static size_t i_tcp_size = 0;
static ssize_t tcp_Read( void *p_buf, size_t i_len )
{
if ( p_tcp_buffer == NULL )
p_tcp_buffer = malloc(i_len);
uint8_t *p_read_buffer;
ssize_t i_read_size = i_len;
p_read_buffer = p_tcp_buffer + i_tcp_size;
i_read_size -= i_tcp_size;
if ( (i_read_size = recv( i_input_fd, p_read_buffer, i_read_size, 0 )) < 0 )
{
msg_Err( NULL, "recv error (%s)", strerror(errno) );
b_die = 1;
return 0;
}
i_tcp_size += i_read_size;
i_stc = pf_Date();
if ( i_tcp_size != i_len )