Commit 45c877aa authored by Julian Scheel's avatar Julian Scheel Committed by Thomas Guillem

access: Add satip access module

This module implements a minimal RTSP subset, which is required to support
satip servers.
Signed-off-by: Thomas Guillem's avatarThomas Guillem <thomas@gllm.fr>
parent 3078e74a
......@@ -44,6 +44,7 @@ Access:
* Named pipes and device nodes are no longer included in directory listings
by default. Use --list-special-files to include them back.
* Support for timeout in UDP input --udp-timeout=<seconds>
* New SAT>IP access module, to receive DVB-S via IP networks
Decoder:
* OMX GPU-zerocopy support for decoding and display on Android using OpenMax IL
......
......@@ -613,7 +613,7 @@ AC_FUNC_STRCOLL
dnl Check for non-standard system calls
case "$SYS" in
"linux")
AC_CHECK_FUNCS([accept4 pipe2 eventfd vmsplice sched_getaffinity])
AC_CHECK_FUNCS([accept4 pipe2 eventfd vmsplice sched_getaffinity recvmmsg])
;;
"mingw32")
AC_CHECK_FUNCS([_lock_file])
......
......@@ -328,6 +328,7 @@ $Id$
* samplerate: Secret Rabbit Code (libsamplerate) audio resampler
* sap: Interface module to read SAP/SDP announcements
* sapi: Windows Text to Speech Synthetizer using the SAPI 5.1 API
* satip: SES Astra SAT>IP access module
* scale: Images rescaler
* scaletempo: Scale audio tempo in sync with playback rate
* scene: scene video filter
......
......@@ -442,6 +442,9 @@ access_LTLIBRARIES += libavio_plugin.la
endif
endif
libsatip_plugin_la_SOURCES = access/satip.c
libsatip_plugin_la_LIBADD = $(SOCKET_LIBS)
access_LTLIBRARIES += libsatip_plugin.la
### Misc ###
......
/*****************************************************************************
* satip.c: SAT>IP input module
*****************************************************************************
* Copyright © 2016 VLC authors and VideoLAN
* Copyright © 2016 jusst technologies GmbH
* Copyright © 2016 Videolabs SAS
* Copyright © 2016 Julian Scheel
*
* Authors: Julian Scheel <julian@jusst.de>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation; either version 2.1 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program; if not, write to the Free Software Foundation,
* Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
*****************************************************************************/
#include "config.h"
#include <unistd.h>
#include <ctype.h>
#include <vlc_common.h>
#include <vlc_plugin.h>
#include <vlc_access.h>
#include <vlc_network.h>
#include <vlc_block.h>
#include <vlc_rand.h>
#include <vlc_url.h>
#include <vlc_interrupt.h>
#ifdef HAVE_POLL
#include <poll.h>
#endif
#include <fcntl.h>
#include <errno.h>
#include <assert.h>
#undef HAVE_RECVMMSG
#define RTSP_DEFAULT_PORT 554
#define RTSP_RECEIVE_BUFFER 2048
#define RTP_HEADER_SIZE 12
#define VLEN 100
#define KEEPALIVE_INTERVAL 60
#define KEEPALIVE_MARGIN 5
static int satip_open(access_t *);
static void satip_close(access_t *);
#define BUFFER_TEXT N_("Receive buffer")
#define BUFFER_LONGTEXT N_("UDP receive buffer size (bytes)")
#define MULTICAST_TEXT N_("Request multicast stream")
#define MULTICAST_LONGTEXT N_("Request server to send stream as multicast")
vlc_module_begin()
set_shortname("satip")
set_description( N_("SAT>IP Receiver Plugin") )
set_capability("access", 201)
set_callbacks(satip_open, satip_close)
set_category(CAT_INPUT)
set_subcategory(SUBCAT_INPUT_ACCESS)
add_integer("satip-buffer", 0x400000, BUFFER_TEXT, BUFFER_LONGTEXT, true)
add_bool("satip-multicast", false, MULTICAST_TEXT, MULTICAST_LONGTEXT, true)
add_shortcut("satip")
vlc_module_end()
enum rtsp_state {
RTSP_IDLE,
RTSP_DESCRIBE,
RTSP_SETUP,
RTSP_PLAY,
RTSP_RUNNING
};
enum rtsp_result {
RTSP_RESULT_OK = 200,
};
#define UDP_ADDRESS_LEN 16
typedef struct access_sys_t {
char *content_base;
char *control;
char session_id[64];
uint16_t stream_id;
int keepalive_interval;
char udp_address[UDP_ADDRESS_LEN];
uint16_t udp_port;
int tcp_sock;
int udp_sock;
int rtcp_sock;
enum rtsp_state state;
int cseq;
size_t fifo_size;
block_fifo_t *fifo;
vlc_thread_t thread;
#ifdef HAVE_RECVMMSG
block_t *input_blocks[VLEN];
#endif
uint16_t last_seq_nr;
bool woken;
} access_sys_t;
static void parse_session(char *request_line, char *session, unsigned max, int *timeout) {
char *state;
char *tok;
tok = strtok_r(request_line, ";", &state);
if (tok == NULL)
return;
strncpy(session, tok, __MIN(strlen(tok), max - 1));
while ((tok = strtok_r(NULL, ";", &state)) != NULL) {
if (strncmp(tok, "timeout=", 8) == 0) {
*timeout = atoi(tok + 8);
if (*timeout > 5)
*timeout -= KEEPALIVE_MARGIN;
else if (*timeout > 0)
*timeout = 1;
}
}
}
static int parse_port(char *str, uint16_t *port)
{
int p = atoi(str);
if (p < 0 || p > UINT16_MAX)
return VLC_EBADVAR;
*port = p;
return 0;
}
static int parse_transport(access_t *access, char *request_line) {
access_sys_t *sys = access->p_sys;
char *state;
char *tok;
int err;
tok = strtok_r(request_line, ";", &state);
if (tok == NULL || strncmp(tok, "RTP/AVP", 7) != 0)
return VLC_EGENERIC;
tok = strtok_r(NULL, ";", &state);
if (tok == NULL || strncmp(tok, "multicast", 9) != 0)
return 0;
while ((tok = strtok_r(NULL, ";", &state)) != NULL) {
if (strncmp(tok, "destination=", 12) == 0) {
strncpy(sys->udp_address, tok + 12, __MIN(strlen(tok + 12), UDP_ADDRESS_LEN - 1));
} else if (strncmp(tok, "port=", 5) == 0) {
char port[6];
char *end;
memset(port, 0x00, 6);
strncpy(port, tok + 5, __MIN(strlen(tok + 5), 5));
if ((end = strstr(port, "-")) != NULL)
*end = '\0';
err = parse_port(port, &sys->udp_port);
if (err)
return err;
}
}
return 0;
}
/*
* Semi-interruptible net_Gets replacement.
* If an interruption is occuring it will fallback to non-interruptible read
* with a given timeout before it returns.
*
* interrupted: Informs the caller whether an interrupt occured or not
*/
static char *net_readln_timeout(vlc_object_t *obj, int fd, int timeout, bool *interrupted)
{
char *buf = NULL;
size_t size = 0, len = 0;
bool intr = false;
for (;;)
{
if (len == size)
{
if (unlikely(size >= (1 << 16)))
{
errno = EMSGSIZE;
goto error; /* put sane buffer size limit */
}
char *newbuf = realloc(buf, size + 1024);
if (unlikely(newbuf == NULL))
goto error;
buf = newbuf;
size += 1024;
}
assert(len < size);
ssize_t val = 0;
if (!intr) {
val = vlc_recv_i11e(fd, buf + len, size - len, MSG_PEEK);
if (val <= 0 && errno == EINTR) {
intr = true;
if (interrupted)
*interrupted = true;
continue;
}
if (val <= 0)
goto error;
} else {
struct pollfd pfd = {
.fd = fd,
.events = POLLIN,
};
int ret;
while((ret = poll(&pfd, 1, timeout)) < 0);
if (ret < 0)
goto error;
val = recv(fd, buf + len, size - len, MSG_PEEK);
if (val <= 0)
goto error;
}
char *end = memchr(buf + len, '\n', val);
if (end != NULL)
val = (end + 1) - (buf + len);
if (recv(fd, buf + len, val, 0) != val)
goto error;
len += val;
if (end != NULL)
break;
}
assert(len > 0);
buf[--len] = '\0';
if (len > 0 && buf[--len] == '\r')
buf[len] = '\0';
return buf;
error:
msg_Err(obj, "read error: %s", vlc_strerror_c(errno));
free(buf);
return NULL;
}
#define skip_whitespace(x) while(*x == ' ') x++
static enum rtsp_result rtsp_handle(access_t *access, bool *interrupted) {
access_sys_t *sys = access->p_sys;
uint8_t buffer[512];
int rtsp_result = 0;
bool have_header = false;
size_t content_length = 0;
size_t read = 0;
char *in, *val;
/* Parse header */
while (!have_header) {
in = net_readln_timeout((vlc_object_t*)access, sys->tcp_sock, 5000,
interrupted);
if (in == NULL)
break;
if (strncmp(in, "RTSP/1.0 ", 9) == 0) {
rtsp_result = atoi(in + 9);
} else if (strncmp(in, "Content-Base:", 13) == 0) {
free(sys->content_base);
val = in + 13;
skip_whitespace(val);
sys->content_base = strdup(val);
} else if (strncmp(in, "Content-Length:", 15) == 0) {
val = in + 16;
skip_whitespace(val);
content_length = atoi(val);
} else if (strncmp("Session:", in, 8) == 0) {
val = in + 8;
skip_whitespace(val);
parse_session(val, sys->session_id, 64, &sys->keepalive_interval);
} else if (strncmp("Transport:", in, 10) == 0) {
val = in + 10;
skip_whitespace(val);
if (parse_transport(access, val) != 0) {
rtsp_result = VLC_EGENERIC;
break;
}
} else if (strncmp("com.ses.streamID:", in, 17) == 0) {
val = in + 17;
skip_whitespace(val);
sys->stream_id = atoi(val);
} else if (in[0] == '\0') {
have_header = true;
}
free(in);
}
/* Discard further content */
while (content_length > 0 &&
(read = net_Read(access, sys->tcp_sock, buffer, __MIN(sizeof(buffer), content_length))))
content_length -= read;
return rtsp_result;
}
#ifdef HAVE_RECVMMSG
static int alloc_input_blocks(access_t *access)
{
access_sys_t *sys = access->p_sys;
int i;
for (i = 0; i < VLEN; i++) {
if (!sys->input_blocks[i])
sys->input_blocks[i] = block_Alloc(RTSP_RECEIVE_BUFFER);
if (unlikely(sys->input_blocks[i] == NULL))
return VLC_ENOMEM;
}
return 0;
}
#endif
static int check_rtp_seq(access_t *access, block_t *block)
{
access_sys_t *sys = access->p_sys;
uint16_t seq_nr = block->p_buffer[2] << 8 | block->p_buffer[3];
if (seq_nr == sys->last_seq_nr) {
msg_Warn(access, "Received duplicate packet (seq_nr=%"PRIu16")", seq_nr);
return VLC_EGENERIC;
} else if (seq_nr < (uint16_t)(sys->last_seq_nr + 1)) {
msg_Warn(access, "Received out of order packet (seq_nr=%"PRIu16" < %"PRIu16")",
seq_nr, sys->last_seq_nr);
return VLC_EGENERIC;
} else if (++sys->last_seq_nr > 1 && seq_nr > sys->last_seq_nr) {
msg_Warn(access, "Gap in seq_nr (%"PRIu16" > %"PRIu16"), probably lost a packet",
seq_nr, sys->last_seq_nr);
}
sys->last_seq_nr = seq_nr;
return 0;
}
static void satip_teardown(void *data) {
access_t *access = data;
access_sys_t *sys = access->p_sys;
int ret;
if (sys->tcp_sock > 0) {
if (sys->session_id[0] > 0) {
char discard_buf[32];
struct pollfd pfd = {
.fd = sys->tcp_sock,
.events = POLLOUT,
};
char *msg;
ssize_t len = asprintf(&msg, "TEARDOWN %s RTSP/1.0\r\n"
"CSeq: %d\r\n"
"Session: %s\r\n\r\n",
sys->control, sys->cseq++, sys->session_id);
if (len < 0)
return;
/* make socket non-blocking, to avoid blocking when output buffer
* has not enough space */
#ifndef _WIN32
fcntl(sys->tcp_sock, F_SETFL, fcntl(sys->tcp_sock, F_GETFL) | O_NONBLOCK);
#else
ioctlsocket(sys->tcp_sock, FIONBIO, &(unsigned long){ 1 });
#endif
for (unsigned sent = 0; sent < len;) {
ret = poll(&pfd, 1, 5000);
if (ret == 0) {
msg_Err(access, "Timed out sending RTSP teardown\n");
free(msg);
return;
}
ret = send(sys->tcp_sock, msg + sent, len, MSG_NOSIGNAL);
if (ret < 0) {
msg_Err(access, "Failed to send RTSP teardown: %d\n", ret);
free(msg);
return;
}
sent += ret;
}
free(msg);
if (rtsp_handle(access, NULL) != RTSP_RESULT_OK) {
msg_Err(access, "Failed to teardown RTSP session");
return;
}
/* Some SATIP servers send a few empty extra bytes after TEARDOWN.
* Try to read them, to avoid a TCP socket reset */
while ((len = recv(sys->tcp_sock, discard_buf, sizeof(discard_buf), 0) > 0));
/* Extra sleep for compatibility with some satip servers, that
* can't handle new sessions right after teardown */
msleep(150000);
}
}
}
#define RECV_TIMEOUT 2 * 1000 * 1000
static void *satip_thread(void *data) {
access_t *access = data;
access_sys_t *sys = access->p_sys;
int sock = sys->udp_sock;
volatile mtime_t last_recv = mdate();
block_t *block = NULL;
ssize_t len;
mtime_t next_keepalive = mdate() + sys->keepalive_interval * 1000 * 1000;
#ifdef HAVE_RECVMMSG
struct mmsghdr msgs[VLEN];
struct iovec iovecs[VLEN];
int retval, i;
memset(msgs, 0, sizeof(msgs));
#endif
while (last_recv > mdate() - RECV_TIMEOUT) {
#ifdef HAVE_RECVMMSG
if (alloc_input_blocks(access) != 0) {
msg_Err(access, "Failed to allocate memory for input buffers");
break;
}
for(i = 0; i < VLEN; ++i) {
iovecs[i].iov_base = sys->input_blocks[i]->p_buffer;
iovecs[i].iov_len = RTSP_RECEIVE_BUFFER;
msgs[i].msg_hdr.msg_iov = &iovecs[i];
msgs[i].msg_hdr.msg_iovlen = 1;
}
retval = recvmmsg(sock, msgs, VLEN, MSG_WAITFORONE, NULL);
if (retval == -1)
continue;
last_recv = mdate();
for(i = 0; i < retval; ++i) {
len = msgs[i].msg_len;
block = sys->input_blocks[i];
if (check_rtp_seq(access, block))
continue;
block->p_buffer += RTP_HEADER_SIZE;
block->i_buffer = len - RTP_HEADER_SIZE;
block_FifoPut(sys->fifo, block);
sys->input_blocks[i] = NULL;
}
#else
struct pollfd ufd;
ufd.fd = sock;
ufd.events = POLLIN;
if (poll(&ufd, 1, 20) == -1)
continue;
block = block_Alloc(RTSP_RECEIVE_BUFFER);
if (block == NULL) {
msg_Err(access, "Failed to allocate memory for input buffer");
break;
}
block_cleanup_push(block);
len = recv(sock, block->p_buffer, RTSP_RECEIVE_BUFFER, 0);
vlc_cleanup_pop();
if (len < RTP_HEADER_SIZE) {
block_Release(block);
continue;
}
if (check_rtp_seq(access, block)) {
block_Release(block);
continue;
}
last_recv = mdate();
block->p_buffer += RTP_HEADER_SIZE;
block->i_buffer = len - RTP_HEADER_SIZE;
block_FifoPut(sys->fifo, block);
#endif
if (sys->keepalive_interval > 0 && mdate() > next_keepalive) {
net_Printf(access, sys->tcp_sock,
"OPTIONS %s RTSP/1.0\r\n"
"CSeq: %d\r\n"
"Session: %s\r\n\r\n",
sys->control, sys->cseq++, sys->session_id);
if (rtsp_handle(access, NULL) != RTSP_RESULT_OK)
msg_Warn(access, "Failed to keepalive RTSP session");
next_keepalive = mdate() + sys->keepalive_interval * 1000 * 1000;
}
}
msg_Dbg(access, "timed out waiting for data...");
vlc_fifo_Lock(sys->fifo);
sys->woken = true;
vlc_fifo_Signal(sys->fifo);
vlc_fifo_Unlock(sys->fifo);
return NULL;
}
static block_t* satip_block(access_t *access) {
access_sys_t *sys = access->p_sys;
block_t *block;
vlc_fifo_Lock(sys->fifo);
while (vlc_fifo_IsEmpty(sys->fifo)) {
if (sys->woken)
break;
vlc_fifo_Wait(sys->fifo);
}
if ((block = vlc_fifo_DequeueUnlocked(sys->fifo)) == NULL)
access->info.b_eof = true;
sys->woken = false;
vlc_fifo_Unlock(sys->fifo);
return block;
}
static int satip_control(access_t *access, int i_query, va_list args) {
bool *pb_bool;
int64_t *pi_64;
switch(i_query)
{
case ACCESS_CAN_CONTROL_PACE:
case ACCESS_CAN_SEEK:
case ACCESS_CAN_PAUSE:
pb_bool = (bool*)va_arg(args, bool*);
*pb_bool = false;
break;
case ACCESS_GET_PTS_DELAY:
pi_64 = (int64_t*)va_arg(args, int64_t *);
*pi_64 = INT64_C(1000) * var_InheritInteger(access, "live-caching");
break;
default:
return VLC_EGENERIC;
}
return VLC_SUCCESS;
}
/* Bind two adjacent free ports, of which the first one is even (for RTP data)
* and the second is odd (RTCP). This is a requirement of the satip
* specification */
static int satip_bind_ports(access_t *access)
{
access_sys_t *sys = access->p_sys;
uint8_t rnd;
vlc_rand_bytes(&rnd, 1);
sys->udp_port = 9000 + (rnd * 2); /* randomly chosen, even start point */
while (sys->udp_sock < 0) {
sys->udp_sock = net_OpenDgram(access, "0.0.0.0", sys->udp_port, NULL,
0, IPPROTO_UDP);
if (sys->udp_sock < 0) {
if (sys->udp_port == 65534)
break;
sys->udp_port += 2;
continue;
}
sys->rtcp_sock = net_OpenDgram(access, "0.0.0.0", sys->udp_port + 1, NULL,
0, IPPROTO_UDP);
if (sys->rtcp_sock < 0) {
close(sys->udp_sock);
sys->udp_port += 2;
continue;
}
}
if (sys->udp_sock < 0) {
msg_Err(access, "Could not open two adjacent ports for RTP and RTCP data");
return VLC_EGENERIC;
}
return 0;
}
static int satip_open(access_t *access) {
access_sys_t *sys;
vlc_url_t url;
bool multicast = var_InheritBool(access, "satip-multicast");
access->p_sys = sys = calloc(1, sizeof(*sys));
if (sys == NULL)
return VLC_ENOMEM;
msg_Dbg(access, "try to open '%s'", access->psz_location);
sys->udp_sock = -1;
sys->rtcp_sock = -1;
/* convert url to lowercase, some famous m3u playlists for satip contain
* uppercase parameters while most (all?) satip servers do only understand
* parameters matching lowercase spelling as defined in the specification
* */
char *psz_lower_location = strdup(access->psz_location);
if (psz_lower_location == NULL)
goto error;
for (unsigned i = 0; i < strlen(psz_lower_location); i++)
psz_lower_location[i] = tolower(psz_lower_location[i]);
vlc_UrlParse(&url, psz_lower_location);
if (url.i_port <= 0)
url.i_port = RTSP_DEFAULT_PORT;
msg_Dbg(access, "connect to host '%s'", url.psz_host);
sys->tcp_sock = net_ConnectTCP(access, url.psz_host, url.i_port);
if (sys->tcp_sock < 0) {
msg_Err(access, "Failed to connect to RTSP server %s:%d",
url.psz_host, url.i_port);
goto error;
}
setsockopt (sys->tcp_sock, SOL_SOCKET, SO_KEEPALIVE, &(int){ 1 }, sizeof (int));
if (asprintf(&sys->content_base, "rtsp://%s:%d/", url.psz_host,
url.i_port) < 0) {
sys->content_base = NULL;
goto error;
}
sys->last_seq_nr = 0;
sys->keepalive_interval = (KEEPALIVE_INTERVAL - KEEPALIVE_MARGIN);
if (multicast) {
net_Printf(access, sys->tcp_sock,
"SETUP rtsp://%s RTSP/1.0\r\n"
"CSeq: %d\r\n"
"Transport: RTP/AVP;multicast\r\n\r\n",
psz_lower_location, sys->cseq++);
} else {
/* open UDP socket to acquire a free port to use */
if (satip_bind_ports(access))
goto error;
net_Printf(access, sys->tcp_sock,
"SETUP rtsp://%s RTSP/1.0\r\n"
"CSeq: %d\r\n"
"Transport: RTP/AVP;unicast;client_port=%d-%d\r\n\r\n",
psz_lower_location, sys->cseq++, sys->udp_port, sys->udp_port + 1);
}
bool interrupted = false;
if (rtsp_handle(access, &interrupted) != RTSP_RESULT_OK) {
msg_Err(access, "Failed to setup RTSP session");
goto error;
}
if (asprintf(&sys->control, "%sstream=%d", sys->content_base, sys->stream_id) < 0) {
sys->control = NULL;
goto error;
}
if (interrupted) {
msg_Warn(access, "SETUP was interrupted, abort startup");
goto error;
}
/* Extra sleep for compatibility with some satip servers, that
* can't handle PLAY right after SETUP */
if (vlc_msleep_i11e(50000) < 0)
goto error;
/* Open UDP socket for reading if not done */
if (multicast) {
sys->udp_sock = net_OpenDgram