Commit c74fb8e1 authored by Rémi Denis-Courmont's avatar Rémi Denis-Courmont

prefetch: stream filter for threaded prefetching/buffering

parent 48a01700
......@@ -75,6 +75,7 @@ Demuxers:
Stream filter:
* Added ARIB STD-B25 TS streams decoder
* Added stream prebuffering plugin
* Removed HTTP Live streaming stream filter
Audio output:
......
......@@ -288,6 +288,7 @@ $Id$
* podcast: podcast feed parser
* posterize: posterize video filter
* postproc: Video post processing filter
* prefetch: Stream prefetching stream filter
* projectm: visualisation using libprojectM
* ps: input module for MPEG PS decapsulation
* psychedelic: Psychedelic video filter
......
......@@ -14,6 +14,12 @@ if !HAVE_WIN32
stream_filter_LTLIBRARIES += libdecomp_plugin.la
endif
libprefetch_plugin_la_SOURCES = stream_filter/prefetch.c
libprefetch_plugin_la_LIBADD = $(LIBPTHREAD)
if !HAVE_WIN32
stream_filter_LTLIBRARIES += libprefetch_plugin.la
endif
libsmooth_plugin_la_SOURCES = \
stream_filter/smooth/smooth.c \
stream_filter/smooth/utils.c \
......
/*****************************************************************************
* prefetch.c: prefetchinging module for VLC
*****************************************************************************
* Copyright © 2015 Rémi Denis-Courmont
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation; either version 2.1 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program; if not, write to the Free Software Foundation,
* Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
*****************************************************************************/
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>
#ifdef HAVE_MMAP
#include <sys/mman.h>
#ifndef MAP_ANONYMOUS
# define MAP_ANONYMOUS MAP_ANON
#endif
#else
#include <errno.h>
#define MAP_FAILED ((void *)-1)
#define mmap(a,l,p,f,d,o) \
((void)(a), (void)(l), (void)(d), (void)(o), errno = ENOMEM, MAP_FAILED)
#define munmap(a,l) \
((void)(a), (void)(l), errno = EINVAL, -1)
#define sysconf(a) 1
#endif
#include <vlc_common.h>
#include <vlc_plugin.h>
#include <vlc_stream.h>
#include <vlc_fs.h>
struct stream_sys_t
{
vlc_mutex_t lock;
vlc_cond_t wait_data;
vlc_cond_t wait_space;
vlc_thread_t thread;
bool eof;
bool error;
bool paused;
bool can_seek;
bool can_pace;
bool can_pause;
uint64_t size;
int64_t pts_delay;
char *content_type;
uint64_t buffer_offset;
uint64_t stream_offset;
size_t buffer_length;
size_t buffer_size;
char *buffer;
size_t read_size;
size_t seek_threshold;
};
static int ThreadRead(stream_t *stream, size_t length)
{
stream_sys_t *sys = stream->p_sys;
int canc = vlc_savecancel();
vlc_mutex_unlock(&sys->lock);
assert(length > 0);
char *p = sys->buffer + (sys->buffer_offset % sys->buffer_size)
+ sys->buffer_length;
ssize_t val = stream_Read(stream->p_source, p, length);
if (val < 0)
msg_Err(stream, "cannot read data (at offset %"PRIu64")",
sys->buffer_offset + sys->buffer_length);
if (val == 0)
msg_Dbg(stream, "end of stream");
vlc_mutex_lock(&sys->lock);
vlc_restorecancel(canc);
if (val < 0)
return -1;
if (val == 0)
sys->eof = true;
assert((size_t)val <= length);
sys->buffer_length += val;
assert(sys->buffer_length <= sys->buffer_size);
return 0;
}
static int ThreadSeek(stream_t *stream, uint64_t seek_offset)
{
stream_sys_t *sys = stream->p_sys;
int canc = vlc_savecancel();
vlc_mutex_unlock(&sys->lock);
int val = stream_Seek(stream->p_source, seek_offset);
if (val != VLC_SUCCESS)
msg_Err(stream, "cannot seek (to offset %"PRIu64")", seek_offset);
vlc_mutex_lock(&sys->lock);
vlc_restorecancel(canc);
if (val != VLC_SUCCESS)
return -1;
sys->buffer_offset = seek_offset;
sys->buffer_length = 0;
sys->eof = false;
return 0;
}
#define MAX_READ 65536
#define SEEK_THRESHOLD MAX_READ
static void *Thread(void *data)
{
stream_t *stream = data;
stream_sys_t *sys = stream->p_sys;
bool paused = false;
vlc_mutex_lock(&sys->lock);
mutex_cleanup_push(&sys->lock);
for (;;)
{
if (paused != sys->paused)
{ /* Update pause state */
int canc = vlc_savecancel();
paused = sys->paused;
vlc_mutex_unlock(&sys->lock);
stream_Control(stream->p_source, STREAM_SET_PAUSE_STATE, paused);
vlc_mutex_lock(&sys->lock);
vlc_restorecancel(canc);
continue;
}
if (paused)
{ /* Wait for resumption */
vlc_cond_wait(&sys->wait_space, &sys->lock);
continue;
}
if (sys->stream_offset < sys->buffer_offset)
{ /* Need to seek backward */
if (ThreadSeek(stream, sys->stream_offset))
break;
continue;
}
if (sys->eof)
{ /* At EOF, wait for backward seek */
vlc_cond_wait(&sys->wait_space, &sys->lock);
continue;
}
assert(sys->stream_offset >= sys->buffer_offset);
/* As long as there is space, the buffer will retain already read
* ("historical") data. The data can be used if/when seeking backward.
* Unread data is however given precedence if the buffer is full. */
uint64_t history = sys->stream_offset - sys->buffer_offset;
if (sys->can_seek
&& history >= (sys->buffer_length + sys->seek_threshold))
{ /* Large skip: seek forward */
if (ThreadSeek(stream, sys->stream_offset))
break;
continue;
}
assert(sys->buffer_size >= sys->buffer_length);
size_t unused = sys->buffer_size - sys->buffer_length;
if (unused == 0)
{ /* Buffer is full */
if (history == 0)
{ /* Wait for data to be read */
vlc_cond_wait(&sys->wait_space, &sys->lock);
continue;
}
/* Discard some historical data to make room. */
size_t discard = sys->read_size;
if (discard > history)
discard = history;
/* discard <= sys->read_size <= sys->buffer_size = ...
* ... unused + sys->buffer_length = 0 + sys->buffer_length */
assert(discard <= sys->buffer_length);
sys->buffer_offset += discard;
sys->buffer_length -= discard;
history -= discard;
unused = discard;
}
/* Some streams cannot return a short data count and just wait for all
* requested data to become available (e.g. regular files). So we have
* to limit the data read in a single operation to avoid blocking for
* too long. */
if (unused > sys->read_size)
unused = sys->read_size;
if (ThreadRead(stream, unused))
break;
vlc_cond_signal(&sys->wait_data);
}
vlc_cleanup_pop();
sys->error = true;
vlc_cond_signal(&sys->wait_data);
vlc_mutex_unlock(&sys->lock);
return NULL;
}
static int Seek(stream_t *stream, uint64_t offset)
{
stream_sys_t *sys = stream->p_sys;
vlc_mutex_lock(&sys->lock);
if (sys->stream_offset != offset)
{
sys->stream_offset = offset;
vlc_cond_signal(&sys->wait_space);
}
vlc_mutex_unlock(&sys->lock);
return 0;
}
static size_t BufferLevel(const stream_t *stream, bool *eof)
{
stream_sys_t *sys = stream->p_sys;
*eof = false;
if (sys->stream_offset < sys->buffer_offset)
return 0;
if ((sys->stream_offset - sys->buffer_offset) >= sys->buffer_length)
{
*eof = sys->eof;
return 0;
}
return sys->buffer_offset + sys->buffer_length - sys->stream_offset;
}
static ssize_t Read(stream_t *stream, void *buf, size_t buflen)
{
stream_sys_t *sys = stream->p_sys;
size_t copy;
bool eof;
if (buf == NULL)
{
Seek(stream, sys->stream_offset + buflen);
return buflen;
}
vlc_mutex_lock(&sys->lock);
while ((copy = BufferLevel(stream, &eof)) == 0 && !eof)
{
if (sys->error)
{
vlc_mutex_unlock(&sys->lock);
return -1;
}
vlc_cond_wait(&sys->wait_data, &sys->lock);
}
char *p = sys->buffer + (sys->stream_offset % sys->buffer_size);
if (copy > buflen)
copy = buflen;
if (copy > 0)
{
memcpy(buf, p, copy);
sys->stream_offset += copy;
vlc_cond_signal(&sys->wait_space);
}
vlc_mutex_unlock(&sys->lock);
return copy;
}
static input_item_t *ReadDir(stream_t *stream)
{
(void) stream;
return NULL;
}
static int Control(stream_t *stream, int query, va_list args)
{
stream_sys_t *sys = stream->p_sys;
switch (query)
{
case STREAM_CAN_SEEK:
*va_arg(args, bool *) = sys->can_seek;
break;
case STREAM_CAN_FASTSEEK:
*va_arg(args, bool *) = false;
break;
case STREAM_CAN_PAUSE:
*va_arg(args, bool *) = sys->can_pause;
break;
case STREAM_CAN_CONTROL_PACE:
*va_arg (args, bool *) = sys->can_pace;
break;
case STREAM_SET_POSITION:
return Seek(stream, va_arg(args, uint64_t));
case STREAM_GET_POSITION:
*va_arg (args, uint64_t *) = sys->stream_offset;
break;
case STREAM_IS_DIRECTORY:
return VLC_EGENERIC;
case STREAM_GET_SIZE:
*va_arg(args, uint64_t *) = sys->size;
break;
case STREAM_GET_PTS_DELAY:
*va_arg(args, int64_t *) = sys->pts_delay;
break;
case STREAM_GET_TITLE_INFO:
case STREAM_GET_TITLE:
case STREAM_GET_SEEKPOINT:
case STREAM_GET_META:
return VLC_EGENERIC;
case STREAM_GET_CONTENT_TYPE:
if (sys->content_type == NULL)
return VLC_EGENERIC;
*va_arg(args, char **) = strdup(sys->content_type);
return VLC_SUCCESS;
case STREAM_GET_SIGNAL:
return VLC_EGENERIC;
case STREAM_SET_PAUSE_STATE:
{
bool paused = va_arg(args, unsigned);
vlc_mutex_lock(&sys->lock);
sys->paused = paused;
vlc_cond_signal(&sys->wait_space);
vlc_mutex_unlock (&sys->lock);
break;
}
case STREAM_SET_TITLE:
case STREAM_SET_SEEKPOINT:
case STREAM_SET_PRIVATE_ID_STATE:
case STREAM_SET_PRIVATE_ID_CA:
case STREAM_GET_PRIVATE_ID_STATE:
return VLC_EGENERIC;
default:
msg_Err(stream, "unimplemented query (%d) in control", query);
return VLC_EGENERIC;
}
return VLC_SUCCESS;
}
static int Open(vlc_object_t *obj)
{
stream_t *stream = (stream_t *)obj;
bool fast_seek;
/* For local files, the operating system is likely to do a better work at
* caching/prefetching. Also, prefetching with this module could cause
* undesirable high load at start-up. Lastly, local files may require
* support for title/seekpoint and meta control requests. */
stream_Control(stream->p_source, STREAM_CAN_FASTSEEK, &fast_seek);
if (fast_seek)
return VLC_EGENERIC;
/* PID-filtered streams are not suitable for prefetching, as they would
* suffer excessive latency to enable a PID. DVB would also require support
* for the signal level and Conditional Access controls.
* TODO? For seekable streams, a forced could work around the problem. */
if (stream_Control(stream->p_source, STREAM_GET_PRIVATE_ID_STATE, 0,
&(bool){ false }) == VLC_SUCCESS)
return VLC_EGENERIC;
stream_sys_t *sys = malloc(sizeof (*sys));
if (unlikely(sys == NULL))
return VLC_ENOMEM;
stream->pf_read = Read;
stream->pf_control = Control;
stream_Control(stream->p_source, STREAM_CAN_SEEK, &sys->can_seek);
stream_Control(stream->p_source, STREAM_CAN_PAUSE, &sys->can_pause);
stream_Control(stream->p_source, STREAM_CAN_CONTROL_PACE, &sys->can_pace);
stream_Control(stream->p_source, STREAM_GET_SIZE, &sys->size);
stream_Control(stream->p_source, STREAM_GET_PTS_DELAY, &sys->pts_delay);
if (stream_Control(stream->p_source, STREAM_GET_CONTENT_TYPE,
&sys->content_type))
sys->content_type = NULL;
sys->eof = false;
sys->error = false;
sys->paused = false;
sys->buffer_offset = 0;
sys->stream_offset = 0;
sys->buffer_length = 0;
sys->buffer_size = var_InheritInteger(obj, "prefetch-buffer-size") << 10u;
sys->read_size = var_InheritInteger(obj, "prefetch-read-size");
sys->seek_threshold = var_InheritInteger(obj, "prefetch-seek-threshold");
uint64_t size = stream_Size(stream->p_source);
if (size > 0)
{ /* No point allocating a buffer larger than the source stream */
if (sys->buffer_size > size)
sys->buffer_size = size;
if (sys->read_size > size)
sys->read_size = size;
}
if (sys->buffer_size < sys->read_size)
sys->buffer_size = sys->read_size;
/* Round up to a multiple of the page size */
long page_size = sysconf(_SC_PAGESIZE);
sys->buffer_size += page_size - 1;
sys->buffer_size &= ~(page_size - 1);
sys->buffer = mmap(NULL, 2 * sys->buffer_size, PROT_NONE,
MAP_PRIVATE|MAP_ANONYMOUS, -1, 0);
if (sys->buffer == MAP_FAILED)
goto error;
int fd = vlc_memfd();
if (fd == -1)
goto error;
if (ftruncate(fd, sys->buffer_size)
|| mmap(sys->buffer, sys->buffer_size,
PROT_READ|PROT_WRITE, MAP_SHARED|MAP_FIXED, fd, 0) == MAP_FAILED
|| mmap(sys->buffer + sys->buffer_size, sys->buffer_size,
PROT_READ|PROT_WRITE, MAP_SHARED|MAP_FIXED, fd, 0) == MAP_FAILED)
{
close(fd);
goto error;
}
close(fd);
vlc_mutex_init(&sys->lock);
vlc_cond_init(&sys->wait_data);
vlc_cond_init(&sys->wait_space);
stream->p_sys = sys;
if (vlc_clone(&sys->thread, Thread, stream, VLC_THREAD_PRIORITY_LOW))
{
vlc_cond_destroy(&sys->wait_space);
vlc_cond_destroy(&sys->wait_data);
vlc_mutex_destroy(&sys->lock);
goto error;
}
msg_Dbg(stream, "using %zu bytes buffer, %zu bytes read",
sys->buffer_size, sys->read_size);
stream->pf_read = Read;
stream->pf_readdir = ReadDir;
stream->pf_control = Control;
return VLC_SUCCESS;
error:
if (sys->buffer != MAP_FAILED)
munmap(sys->buffer, 2 * sys->buffer_size);
free(sys);
return VLC_ENOMEM;
}
/**
* Releases allocate resources.
*/
static void Close (vlc_object_t *obj)
{
stream_t *stream = (stream_t *)obj;
stream_sys_t *sys = stream->p_sys;
vlc_cancel(sys->thread);
vlc_join(sys->thread, NULL);
vlc_cond_destroy(&sys->wait_space);
vlc_cond_destroy(&sys->wait_data);
vlc_mutex_destroy(&sys->lock);
munmap(sys->buffer, 2 * sys->buffer_size);
free(sys->content_type);
free(sys);
}
vlc_module_begin()
set_category(CAT_INPUT)
set_subcategory(SUBCAT_INPUT_STREAM_FILTER)
set_capability("stream_filter", 0)
set_description(N_("Stream prefetch filter"))
set_callbacks(Open, Close)
add_integer("prefetch-buffer-size", 1 << 14, N_("Buffer size"),
N_("Prefetch buffer size (KiB)"), false)
change_integer_range(4, 1 << 20)
add_integer("prefetch-read-size", 1 << 14, N_("Read size"),
N_("Prefetch background read size (bytes)"), true)
change_integer_range(1, 1 << 29)
add_integer("prefetch-seek-threshold", 1 << 14, N_("Seek threshold"),
N_("Prefetch forward seek threshold (bytes)"), true)
change_integer_range(0, UINT64_C(1) << 60)
vlc_module_end()
......@@ -1035,6 +1035,7 @@ modules/stream_filter/cache_block.c
modules/stream_filter/cache_read.c
modules/stream_filter/decomp.c
modules/stream_filter/hds/hds.c
modules/stream_filter/prefetch.c
modules/stream_filter/record.c
modules/stream_filter/smooth/smooth.c
modules/stream_out/autodel.c
......
......@@ -396,7 +396,7 @@ stream_t *stream_AccessNew(vlc_object_t *parent, input_thread_t *input,
if (sys->access->pf_read != NULL)
{
s->pf_read = AStreamReadStream;
cachename = "cache_read";
cachename = "prefetch,cache_read";
}
else
{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment