Commit 74b4f0f4 authored by Romain Vimont's avatar Romain Vimont
Browse files

preparser: use the new executor API

Replace the background_worker by an executor.
parent 1169d142
......@@ -24,8 +24,9 @@
#include <vlc_common.h>
#include <vlc_atomic.h>
#include <vlc_executor.h>
#include <vlc_tick.h>
#include "misc/background_worker.h"
#include "input/input_interface.h"
#include "input/input_internal.h"
#include "preparser.h"
......@@ -35,218 +36,236 @@ struct input_preparser_t
{
vlc_object_t* owner;
input_fetcher_t* fetcher;
struct background_worker* worker;
vlc_executor_t *executor;
vlc_tick_t default_timeout;
atomic_bool deactivated;
};
typedef struct input_preparser_req_t
struct input_preparser_task
{
input_preparser_t *preparser;
input_item_t *item;
input_item_meta_request_option_t options;
const input_preparser_callbacks_t *cbs;
void *userdata;
vlc_atomic_rc_t rc;
} input_preparser_req_t;
typedef struct input_preparser_task_t
{
input_preparser_req_t *req;
input_preparser_t* preparser;
int preparse_status;
input_item_parser_id_t *parser;
atomic_int state;
atomic_bool done;
} input_preparser_task_t;
static input_preparser_req_t *ReqCreate(input_item_t *item,
input_item_meta_request_option_t options,
const input_preparser_callbacks_t *cbs,
void *userdata)
vlc_mutex_t lock;
vlc_cond_t cond_ended;
bool preparse_ended;
int preparse_status;
bool fetch_ended;
atomic_bool interrupted;
};
static struct input_preparser_task *
TaskNew(input_preparser_t *preparser, input_item_t *item,
input_item_meta_request_option_t options,
const input_preparser_callbacks_t *cbs, void *userdata)
{
input_preparser_req_t *req = malloc(sizeof(*req));
if (unlikely(!req))
struct input_preparser_task *task = malloc(sizeof(*task));
if (!task)
return NULL;
req->item = item;
req->options = options;
req->cbs = cbs;
req->userdata = userdata;
vlc_atomic_rc_init(&req->rc);
task->preparser = preparser;
task->item = item;
task->options = options;
task->cbs = cbs;
task->userdata = userdata;
input_item_Hold(item);
return req;
task->parser = NULL;
vlc_mutex_init(&task->lock);
vlc_cond_init(&task->cond_ended);
task->preparse_ended = false;
task->preparse_status = ITEM_PREPARSE_SKIPPED;
task->fetch_ended = false;
atomic_init(&task->interrupted, false);
return task;
}
static void ReqHold(input_preparser_req_t *req)
static void
TaskDelete(struct input_preparser_task *task)
{
vlc_atomic_rc_inc(&req->rc);
input_item_Release(task->item);
free(task);
}
static void ReqRelease(input_preparser_req_t *req)
static void
OnParserEnded(input_item_t *item, int status, void *task_)
{
if (vlc_atomic_rc_dec(&req->rc))
{
input_item_Release(req->item);
free(req);
}
VLC_UNUSED(item);
struct input_preparser_task *task = task_;
if (atomic_load(&task->interrupted))
/*
* On interruption, the call to input_item_parser_id_Release() may
* trigger this "parser ended" callback.
*
* It must be ignored for two reasons:
* - the task lock is already held by this thread;
* - the preparse_status, already set, must not be overwritten.
*/
return;
vlc_mutex_lock(&task->lock);
assert(!task->preparse_ended);
task->preparse_status = status == VLC_SUCCESS ? ITEM_PREPARSE_DONE
: ITEM_PREPARSE_FAILED;
task->preparse_ended = true;
vlc_mutex_unlock(&task->lock);
vlc_cond_signal(&task->cond_ended);
}
static void OnParserEnded(input_item_t *item, int status, void *task_)
static void
OnParserSubtreeAdded(input_item_t *item, input_item_node_t *subtree,
void *task_)
{
VLC_UNUSED(item);
input_preparser_task_t* task = task_;
struct input_preparser_task *task = task_;
atomic_store( &task->state, status );
atomic_store( &task->done, true );
background_worker_RequestProbe( task->preparser->worker );
if (task->cbs && task->cbs->on_subtree_added)
task->cbs->on_subtree_added(task->item, subtree, task->userdata);
}
static void OnParserSubtreeAdded(input_item_t *item, input_item_node_t *subtree,
void *task_)
static void
OnArtFetchEnded(input_item_t *item, bool fetched, void *userdata)
{
VLC_UNUSED(item);
input_preparser_task_t* task = task_;
input_preparser_req_t *req = task->req;
VLC_UNUSED(fetched);
struct input_preparser_task *task = userdata;
vlc_mutex_lock(&task->lock);
assert(!task->fetch_ended);
task->fetch_ended = true;
vlc_mutex_unlock(&task->lock);
if (req->cbs && req->cbs->on_subtree_added)
req->cbs->on_subtree_added(req->item, subtree, req->userdata);
vlc_cond_signal(&task->cond_ended);
}
static int PreparserOpenInput( void* preparser_, void* req_, void** out )
static const input_fetcher_callbacks_t input_fetcher_callbacks = {
.on_art_fetch_ended = OnArtFetchEnded,
};
static void
ParseLocked(struct input_preparser_task *task)
{
input_preparser_t* preparser = preparser_;
input_preparser_req_t *req = req_;
input_preparser_task_t* task = malloc( sizeof *task );
vlc_mutex_assert(&task->lock);
if( unlikely( !task ) )
goto error;
if (atomic_load(&task->interrupted))
return;
static const input_item_parser_cbs_t cbs = {
.on_ended = OnParserEnded,
.on_subtree_added = OnParserSubtreeAdded,
};
atomic_init( &task->state, VLC_ETIMEOUT );
atomic_init( &task->done, false );
task->preparser = preparser_;
task->req = req;
task->preparse_status = -1;
task->parser = input_item_Parse( req->item, preparser->owner, &cbs,
task );
if( !task->parser )
goto error;
*out = task;
vlc_object_t *obj = task->preparser->owner;
task->parser = input_item_Parse(task->item, obj, &cbs, task);
if (!task->parser)
{
task->preparse_status = ITEM_PREPARSE_FAILED;
return;
}
return VLC_SUCCESS;
/* Wait until the end of parsing */
while (!task->preparse_ended && !atomic_load(&task->interrupted))
vlc_cond_wait(&task->cond_ended, &task->lock);
error:
free( task );
if (req->cbs && req->cbs->on_preparse_ended)
req->cbs->on_preparse_ended(req->item, ITEM_PREPARSE_FAILED, req->userdata);
return VLC_EGENERIC;
/* This call also interrupts the parsing if it is still running */
input_item_parser_id_Release(task->parser);
}
static int PreparserProbeInput( void* preparser_, void* task_ )
static void
FetchLocked(struct input_preparser_task *task)
{
input_preparser_task_t* task = task_;
return atomic_load( &task->done );
VLC_UNUSED( preparser_ );
}
vlc_mutex_assert(&task->lock);
static void on_art_fetch_ended(input_item_t *item, bool fetched, void *userdata)
{
VLC_UNUSED(item);
VLC_UNUSED(fetched);
input_preparser_task_t *task = userdata;
input_preparser_req_t *req = task->req;
if (atomic_load(&task->interrupted))
return;
input_item_SetPreparsed(req->item, true);
input_fetcher_t *fetcher = task->preparser->fetcher;
if (!fetcher || !(task->options & META_REQUEST_OPTION_FETCH_ANY))
return;
if (req->cbs && req->cbs->on_preparse_ended)
req->cbs->on_preparse_ended(req->item, task->preparse_status, req->userdata);
int ret =
input_fetcher_Push(fetcher, task->item,
task->options & META_REQUEST_OPTION_FETCH_ANY,
&input_fetcher_callbacks, task);
if (ret != VLC_SUCCESS)
return;
ReqRelease(req);
free(task);
/* Wait until the end of fetching (fetching is not interruptible) */
while (!task->fetch_ended)
vlc_cond_wait(&task->cond_ended, &task->lock);
}
static const input_fetcher_callbacks_t input_fetcher_callbacks = {
.on_art_fetch_ended = on_art_fetch_ended,
};
static void PreparserCloseInput( void* preparser_, void* task_ )
static void
RunnableRun(void *userdata)
{
input_preparser_task_t* task = task_;
input_preparser_req_t *req = task->req;
struct input_preparser_task *task = userdata;
input_preparser_t* preparser = preparser_;
input_item_t* item = req->item;
vlc_mutex_lock(&task->lock);
ParseLocked(task);
FetchLocked(task);
vlc_mutex_unlock(&task->lock);
int status;
switch( atomic_load( &task->state ) )
{
case VLC_SUCCESS:
status = ITEM_PREPARSE_DONE;
break;
case VLC_ETIMEOUT:
status = ITEM_PREPARSE_TIMEOUT;
break;
default:
status = ITEM_PREPARSE_FAILED;
break;
}
if (!atomic_load(&task->interrupted))
input_item_SetPreparsed(task->item, true);
}
input_item_parser_id_Release( task->parser );
static void
RunnableInterrupt(void *userdata)
{
struct input_preparser_task *task = userdata;
if( preparser->fetcher && (req->options & META_REQUEST_OPTION_FETCH_ANY) )
{
task->preparse_status = status;
ReqHold(task->req);
if (!input_fetcher_Push(preparser->fetcher, item,
req->options & META_REQUEST_OPTION_FETCH_ANY,
&input_fetcher_callbacks, task))
{
return;
}
ReqRelease(task->req);
}
assert(!atomic_load(&task->interrupted));
atomic_store(&task->interrupted, true);
vlc_cond_signal(&task->cond_ended);
}
free(task);
static void
RunnableOnFinished(void *userdata)
{
struct input_preparser_task *task = userdata;
input_item_SetPreparsed( item, true );
if (req->cbs && req->cbs->on_preparse_ended)
req->cbs->on_preparse_ended(req->item, status, req->userdata);
}
if (task->cbs && task->cbs->on_preparse_ended)
task->cbs->on_preparse_ended(task->item, task->preparse_status,
task->userdata);
static void ReqHoldVoid(void *item) { ReqHold(item); }
static void ReqReleaseVoid(void *item) { ReqRelease(item); }
TaskDelete(task);
}
input_preparser_t* input_preparser_New( vlc_object_t *parent )
{
input_preparser_t* preparser = malloc( sizeof *preparser );
if (!preparser)
return NULL;
struct background_worker_config conf = {
.default_timeout = VLC_TICK_FROM_MS(var_InheritInteger( parent, "preparse-timeout" )),
.max_threads = var_InheritInteger( parent, "preparse-threads" ),
.pf_start = PreparserOpenInput,
.pf_probe = PreparserProbeInput,
.pf_stop = PreparserCloseInput,
.pf_release = ReqReleaseVoid,
.pf_hold = ReqHoldVoid
};
if( likely( preparser ) )
preparser->worker = background_worker_New( preparser, &conf );
int max_threads = var_InheritInteger(parent, "preparse-threads");
if (max_threads < 1)
max_threads = 1;
if( unlikely( !preparser || !preparser->worker ) )
preparser->executor = vlc_executor_New(max_threads);
if (!preparser->executor)
{
free( preparser );
free(preparser);
return NULL;
}
preparser->default_timeout =
VLC_TICK_FROM_MS(var_InheritInteger(parent, "preparse-timeout"));
if (preparser->default_timeout < 0)
preparser->default_timeout = 0;
preparser->owner = parent;
preparser->fetcher = input_fetcher_New( parent );
atomic_init( &preparser->deactivated, false );
......@@ -260,7 +279,7 @@ input_preparser_t* input_preparser_New( vlc_object_t *parent )
void input_preparser_Push( input_preparser_t *preparser,
input_item_t *item, input_item_meta_request_option_t i_options,
const input_preparser_callbacks_t *cbs, void *cbs_userdata,
int timeout, void *id )
int timeout_ms, void *id )
{
if( atomic_load( &preparser->deactivated ) )
return;
......@@ -287,14 +306,22 @@ void input_preparser_Push( input_preparser_t *preparser,
return;
}
struct input_preparser_req_t *req = ReqCreate(item, i_options,
cbs, cbs_userdata);
struct input_preparser_task *task =
TaskNew(preparser, item, i_options, cbs, cbs_userdata);
static const struct vlc_runnable runnable = {
.run = RunnableRun,
.interrupt = RunnableInterrupt,
.on_finished = RunnableOnFinished,
};
if (background_worker_Push(preparser->worker, req, id, timeout))
if (req->cbs && cbs->on_preparse_ended)
vlc_tick_t timeout = timeout_ms == -1 ? preparser->default_timeout
: VLC_TICK_FROM_MS(timeout_ms);
if (vlc_executor_Submit(preparser->executor, &runnable, task, id, timeout))
{
if (cbs && cbs->on_preparse_ended)
cbs->on_preparse_ended(item, ITEM_PREPARSE_FAILED, cbs_userdata);
ReqRelease(req);
TaskDelete(task);
}
}
void input_preparser_fetcher_Push( input_preparser_t *preparser,
......@@ -308,18 +335,18 @@ void input_preparser_fetcher_Push( input_preparser_t *preparser,
void input_preparser_Cancel( input_preparser_t *preparser, void *id )
{
background_worker_Cancel( preparser->worker, id );
vlc_executor_Cancel(preparser->executor, id);
}
void input_preparser_Deactivate( input_preparser_t* preparser )
{
atomic_store( &preparser->deactivated, true );
background_worker_Cancel( preparser->worker, NULL );
input_preparser_Cancel(preparser, NULL);
}
void input_preparser_Delete( input_preparser_t *preparser )
{
background_worker_Delete( preparser->worker );
vlc_executor_Delete(preparser->executor);
if( preparser->fetcher )
input_fetcher_Delete( preparser->fetcher );
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment