multicat.c 38.3 KB
Newer Older
1 2 3
/*****************************************************************************
 * multicat.c: netcat-equivalent for multicast
 *****************************************************************************
 * Copyright (C) 2009, 2011-2012, 2015-2017 VideoLAN
 * $Id$
 *
 * Authors: Christophe Massiot <massiot@via.ecp.fr>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
 *****************************************************************************/

24 25 26
/* POLLRDHUP */
#define _GNU_SOURCE 1

27 28
#include <stdlib.h>
#include <stdio.h>
29 30
#include <sys/types.h>
#include <sys/stat.h>
31
#include <fcntl.h>
32 33 34
#include <unistd.h>
#include <stdint.h>
#include <stdbool.h>
35
#include <inttypes.h>
36 37 38 39 40 41 42
#include <string.h>
#include <errno.h>
#include <signal.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <pthread.h>
43
#include <poll.h>
44
#include <sys/ioctl.h>
45
#include <syslog.h>
46
#include <sys/uio.h>
47

48 49 50 51 52 53 54
#ifdef SIOCGSTAMPNS
#   define HAVE_TIMESTAMPS
#endif

#ifndef POLLRDHUP
#   define POLLRDHUP 0
#endif
55

56 57
#include <bitstream/ietf/rtp.h>
#include <bitstream/mpeg/ts.h>
58
#include <bitstream/mpeg/pes.h>
59

60 61
#include "util.h"

62
#undef DEBUG_WRITEBACK
63
#define POLL_TIMEOUT 1000 /* 1 s */
64
#define MAX_LATENESS INT64_C(27000000) /* 1 s */
65
#define FILE_FLUSH INT64_C(2700000) /* 100 ms */
66
#define MAX_PIDS 8192
67 68 69
#define POW2_33 UINT64_C(8589934592)
#define TS_CLOCK_MAX (POW2_33 * 27000000 / 90000)
#define MAX_PCR_INTERVAL (27000000 / 2)
70 71 72 73 74 75

/*****************************************************************************
 * Local declarations
 *****************************************************************************/
static int i_input_fd, i_output_fd;
FILE *p_input_aux, *p_output_aux;
76 77
static int i_ttl = 0;
static bool b_sleep = true;
78 79 80 81 82
static uint16_t i_pcr_pid = 0;
static bool b_overwrite_ssrc = false;
static in_addr_t i_ssrc = 0;
static bool b_input_udp = false, b_output_udp = false;
static size_t i_asked_payload_size = DEFAULT_PAYLOAD_SIZE;
83
static size_t i_rtp_header_size = RTP_HEADER_SIZE;
84
static uint64_t i_rotate_size = DEFAULT_ROTATE_SIZE;
85
static uint64_t i_rotate_offset = DEFAULT_ROTATE_OFFSET;
86
static uint64_t i_duration = 0;
87 88 89
static struct udprawpkt pktheader;
static bool b_raw_packets = false;
static uint8_t *pi_pid_cc_table = NULL;
90 91 92 93
/* PCR/PTS/DTS restamping */
static uint64_t i_last_pcr_date;
static uint64_t i_last_pcr = TS_CLOCK_MAX;
static uint64_t i_pcr_offset;
94

95
static volatile sig_atomic_t b_die = 0, b_error = 0;
96
static uint16_t i_rtp_seqnum;
97
static uint64_t i_stc = 0; /* system time clock, used for date calculations */
98
static uint64_t i_first_stc = 0;
99
static uint64_t i_pcr = 0, i_pcr_stc = 0; /* for RTP/TS output */
100 101 102 103 104 105 106
static uint64_t (*pf_Date)(void) = wall_Date;
static void (*pf_Sleep)( uint64_t ) = wall_Sleep;
static ssize_t (*pf_Read)( void *p_buf, size_t i_len );
static bool (*pf_Delay)(void) = NULL;
static void (*pf_ExitRead)(void);
static ssize_t (*pf_Write)( const void *p_buf, size_t i_len );
static void (*pf_ExitWrite)(void);
107 108 109

/* Print the command-line help through msg_Raw() and terminate the process
 * with EXIT_FAILURE. This function never returns. */
static void usage(void)
{
    msg_Raw( NULL, "Usage: multicat [-i <RT priority>] [-l <syslogtag>] [-t <ttl>] [-X] [-T <file name>] [-f] [-p <PCR PID>] [-C] [-P] [-s <chunks>] [-n <chunks>] [-k <start time>] [-d <duration>] [-a] [-r <file duration>] [-O <rotate offset>] [-S <SSRC IP>] [-u] [-U] [-m <payload size>] [-R <RTP header size>] [-w] <input item> <output item>" );
    msg_Raw( NULL, "    item format: <file path | device path | FIFO path | directory path | network host>" );
    /* the bind address placeholder used to lack its closing '>' */
    msg_Raw( NULL, "    host format: [<connect addr>[:<connect port>]][@[<bind addr>][:<bind port>]]" );
    msg_Raw( NULL, "    -X: also pass-through all packets to stdout" );
    msg_Raw( NULL, "    -T: write an XML file with the current characteristics of transmission" );
    msg_Raw( NULL, "    -f: output packets as fast as possible" );
    msg_Raw( NULL, "    -p: overwrite or create RTP timestamps using PCR PID (MPEG-2/TS)" );
    msg_Raw( NULL, "    -C: rewrite continuity counters to be continuous" );
    msg_Raw( NULL, "    -P: restamp PCRs, DTSs, and PTSs" );
    msg_Raw( NULL, "    -s: skip the first N chunks of payload [deprecated]" );
    msg_Raw( NULL, "    -n: exit after playing N chunks of payload [deprecated]" );
    msg_Raw( NULL, "    -k: start at the given position (in 27 MHz units, negative = from the end)" );
    msg_Raw( NULL, "    -d: exit after definite time (in 27 MHz units)" );
    msg_Raw( NULL, "    -a: append to existing destination file (risky)" );
    msg_Raw( NULL, "    -r: in directory mode, rotate file after this duration (default: 97200000000 ticks = 1 hour)" );
    msg_Raw( NULL, "    -O: in directory mode, rotate file after duration + this offset (default: 0 tick = calendar hour)" );
    msg_Raw( NULL, "    -S: overwrite or create RTP SSRC" );
    msg_Raw( NULL, "    -u: source has no RTP header" );
    msg_Raw( NULL, "    -U: destination has no RTP header" );
    msg_Raw( NULL, "    -m: size of the payload chunk, excluding optional RTP header (default 1316)" );
    msg_Raw( NULL, "    -R: size of the optional RTP header (default 12)" );
    msg_Raw( NULL, "    -w: send with RAW (needed for /srcaddr)" );
    exit(EXIT_FAILURE);
}

/*****************************************************************************
 * Signal Handler
 *****************************************************************************/
static void SigHandler( int i_signal )
{
140
    b_die = b_error = 1;
141 142 143
}

/*****************************************************************************
144
 * Poll: factorize polling code
145
 *****************************************************************************/
146
static bool Poll(void)
147
{
148 149 150 151
    struct pollfd pfd;
    int i_ret;

    pfd.fd = i_input_fd;
152
    pfd.events = POLLIN | POLLERR | POLLRDHUP | POLLHUP;
153 154 155 156 157

    i_ret = poll( &pfd, 1, POLL_TIMEOUT );
    if ( i_ret < 0 )
    {
        msg_Err( NULL, "poll error (%s)", strerror(errno) );
158
        b_die = b_error = 1;
159 160
        return false;
    }
161 162 163
    if ( pfd.revents & (POLLERR | POLLRDHUP | POLLHUP) )
    {
        msg_Err( NULL, "poll error" );
164
        b_die = b_error = 1;
165 166
        return false;
    }
167 168 169
    if ( !i_ret ) return false;

    return true;
170 171
}

172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190
/*****************************************************************************
 * tcp_*: TCP socket handlers (only what differs from UDP)
 *****************************************************************************/
static uint8_t *p_tcp_buffer = NULL;
static size_t i_tcp_size = 0;

static ssize_t tcp_Read( void *p_buf, size_t i_len )
{
    if ( p_tcp_buffer == NULL )
        p_tcp_buffer = malloc(i_len);

    uint8_t *p_read_buffer;
    ssize_t i_read_size = i_len;
    p_read_buffer = p_tcp_buffer + i_tcp_size;
    i_read_size -= i_tcp_size;

    if ( (i_read_size = recv( i_input_fd, p_read_buffer, i_read_size, 0 )) < 0 )
    {
        msg_Err( NULL, "recv error (%s)", strerror(errno) );
191
        b_die = b_error = 1;
192 193 194 195 196 197 198 199 200 201 202 203 204 205
        return 0;
    }

    i_tcp_size += i_read_size;
    i_stc = pf_Date();

    if ( i_tcp_size != i_len )
        return 0;

    memcpy( p_buf, p_tcp_buffer, i_len );
    i_tcp_size = 0;
    return i_len;
}

206 207 208 209
/*****************************************************************************
 * udp_*: UDP socket handlers
 *****************************************************************************/
static off_t i_udp_nb_skips = 0;
210
static bool b_tcp = false;
211

212 213 214
static ssize_t udp_Read( void *p_buf, size_t i_len )
{
    ssize_t i_ret;
215 216 217 218 219 220 221 222 223
    if ( !i_udp_nb_skips && !i_first_stc )
        i_first_stc = pf_Date();

    if ( !Poll() )
    {
        i_stc = pf_Date();
        return 0;
    }

224
    if ( !b_tcp )
225
    {
226 227 228
        if ( (i_ret = recv( i_input_fd, p_buf, i_len, 0 )) < 0 )
        {
            msg_Err( NULL, "recv error (%s)", strerror(errno) );
229
            b_die = b_error = 1;
230 231 232
            return 0;
        }

233 234 235 236 237 238
#ifdef HAVE_TIMESTAMPS
        struct timespec ts;
        if ( !ioctl( i_input_fd, SIOCGSTAMPNS, &ts ) )
            i_stc = ts.tv_sec * UINT64_C(27000000) + ts.tv_nsec * 27 / 1000;
        else
#endif
239
        i_stc = pf_Date();
240
    }
241 242
    else
        i_ret = tcp_Read( p_buf, i_len );
243 244 245 246 247 248 249 250 251

    if ( i_udp_nb_skips )
    {
        i_udp_nb_skips--;
        return 0;
    }
    return i_ret;
}

252 253 254
static void udp_ExitRead(void)
{
    close( i_input_fd );
255 256
    if ( p_tcp_buffer != NULL )
        free( p_tcp_buffer );
257 258 259 260 261
}

static int udp_InitRead( const char *psz_arg, size_t i_len,
                         off_t i_nb_skipped_chunks, int64_t i_pos )
{
262
    if ( i_pos || (i_input_fd = OpenSocket( psz_arg, i_ttl, DEFAULT_PORT, 0,
263
                                            NULL, &b_tcp, NULL )) < 0 )
264 265 266 267 268 269
        return -1;

    i_udp_nb_skips = i_nb_skipped_chunks;

    pf_Read = udp_Read;
    pf_ExitRead = udp_ExitRead;
270 271 272 273
#ifdef HAVE_TIMESTAMPS
    if ( !b_tcp )
        pf_Date = real_Date;
#endif
274 275 276
    return 0;
}

277 278
static ssize_t raw_Write( const void *p_buf, size_t i_len )
{
279
#ifndef __APPLE__
280 281 282
    ssize_t i_ret;
    struct iovec iov[2];

283
    #if defined(__FreeBSD__)
284 285 286 287 288 289
    pktheader.udph.uh_ulen
    #else
    pktheader.udph.len
    #endif
    = htons(sizeof(struct udphdr) + i_len);

Christophe Massiot's avatar
Christophe Massiot committed
290 291 292 293
    #if defined(__FreeBSD__)
    pktheader.iph.ip_len = htons(sizeof(struct udprawpkt) + i_len);
    #endif

294 295 296 297 298 299 300 301 302 303 304
    iov[0].iov_base = &pktheader;
    iov[0].iov_len = sizeof(struct udprawpkt);

    iov[1].iov_base = (void *) p_buf;
    iov[1].iov_len = i_len;

    if ( (i_ret = writev( i_output_fd, iov, 2 )) < 0 )
    {
        if ( errno == EBADF || errno == ECONNRESET || errno == EPIPE )
        {
            msg_Err( NULL, "write error (%s)", strerror(errno) );
305
            b_die = b_error = 1;
306 307 308 309 310 311
        }
        /* otherwise do not set b_die because these errors can be transient */
        return 0;
    }

    return i_ret;
312 313 314
#else
    return -1;
#endif
315 316
}

317
/* Please note that the write functions also work for TCP */
318 319
static ssize_t udp_Write( const void *p_buf, size_t i_len )
{
320
    ssize_t i_ret;
321
    if ( (i_ret = send( i_output_fd, p_buf, i_len, 0 )) < 0 )
322 323 324 325
    {
        if ( errno == EBADF || errno == ECONNRESET || errno == EPIPE )
        {
            msg_Err( NULL, "write error (%s)", strerror(errno) );
326
            b_die = b_error = 1;
327 328 329 330
        }
        /* otherwise do not set b_die because these errors can be transient */
        return 0;
    }
331

332 333 334
    return i_ret;
}

335 336 337 338
/* Release the output socket. */
static void udp_ExitWrite(void)
{
    (void) close( i_output_fd );
}
339

340
static int udp_InitWrite( const char *psz_arg, size_t i_len, bool b_append )
341
{
342 343 344 345 346 347
    struct opensocket_opt opt;

    memset(&opt, 0, sizeof(struct opensocket_opt));
    if (b_raw_packets) {
        opt.p_raw_pktheader = &pktheader;
    }
348
    if ( (i_output_fd = OpenSocket( psz_arg, i_ttl, 0, DEFAULT_PORT,
349
                                    NULL, NULL, &opt )) < 0 )
350
        return -1;
351 352 353 354 355
    if (b_raw_packets) { 
        pf_Write = raw_Write;
    } else {
        pf_Write = udp_Write;
    }
356 357
    pf_ExitWrite = udp_ExitWrite;
    return 0;
358 359
}

360 361 362 363
/*****************************************************************************
 * stream_*: FIFO and character device handlers
 *****************************************************************************/
static off_t i_stream_nb_skips = 0;
364
static ssize_t i_buf_offset = 0;
365

366 367 368
static ssize_t stream_Read( void *p_buf, size_t i_len )
{
    ssize_t i_ret;
369 370 371 372 373 374 375 376
    if ( !i_stream_nb_skips && !i_first_stc )
        i_first_stc = pf_Date();

    if ( !Poll() )
    {
        i_stc = pf_Date();
        return 0;
    }
377

378 379
    if ( (i_ret = read( i_input_fd, p_buf + i_buf_offset,
                        i_len - i_buf_offset )) < 0 )
380 381
    {
        msg_Err( NULL, "read error (%s)", strerror(errno) );
382
        b_die = b_error = 1;
383 384 385
        return 0;
    }

386
    i_stc = pf_Date();
387 388 389 390 391 392 393
    i_buf_offset += i_ret;

    if ( i_buf_offset < i_len )
        return 0;

    i_ret = i_buf_offset;
    i_buf_offset = 0;
394 395 396 397 398 399 400 401
    if ( i_stream_nb_skips )
    {
        i_stream_nb_skips--;
        return 0;
    }
    return i_ret;
}

402 403 404 405 406 407 408 409 410 411 412
/* Release the input FIFO or device. */
static void stream_ExitRead(void)
{
    (void) close( i_input_fd );
}

static int stream_InitRead( const char *psz_arg, size_t i_len,
                            off_t i_nb_skipped_chunks, int64_t i_pos )
{
    if ( i_pos ) return -1;

    i_input_fd = OpenFile( psz_arg, true, false );
413
    if ( i_input_fd < 0 ) return -1;
414 415 416 417 418 419 420
    i_stream_nb_skips = i_nb_skipped_chunks;

    pf_Read = stream_Read;
    pf_ExitRead = stream_ExitRead;
    return 0;
}

421 422
static ssize_t stream_Write( const void *p_buf, size_t i_len )
{
423
    ssize_t i_ret;
424
retry:
425
    if ( (i_ret = write( i_output_fd, p_buf, i_len )) < 0 )
426
    {
427 428
        if (errno == EAGAIN || errno == EINTR)
            goto retry;
429
        msg_Err( NULL, "write error (%s)", strerror(errno) );
430
        b_die = b_error = 1;
431
    }
432 433 434
    return i_ret;
}

435
static void stream_ExitWrite(void)
436
{
437
    close( i_output_fd );
438 439
}

440
static int stream_InitWrite( const char *psz_arg, size_t i_len, bool b_append )
441
{
442
    i_output_fd = OpenFile( psz_arg, false, b_append );
443
    if ( i_output_fd < 0 ) return -1;
444 445 446 447 448

    pf_Write = stream_Write;
    pf_ExitWrite = stream_ExitWrite;
    return 0;
}
449

450 451 452
/*****************************************************************************
 * file_*: handler for the auxiliary file format
 *****************************************************************************/
453 454
/* Value of i_stc from which file_Write() flushes the auxiliary stream again;
 * 0 until the first chunk has been written */
static uint64_t i_file_next_flush = 0;

455 456
static ssize_t file_Read( void *p_buf, size_t i_len )
{
457 458 459 460 461 462
    uint8_t p_aux[8];
    ssize_t i_ret;

    if ( (i_ret = read( i_input_fd, p_buf, i_len )) < 0 )
    {
        msg_Err( NULL, "read error (%s)", strerror(errno) );
463
        b_die = b_error = 1;
464 465 466 467 468
        return 0;
    }
    if ( i_ret == 0 )
    {
        msg_Dbg( NULL, "end of file reached" );
469
        b_die = 1;
470 471 472 473 474 475
        return 0;
    }

    if ( fread( p_aux, 8, 1, p_input_aux ) != 1 )
    {
        msg_Warn( NULL, "premature end of aux file reached" );
476
        b_die = b_error = 1;
477 478
        return 0;
    }
479 480 481 482 483 484
    i_stc = FromSTC( p_aux );
    if ( !i_first_stc ) i_first_stc = i_stc;

    return i_ret;
}

485
static bool file_Delay(void)
486 487 488 489
{
    /* for correct throughput without rounding approximations */
    static uint64_t i_file_first_stc = 0, i_file_first_wall = 0;
    uint64_t i_wall = pf_Date();
490 491 492 493 494 495

    if ( !i_file_first_wall )
    {
        i_file_first_wall = i_wall;
        i_file_first_stc = i_stc;
    }
496 497 498 499 500 501 502 503 504 505 506 507 508 509
    else
    {
        int64_t i_delay = (i_stc - i_file_first_stc) -
                          (i_wall - i_file_first_wall);
        if ( i_delay > 0 )
            pf_Sleep( i_delay );
        else if ( i_delay < -MAX_LATENESS )
        {
            msg_Warn( NULL, "too much lateness, resetting clocks" );
            i_file_first_wall = i_wall;
            i_file_first_stc = i_stc;
        }
    }
    return true;
510
}
511

512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532
/* Release the input file and its auxiliary stream. */
static void file_ExitRead(void)
{
    (void) close( i_input_fd );
    (void) fclose( p_input_aux );
}

/* Open psz_arg and its auxiliary file for reading, position both at the
 * requested chunk, and install the file read handlers.
 * When i_pos is non-zero the number of chunks to skip is looked up in the
 * auxiliary file and replaces i_nb_skipped_chunks.
 * Returns 0 on success, -1 otherwise. On failure no descriptor is left
 * open. */
static int file_InitRead( const char *psz_arg, size_t i_len,
                          off_t i_nb_skipped_chunks, int64_t i_pos )
{
    char *psz_aux_file = GetAuxFile( psz_arg, i_len );
    if ( i_pos )
    {
        i_nb_skipped_chunks = LookupAuxFile( psz_aux_file, i_pos, false );
        if ( i_nb_skipped_chunks < 0 )
        {
            free( psz_aux_file );
            return -1;
        }
    }

    i_input_fd = OpenFile( psz_arg, true, false );
    if ( i_input_fd < 0 )
    {
        free(psz_aux_file);
        return -1;
    }
    p_input_aux = OpenAuxFile( psz_aux_file, true, false );
    free( psz_aux_file );
    if ( p_input_aux == NULL )
    {
        /* do not leak the payload descriptor opened just above */
        close( i_input_fd );
        i_input_fd = -1;
        return -1;
    }

    /* each chunk is i_len bytes of payload and 8 bytes of timestamp */
    lseek( i_input_fd, (off_t)i_len * i_nb_skipped_chunks, SEEK_SET );
    fseeko( p_input_aux, 8 * i_nb_skipped_chunks, SEEK_SET );

    pf_Read = file_Read;
    pf_Delay = file_Delay;
    pf_ExitRead = file_ExitRead;
    return 0;
}

static ssize_t file_Write( const void *p_buf, size_t i_len )
{
    uint8_t p_aux[8];
    ssize_t i_ret;
555 556 557
#ifdef DEBUG_WRITEBACK
    uint64_t start = pf_Date(), end;
#endif
558 559 560

    if ( (i_ret = write( i_output_fd, p_buf, i_len )) < 0 )
    {
561
        msg_Err( NULL, "couldn't write to file (%s)", strerror(errno) );
562
        b_die = b_error = 1;
563 564
        return i_ret;
    }
565 566 567 568 569
#ifdef DEBUG_WRITEBACK
    end = pf_Date();
    if (end - start > 270000) /* 10 ms */
        msg_Err(NULL, "too long waiting in write(%"PRId64")", (end - start) / 27000);
#endif
570

571
    ToSTC( p_aux, i_stc );
572
    if ( fwrite( p_aux, 8, 1, p_output_aux ) != 1 )
573
    {
574
        msg_Err( NULL, "couldn't write to auxiliary file" );
575
        b_die = b_error = 1;
576
    }
577 578 579 580 581 582 583
    if (!i_file_next_flush)
        i_file_next_flush = i_stc + FILE_FLUSH;
    else if (i_file_next_flush <= i_stc)
    {
        fflush( p_output_aux );
        i_file_next_flush = i_stc + FILE_FLUSH;
    }
584 585 586 587 588 589 590 591 592 593 594 595 596

    return i_ret;
}

/* Release the output file and its auxiliary stream. */
static void file_ExitWrite(void)
{
    (void) close( i_output_fd );
    (void) fclose( p_output_aux );
}

static int file_InitWrite( const char *psz_arg, size_t i_len, bool b_append )
{
    char *psz_aux_file = GetAuxFile( psz_arg, i_len );
597 598
    if ( b_append )
        CheckFileSizes( psz_arg, psz_aux_file, i_len );
599
    i_output_fd = OpenFile( psz_arg, false, b_append );
600
    if ( i_output_fd < 0 ) return -1;
601 602
    p_output_aux = OpenAuxFile( psz_aux_file, false, b_append );
    free( psz_aux_file );
603
    if ( p_output_aux == NULL ) return -1;
604 605 606 607 608 609 610 611 612 613 614 615 616

    pf_Write = file_Write;
    pf_ExitWrite = file_ExitWrite;
    return 0;
}

/*****************************************************************************
 * dir_*: handler for the auxiliary directory format
 *****************************************************************************/
static char *psz_input_dir_name;
static size_t i_input_dir_len;
static uint64_t i_input_dir_file;
static uint64_t i_input_dir_delay;
617

618 619
static ssize_t dir_Read( void *p_buf, size_t i_len )
{
620
    for ( ; ; )
621
    {
622 623 624
        ssize_t i_ret = file_Read( p_buf, i_len );
        if ( i_ret > 0 ) return i_ret;

625 626 627 628
        b_die = 0; /* we're not dead yet */
        close( i_input_fd );
        fclose( p_input_aux );
        i_input_fd = 0;
629
        p_input_aux = NULL;
630

631
        for ( ; ; )
632
        {
633 634 635 636 637 638
            i_input_dir_file++;

            i_input_fd = OpenDirFile( psz_input_dir_name, i_input_dir_file,
                                      true, i_input_dir_len, &p_input_aux );
            if ( i_input_fd > 0 ) break;

639 640
            if ( i_input_dir_file * i_rotate_size + i_rotate_offset >
                 i_first_stc + i_duration )
641 642 643 644 645 646 647
            {
                msg_Err( NULL, "end of files reached" );
                b_die = 1;
                return 0;
            }

            msg_Warn( NULL, "missing segment" );
648 649
        }
    }
650 651
}

652
static bool dir_Delay(void)
653 654
{
    uint64_t i_wall = pf_Date() - i_input_dir_delay;
655
    int64_t i_delay = i_stc - i_wall;
656

657 658 659 660 661 662 663 664
    if ( i_delay > 0 )
        pf_Sleep( i_delay );
    else if ( i_delay < -MAX_LATENESS )
    {
        msg_Warn( NULL, "dropping late packet" );
        return false;
    }
    return true;
665 666 667 668 669
}

static void dir_ExitRead(void)
{
    free( psz_input_dir_name );
670
    if ( i_input_fd > 0 )
671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697
    {
        close( i_input_fd );
        fclose( p_input_aux );
    }
}

static int dir_InitRead( const char *psz_arg, size_t i_len,
                         off_t i_nb_skipped_chunks, int64_t i_pos )
{
    if ( i_nb_skipped_chunks )
    {
        msg_Err( NULL, "unable to skip chunks with directory input" );
        return -1;
    }

    if ( i_pos <= 0 )
        i_pos += real_Date();
    if ( i_pos <= 0 )
    {
        msg_Err( NULL, "invalid position" );
        return -1;
    }
    i_first_stc = i_stc = i_pos;
    i_input_dir_delay = real_Date() - i_stc;

    psz_input_dir_name = strdup( psz_arg );
    i_input_dir_len = i_len;
698
    i_input_dir_file = GetDirFile( i_rotate_size, i_rotate_offset, i_pos );
699

700
    for ( ; ; )
701 702 703 704
    {
        i_nb_skipped_chunks = LookupDirAuxFile( psz_input_dir_name,
                                                i_input_dir_file, i_stc,
                                                i_input_dir_len );
705 706
        if ( i_nb_skipped_chunks >= 0 ) break;

707 708
        if ( i_input_dir_file * i_rotate_size + i_rotate_offset >
             i_stc + i_duration )
709 710 711 712
        {
            msg_Err( NULL, "position not found" );
            return -1;
        }
713 714 715

        i_input_dir_file++;
        msg_Warn( NULL, "missing segment" );
716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737
    }

    i_input_fd = OpenDirFile( psz_input_dir_name, i_input_dir_file,
                              true, i_input_dir_len, &p_input_aux );

    lseek( i_input_fd, (off_t)i_len * i_nb_skipped_chunks, SEEK_SET );
    fseeko( p_input_aux, 8 * i_nb_skipped_chunks, SEEK_SET );

    pf_Date = real_Date;
    pf_Sleep = real_Sleep;
    pf_Read = dir_Read;
    pf_Delay = dir_Delay;
    pf_ExitRead = dir_ExitRead;
    return 0;
}

static char *psz_output_dir_name;
static size_t i_output_dir_len;
static uint64_t i_output_dir_file;

static ssize_t dir_Write( const void *p_buf, size_t i_len )
{
738
    uint64_t i_dir_file = GetDirFile( i_rotate_size, i_rotate_offset, i_stc );
739
    if ( !i_output_fd || i_dir_file != i_output_dir_file )
740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780
    {
        if ( i_output_fd )
        {
            close( i_output_fd );
            fclose( p_output_aux );
        }

        i_output_dir_file = i_dir_file;

        i_output_fd = OpenDirFile( psz_output_dir_name, i_output_dir_file,
                                   false, i_output_dir_len, &p_output_aux );
    }

    return file_Write( p_buf, i_len );
}

/* Free the directory name and, if a segment is open (i_output_fd != 0),
 * close it together with its auxiliary stream. */
static void dir_ExitWrite(void)
{
    free( psz_output_dir_name );

    if ( i_output_fd == 0 )
        return;

    close( i_output_fd );
    fclose( p_output_aux );
}

/* Prepare directory output: remember a copy of the directory name and the
 * chunk size, mark that no segment is open yet, and install the directory
 * write handlers together with the real_* clock functions. No file is
 * opened here; b_append is not used. Always returns 0. */
static int dir_InitWrite( const char *psz_arg, size_t i_len, bool b_append )
{
    /* no segment open yet */
    i_output_fd = 0;
    i_output_dir_file = 0;
    i_output_dir_len = i_len;
    psz_output_dir_name = strdup( psz_arg );

    pf_Write = dir_Write;
    pf_ExitWrite = dir_ExitWrite;
    pf_Date = real_Date;
    pf_Sleep = real_Sleep;

    return 0;
}

781 782 783 784 785 786 787
/*****************************************************************************
 * GetPCR: read PCRs to align RTP timestamps with PCR scale (RFC compliance)
 *****************************************************************************/
static void GetPCR( const uint8_t *p_buffer, size_t i_read_size )
{
    while ( i_read_size >= TS_SIZE )
    {
788
        uint16_t i_pid = ts_get_pid( p_buffer );
789

790
        if ( !ts_validate( p_buffer ) )
791 792 793
        {
            msg_Warn( NULL, "invalid TS packet (sync=0x%x)", p_buffer[0] );
        }
794
        else if ( (i_pid == i_pcr_pid || i_pcr_pid == 8192)
795 796
              && ts_has_adaptation(p_buffer) && ts_get_adaptation(p_buffer)
              && tsaf_has_pcr(p_buffer) )
797
        {
798
            i_pcr = tsaf_get_pcr( p_buffer ) * 300 + tsaf_get_pcrext( p_buffer );
799 800 801 802 803 804 805
            i_pcr_stc = i_stc;
        }
        p_buffer += TS_SIZE;
        i_read_size -= TS_SIZE;
    }
}

806
/*****************************************************************************
 * FixCC: fix continuity counters
 *****************************************************************************/
/* Rewrite the continuity counter of every valid TS packet in p_buffer from
 * the per-PID table pi_pid_cc_table. A table entry of 0x10 marks a PID not
 * seen yet: its counter starts at 0. Otherwise the counter advances
 * (modulo 16) only for packets carrying a payload. Invalid packets are
 * reported and left untouched. */
static void FixCC( uint8_t *p_buffer, size_t i_read_size )
{
    for ( ; i_read_size >= TS_SIZE;
          p_buffer += TS_SIZE, i_read_size -= TS_SIZE )
    {
        const uint16_t i_pid = ts_get_pid( p_buffer );

        if ( !ts_validate( p_buffer ) )
        {
            msg_Warn( NULL, "invalid TS packet (sync=0x%x)", p_buffer[0] );
            continue;
        }

        uint8_t *p_cc = &pi_pid_cc_table[i_pid];
        if ( *p_cc == 0x10 )
        {
            msg_Dbg( NULL, "new pid entry %d", i_pid );
            *p_cc = 0;
        }
        else if ( ts_has_payload( p_buffer ) )
        {
            *p_cc = (*p_cc + 1) % 0x10;
        }
        ts_set_cc( p_buffer, *p_cc );
    }
}

837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909
/*****************************************************************************
 * RestampPCR
 *****************************************************************************/
/* Track the PCR of p_ts and keep the restamping offset up to date.
 * A jump larger than MAX_PCR_INTERVAL, or a packet flagged with the
 * discontinuity indicator, is treated as a discontinuity: the PCR that
 * would have been expected (last PCR plus the i_stc elapsed since) is
 * computed and the difference is added to i_pcr_offset. When the offset is
 * non-zero the PCR is rewritten with it and the discontinuity flag is
 * cleared. All arithmetic is modulo TS_CLOCK_MAX. */
static void RestampPCR(uint8_t *p_ts)
{
    uint64_t i_new_pcr = tsaf_get_pcr(p_ts) * 300 + tsaf_get_pcrext(p_ts);
    const bool b_discontinuity = tsaf_has_discontinuity(p_ts);

    /* TS_CLOCK_MAX in i_last_pcr means that no PCR has been seen yet */
    if (i_last_pcr != TS_CLOCK_MAX) {
        /* handle 2^33 wrap-arounds */
        const uint64_t i_delta =
            (TS_CLOCK_MAX + i_new_pcr -
             (i_last_pcr % TS_CLOCK_MAX)) % TS_CLOCK_MAX;

        if (i_delta > MAX_PCR_INTERVAL || b_discontinuity) {
            msg_Warn( NULL, "PCR discontinuity (%"PRIu64")", i_delta );
            const uint64_t i_expected_pcr =
                (i_last_pcr + i_stc - i_last_pcr_date) % TS_CLOCK_MAX;
            i_pcr_offset = (i_pcr_offset + TS_CLOCK_MAX + i_expected_pcr -
                            i_new_pcr) % TS_CLOCK_MAX;
        }
    }
    i_last_pcr = i_new_pcr;
    i_last_pcr_date = i_stc;

    if (i_pcr_offset == 0)
        return;

    i_new_pcr = (i_new_pcr + i_pcr_offset) % TS_CLOCK_MAX;
    tsaf_set_pcr(p_ts, i_new_pcr / 300);
    tsaf_set_pcrext(p_ts, i_new_pcr % 300);
    tsaf_clear_discontinuity(p_ts);
}

/*****************************************************************************
 * RestampTS
 *****************************************************************************/
/* Return i_ts shifted by the current restamping offset, modulo
 * TS_CLOCK_MAX. */
static uint64_t RestampTS(uint64_t i_ts)
{
    return (i_ts + i_pcr_offset) % TS_CLOCK_MAX;
}

/*****************************************************************************
 * Restamp: Restamp PCRs, DTSs and PTSs
 *****************************************************************************/
/* Restamp every valid TS packet of p_buffer: PCRs go through RestampPCR(),
 * and the PTS (and DTS when present) of a PES header starting in the
 * packet go through RestampTS(). Invalid packets are reported and
 * skipped. */
static void Restamp( uint8_t *p_buffer, size_t i_read_size )
{
    for ( ; i_read_size >= TS_SIZE;
          p_buffer += TS_SIZE, i_read_size -= TS_SIZE )
    {
        if ( !ts_validate( p_buffer ) )
        {
            msg_Warn( NULL, "invalid TS packet (sync=0x%x)", p_buffer[0] );
            continue;
        }

        if (ts_has_adaptation(p_buffer) && ts_get_adaptation(p_buffer) &&
            tsaf_has_pcr(p_buffer))
            RestampPCR(p_buffer);

        /* offset of the payload inside the packet */
        const uint16_t header_size = TS_HEADER_SIZE +
                                     (ts_has_adaptation(p_buffer) ? 1 : 0) +
                                     ts_get_adaptation(p_buffer);

        /* only packets starting a unit, with a payload large enough to
         * hold a PES header with a PTS, are of interest */
        if (!ts_get_unitstart(p_buffer) || !ts_has_payload(p_buffer) ||
            header_size + PES_HEADER_SIZE_PTS > TS_SIZE)
            continue;

        uint8_t *p_pes = p_buffer + header_size;
        if (!pes_validate(p_pes) ||
            pes_get_streamid(p_pes) == PES_STREAM_ID_PRIVATE_2 ||
            !pes_validate_header(p_pes) ||
            !pes_has_pts(p_pes))
            continue;
        /* pes_validate_pts() is deliberately not checked as invalid PTS
         * markers are a common mistake */

        pes_set_pts(p_pes, RestampTS(pes_get_pts(p_pes) * 300) / 300);

        /* likewise pes_validate_dts() is not checked */
        if (header_size + PES_HEADER_SIZE_PTSDTS <= TS_SIZE &&
            pes_has_dts(p_pes))
            pes_set_dts(p_pes, RestampTS(pes_get_dts(p_pes) * 300) / 300);
    }
}

930 931 932 933 934 935
/*****************************************************************************
 * Entry point
 *****************************************************************************/
int main( int i_argc, char **pp_argv )
{
    int i_priority = -1;
936
    const char *psz_syslog_tag = NULL;
937
    bool b_passthrough = false;
938
    bool b_restamp = false;
939
    int i_stc_fd = -1;
940 941
    off_t i_skip_chunks = 0, i_nb_chunks = -1;
    int64_t i_seek = 0;
942 943 944 945
    bool b_append = false;
    uint8_t *p_buffer, *p_read_buffer;
    size_t i_max_read_size, i_max_write_size;
    int c;
946 947
    struct sigaction sa;
    sigset_t set;
948

949
    /* Parse options */
950
    while ( (c = getopt( i_argc, pp_argv, "i:l:t:XT:fp:CPs:n:k:d:ar:O:S:uUm:R:wh" )) != -1 )
951 952 953 954 955 956 957
    {
        switch ( c )
        {
        case 'i':
            i_priority = strtol( optarg, NULL, 0 );
            break;

958 959 960 961
        case 'l':
            psz_syslog_tag = optarg;
            break;

962 963 964 965
        case 't':
            i_ttl = strtol( optarg, NULL, 0 );
            break;

966 967 968 969
        case 'X':
            b_passthrough = true;
            break;

970 971 972 973 974 975 976
        case 'T':
            i_stc_fd = open( optarg, O_WRONLY | O_CREAT | O_TRUNC, 0644 );
            if ( i_stc_fd < 0 )
                msg_Warn( NULL, "unable to open %s (%s)\n", optarg,
                          strerror(errno) );
            break;

977 978 979 980
        case 'f':
            b_sleep = false;
            break;

981 982 983 984
        case 'p':
            i_pcr_pid = strtol( optarg, NULL, 0 );
            break;

985 986 987 988 989
        case 'C':
            pi_pid_cc_table = malloc(MAX_PIDS * sizeof(uint8_t));
            memset(pi_pid_cc_table, 0x10, MAX_PIDS * sizeof(uint8_t));
            break;

990 991 992 993
        case 'P':
            b_restamp = true;
            break;

994 995 996 997 998 999 1000 1001
        case 's':
            i_skip_chunks = strtol( optarg, NULL, 0 );
            break;

        case 'n':
            i_nb_chunks = strtol( optarg, NULL, 0 );
            break;

1002 1003 1004 1005
        case 'k':
            i_seek = strtoull( optarg, NULL, 0 );
            break;

1006 1007 1008 1009 1010 1011 1012 1013
        case 'd':
            i_duration = strtoull( optarg, NULL, 0 );
            break;

        case 'a':
            b_append = true;
            break;

1014 1015 1016 1017
        case 'r':
            i_rotate_size = strtoull( optarg, NULL, 0 );
            break;

1018 1019 1020 1021
        case 'O':
            i_rotate_offset = strtoull( optarg, NULL, 0 );
            break;

1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043
        case 'S':
        {
            struct in_addr maddr;
            if ( !inet_aton( optarg, &maddr ) )
                usage();
            i_ssrc = maddr.s_addr;
            b_overwrite_ssrc = true;
            break;
        }

        case 'u':
            b_input_udp = true;
            break;

        case 'U':
            b_output_udp = true;
            break;

        case 'm':
            i_asked_payload_size = strtol( optarg, NULL, 0 );
            break;

1044 1045 1046 1047
        case 'R':
            i_rtp_header_size = strtol( optarg, NULL, 0 );
            break;

1048 1049 1050 1051
        case 'w':
            b_raw_packets = true;
            break;

1052 1053 1054 1055 1056 1057 1058 1059 1060
        case 'h':
        default:
            usage();
            break;
        }
    }
    if ( optind >= i_argc - 1 )
        usage();

1061 1062 1063
    if ( psz_syslog_tag != NULL )
        msg_Openlog( psz_syslog_tag, LOG_NDELAY, LOG_USER );

1064
    /* Open sockets */
1065 1066
    if ( udp_InitRead( pp_argv[optind], i_asked_payload_size, i_skip_chunks,
                       i_seek ) < 0 )
1067
    {
1068 1069 1070
        int i_ret;
        mode_t i_mode = StatFile( pp_argv[optind] );
        if ( !i_mode )
1071
        {
1072 1073
            msg_Err( NULL, "input not found, exiting" );
            exit(EXIT_FAILURE);
1074
        }
1075 1076 1077 1078 1079 1080 1081

        if ( S_ISDIR( i_mode ) )
            i_ret = dir_InitRead( pp_argv[optind], i_asked_payload_size,
                                  i_skip_chunks, i_seek );
        else if ( S_ISCHR( i_mode ) || S_ISFIFO( i_mode ) )
            i_ret = stream_InitRead( pp_argv[optind], i_asked_payload_size,
                                     i_skip_chunks, i_seek );
1082
        else
1083 1084 1085
            i_ret = file_InitRead( pp_argv[optind], i_asked_payload_size,
                                   i_skip_chunks, i_seek );
        if ( i_ret == -1 )
1086
        {
1087 1088
            msg_Err( NULL, "couldn't open input, exiting" );
            exit(EXIT_FAILURE);
1089 1090 1091 1092 1093
        }
        b_input_udp = true; /* We don't need no, RTP header */
    }
    optind++;

1094
    if ( udp_InitWrite( pp_argv[optind], i_asked_payload_size, b_append ) < 0 )
1095
    {
1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108
        int i_ret;
        mode_t i_mode = StatFile( pp_argv[optind] );

        if ( S_ISDIR( i_mode ) )
            i_ret = dir_InitWrite( pp_argv[optind], i_asked_payload_size,
                                   b_append );
        else if ( S_ISCHR( i_mode ) || S_ISFIFO( i_mode ) )
            i_ret = stream_InitWrite( pp_argv[optind], i_asked_payload_size,
                                      b_append );
        else
            i_ret = file_InitWrite( pp_argv[optind], i_asked_payload_size,
                                    b_append );
        if ( i_ret == -1 )
1109
        {
1110 1111
            msg_Err( NULL, "couldn't open output, exiting" );
            exit(EXIT_FAILURE);
1112 1113 1114 1115 1116 1117 1118
        }
        b_output_udp = true; /* We don't need no, RTP header */
    }
    optind++;

    srand( time(NULL) * getpid() );
    i_max_read_size = i_asked_payload_size + (b_input_udp ? 0 :
1119
                                              i_rtp_header_size);
1120 1121
    i_max_write_size = i_asked_payload_size + (b_output_udp ? 0 :
                                        (b_input_udp ? RTP_HEADER_SIZE :
1122
                                         i_rtp_header_size));
1123
    p_buffer = malloc( (i_max_read_size > i_max_write_size) ? i_max_read_size :
1124 1125 1126 1127
                       i_max_write_size );
    p_read_buffer = p_buffer + ((b_input_udp && !b_output_udp) ?
                                RTP_HEADER_SIZE : 0);
    if ( b_input_udp && !b_output_udp )
1128
        i_rtp_seqnum = rand() & 0xffff;
1129

1130
    /* Real-time priority */
1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145
    if ( i_priority > 0 )
    {
        struct sched_param param;
        int i_error;

        memset( &param, 0, sizeof(struct sched_param) );
        param.sched_priority = i_priority;
        if ( (i_error = pthread_setschedparam( pthread_self(), SCHED_RR,
                                               &param )) )
        {
            msg_Warn( NULL, "couldn't set thread priority: %s",
                      strerror(i_error) );
        }
    }

1146 1147 1148 1149 1150 1151 1152
    /* Set signal handlers */
    memset( &sa, 0, sizeof(struct sigaction) );
    sa.sa_handler = SigHandler;
    sigfillset( &set );

    if ( sigaction( SIGTERM, &sa, NULL ) == -1 ||
         sigaction( SIGHUP, &sa, NULL ) == -1 ||
1153 1154
         sigaction( SIGINT, &sa, NULL ) == -1 ||
         sigaction( SIGPIPE, &sa, NULL ) == -1 )
1155 1156 1157 1158
    {
        msg_Err( NULL, "couldn't set signal handler: %s", strerror(errno) );
        exit(EXIT_FAILURE);
    }
1159

1160
    /* Main loop */
1161 1162 1163 1164 1165 1166 1167 1168
    while ( !b_die )
    {
        ssize_t i_read_size = pf_Read( p_read_buffer, i_max_read_size );
        uint8_t *p_payload;
        size_t i_payload_size;
        uint8_t *p_write_buffer;
        size_t i_write_size;

1169 1170 1171
        if ( i_duration && i_stc > i_first_stc + i_duration )
            break;

1172 1173
        if ( i_read_size <= 0 ) continue;

1174
        if ( b_sleep && pf_Delay != NULL)
1175 1176
            if (!pf_Delay())
                goto dropped_packet;
1177

1178 1179 1180
        /* Determine start and size of payload */
        if ( !b_input_udp )
        {
1181
            if ( !rtp_check_hdr( p_read_buffer ) )
1182
                msg_Warn( NULL, "invalid RTP packet received" );
1183
            p_payload = rtp_payload( p_read_buffer );
1184 1185 1186 1187 1188 1189 1190 1191
            i_payload_size = p_read_buffer + i_read_size - p_payload;
        }
        else
        {
            p_payload = p_read_buffer;
            i_payload_size = i_read_size;
        }

1192 1193 1194 1195
        /* Skip last incomplete TS packet */
        i_read_size -= i_payload_size % TS_SIZE;
        i_payload_size -= i_payload_size % TS_SIZE;

1196 1197 1198
        /* Pad to get the asked payload size */
        while ( i_payload_size + TS_SIZE <= i_asked_payload_size )
        {
1199
            ts_pad( &p_payload[i_payload_size] );
1200 1201 1202 1203
            i_read_size += TS_SIZE;
            i_payload_size += TS_SIZE;
        }

1204 1205 1206
        /* Fix continuity counters */
        if ( pi_pid_cc_table != NULL )
            FixCC( p_payload, i_payload_size );
1207 1208 1209 1210

        /* Restamp */
        if ( b_restamp )
            Restamp( p_payload, i_payload_size );
1211

1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224
        /* Prepare header and size of output */
        if ( b_output_udp )
        {
            p_write_buffer = p_payload;
            i_write_size = i_payload_size;
        }
        else /* RTP output */
        {
            if ( b_input_udp )
            {
                p_write_buffer = p_buffer;
                i_write_size = i_payload_size + RTP_HEADER_SIZE;

1225 1226 1227 1228
                rtp_set_hdr( p_write_buffer );
                rtp_set_type( p_write_buffer, RTP_TYPE_TS );
                rtp_set_seqnum( p_write_buffer, i_rtp_seqnum );
                i_rtp_seqnum++;
1229 1230 1231 1232

                if ( i_pcr_pid )
                {
                    GetPCR( p_payload, i_payload_size );
1233 1234
                    rtp_set_timestamp( p_write_buffer,
                                       (i_pcr + (i_stc - i_pcr_stc)) / 300 );
1235 1236 1237 1238
                }
                else
                {
                    /* This isn't RFC-compliant but no one really cares */
1239
                    rtp_set_timestamp( p_write_buffer, i_stc / 300 );
1240
                }
1241
                rtp_set_ssrc( p_write_buffer, (uint8_t *)&i_ssrc );
1242 1243 1244 1245 1246 1247 1248 1249
            }
            else /* RTP output, RTP input */
            {
                p_write_buffer = p_read_buffer;
                i_write_size = i_read_size;

                if ( i_pcr_pid )
                {
1250
                    if ( rtp_get_type( p_write_buffer ) != RTP_TYPE_TS )
1251 1252 1253
                        msg_Warn( NULL, "input isn't MPEG transport stream" );
                    else
                        GetPCR( p_payload, i_payload_size );
1254 1255
                    rtp_set_timestamp( p_write_buffer,
                                       (i_pcr + (i_stc - i_pcr_stc)) / 300 );
1256 1257
                }
                if ( b_overwrite_ssrc )
1258
                    rtp_set_ssrc( p_write_buffer, (uint8_t *)&i_ssrc );
1259 1260 1261 1262
            }
        }

        pf_Write( p_write_buffer, i_write_size );
1263 1264 1265 1266
        if ( b_passthrough )
            if ( write( STDOUT_FILENO, p_write_buffer, i_write_size )
                  != i_write_size )
                msg_Warn( NULL, "write(stdout) error (%s)", strerror(errno) );
1267

1268
dropped_packet:
1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280
        if ( i_stc_fd != -1 )
        {
            char psz_stc[256];
            size_t i_len = sprintf( psz_stc, "<?xml version=\"1.0\" encoding=\"utf-8\"?><MULTICAT><STC value=\"%"PRIu64"\"/></MULTICAT>", i_stc );
            memset( psz_stc + i_len, '\n', sizeof(psz_stc) - i_len );
            if ( lseek( i_stc_fd, 0, SEEK_SET ) == (off_t)-1 )
                msg_Warn( NULL, "lseek date file failed (%s)",
                          strerror(errno) );
            if ( write( i_stc_fd, psz_stc, sizeof(psz_stc) ) != sizeof(psz_stc) )
                msg_Warn( NULL, "write date file error (%s)", strerror(errno) );
        }

1281 1282 1283
        if ( i_nb_chunks > 0 )
            i_nb_chunks--;
        if ( !i_nb_chunks )
1284
            break;
1285 1286
    }

1287
    free(pi_pid_cc_table);
Christophe Massiot's avatar
Christophe Massiot committed
1288
    free(p_buffer);
1289

1290 1291 1292
    pf_ExitRead();
    pf_ExitWrite();

1293 1294 1295
    if ( psz_syslog_tag != NULL )
        msg_Closelog();

1296
    return b_error ? EXIT_FAILURE : EXIT_SUCCESS;
1297
}