Commit 1169d142 authored by Romain Vimont's avatar Romain Vimont
Browse files

executor: introduce new executor API

Introduce a new API to execute "runnables". The execution can be
automatically interrupted on timeout or via an explicit cancelation
request.
parent bf38cd15
/*****************************************************************************
* vlc_executor.h
*****************************************************************************
* Copyright (C) 2020 Videolabs, VLC authors and VideoLAN
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation; either version 2.1 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program; if not, write to the Free Software Foundation,
* Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
*****************************************************************************/
#ifndef VLC_EXECUTOR_H
#define VLC_EXECUTOR_H
#include <vlc_common.h>
#include <vlc_tick.h>
# ifdef __cplusplus
extern "C" {
# endif
/** Executor type (opaque) */
typedef struct vlc_executor vlc_executor_t;
/**
* A Runnable is intended to be run from another thread by an executor.
*/
struct vlc_runnable {
/**
* This function is to be executed by a vlc_executor_t.
*
* It must implement the actions (arbitrarily long) to execute from an
* executor thread, synchronously. As soon as run() returns, the execution
* of this runnable is complete.
*
* After the runnable is submitted to an executor via
* vlc_executor_Submit(), the run() function is executed at most once (zero
* if the execution is canceled before it was started).
*
* It may not be NULL.
*
* \param userdata the userdata provided to vlc_executor_Submit()
*/
void (*run)(void *userdata);
/**
* This function attempts to interrupt the execution of run().
*
* If not NULL, it may be called on vlc_executor_Cancel() or on timeout.
*
* It is called from a thread different from the one executing run(). It is
* free to do any actions to "interrupt" the execution of run() (set a
* flag, close a file descriptor, etc.).
*
* The runnable will be considered "finished" once run() actually
* terminates.
*
* It should be quick, not to block the interruption of other runnables.
*
* \param userdata the userdata provided to vlc_executor_Submit()
*/
void (*interrupt)(void *userdata);
/**
* This function notifies the end of the execution.
*
* If not NULL, it is called either:
* - when run() terminates;
* - when the task is canceled before run() is called.
*
* In other words, it is always called in the end if vlc_executor_Submit()
* returns VLC_SUCCESS.
*
* \param userdata the userdata provided to vlc_executor_Submit()
*/
void (*on_finished)(void *userdata);
};
/**
* Create a new executor.
*
* \param max_threads the maximum number of threads used to execute runnables
* \return a pointer to a new executor, or NULL if an error occurred
*/
VLC_API vlc_executor_t *
vlc_executor_New(unsigned max_threads);
/**
* Delete an executor.
*
* Cancel all the tasks, wait for all the threads to complete, and delete the
* executor instance.
*
* \param executor the executor
*/
VLC_API void
vlc_executor_Delete(vlc_executor_t *executor);
/**
* Submit a runnable for execution.
*
* The struct vlc_runnable is not copied, it must exist until the end of the
* execution (the user is expected to pass a pointer to a static const
* structure).
*
* An id may optionally be provided in order to explicitly cancel the task
* later with vlc_executor_Cancel().
*
* A timeout may optionally be provided to interrupt the execution after some
* delay. The way the interruption is actually handled must be provided by the
* user via the runnable callback "interrupt". It is an error to provide a
* timeout without an interrupt callback.
*
* Here is a simple example (without interruption):
*
* \code{c}
* static void Run(void *userdata)
* {
* char *str = userdata;
* printf("start of %s\n", str);
* vlc_tick_sleep(VLC_TICK_FROM_SEC(3));
* printf("end of %s\n", str);
* }
*
* static void OnFinished(void *userdata)
* {
* free(userdata);
* }
*
* static const struct vlc_runnable runnable = {
* .run = Run,
* .on_finished = OnFinished,
* };
*
* void foo(vlc_executor_t *executor, const char *name)
* {
* // error handling replaced by assertions for brevity
* char *str = strdup(name);
* assert(str);
* int ret = vlc_executor_Submit(executor, &runnable, str, NULL, 0);
* assert(ret == VLC_SUCCESS);
* }
* \endcode
*
* \param executor the executor
* \param runnable the task to run
* \param userdata the userdata to pass to all runnable callbacks
* \param id a user-provided id to associate to the task, used to cancel
* the execution (may be NULL)
* \param timeout the delay before the task is automatically interrupted
* (0 for no timeout, negative values are illegal)
* \return VLC_SUCCESS on success, another value on error
*/
VLC_API int
vlc_executor_Submit(vlc_executor_t *executor,
const struct vlc_runnable *runnable, void *userdata,
void *id, vlc_tick_t timeout);
/**
* Cancel all submitted tasks having the specified id.
*
* If id is NULL, then cancel all tasks.
*
* \param executor the executor
* \param id the id of the tasks to cancel (NULL for all tasks)
*/
VLC_API void
vlc_executor_Cancel(vlc_executor_t *executor, void *id);
# ifdef __cplusplus
}
# endif
#endif
......@@ -350,6 +350,7 @@ libvlccore_la_SOURCES = \
misc/actions.c \
misc/background_worker.c \
misc/background_worker.h \
misc/executor.c \
misc/md5.c \
misc/probe.c \
misc/rand.c \
......
......@@ -958,3 +958,7 @@ vlc_video_context_GetType
vlc_video_context_GetPrivate
vlc_video_context_Hold
vlc_video_context_HoldDevice
vlc_executor_New
vlc_executor_Delete
vlc_executor_Execute
vlc_executor_Cancel
/*****************************************************************************
* misc/executor.c
*****************************************************************************
* Copyright (C) 2020 Videolabs, VLC authors and VideoLAN
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation; either version 2.1 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program; if not, write to the Free Software Foundation,
* Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
*****************************************************************************/
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include <vlc_executor.h>
#include <vlc_atomic.h>
#include <vlc_list.h>
#include <vlc_threads.h>
#include "libvlc.h"
/**
* An executor task is created on every call to vlc_executor_Submit().
*
* It contains the user-provided data (runnable, userdata, timeout) and the
* current execution state.
*/
struct vlc_executor_task {
/** Node of vlc_executor.queue list */
struct vlc_list node;
/** Task id, may be NULL (provided by the user) */
void *id;
/** Timeout, 0 for no timeout, negative is illegal (provided by the user) */
vlc_tick_t timeout;
/**
* The task is "finished" if either:
* - the run() callback has completed, or
* - the run() callback has not been called and the task is canceled.
*/
bool finished;
/**
* Set to true if the task has been canceled.
*
* It is possible that the task is canceled but not finished ("cancel" has
* been requested while run() was running, but run() is not completed yet).
*/
bool canceled;
/** Date when run() is called, VLC_TICK_INVALID if not started. */
vlc_tick_t start_date;
/** The associated runnable */
const struct vlc_runnable *runnable;
/** The user data passed to all runnable callbacks */
void *userdata;
/** Refcount */
vlc_atomic_rc_t rc;
};
/**
* An executor can spawn several threads.
*
* This structure contains the data specific to one thread.
*/
struct vlc_executor_thread {
/** Node of vlc_executor.threads list */
struct vlc_list node;
/** The executor owning the thread */
vlc_executor_t *owner;
/** The system thread */
vlc_thread_t thread;
/** The current task executed by the thread, NULL if none */
struct vlc_executor_task *current_task;
};
/**
* The executor (also vlc_executor_t, exposed as opaque type in the public
* header).
*/
struct vlc_executor {
vlc_mutex_t lock;
/** Maximum number of threads to run the tasks */
unsigned max_threads;
/** List of active vlc_executor_thread instances */
struct vlc_list threads;
/** Thread count (in a separate field to quickly compare to max_threads) */
unsigned nthreads;
/* Number of tasks requested but not finished (neither completed or
* canceled) */
unsigned unfinished;
/** Queue of vlc_executor_task_t */
struct vlc_list queue;
/** Wait for the queue to be non-empty */
vlc_cond_t queue_wait;
/** True if the interrupt thread is started */
bool has_interrupt_thread;
/** Thread handling interruptions */
vlc_thread_t interrupt_thread;
/** Wait for the next task interruption deadline */
vlc_cond_t interrupt_wait;
/** True if executor deletion is requested */
bool closing;
};
vlc_executor_t *
vlc_executor_New(unsigned max_threads)
{
assert(max_threads);
vlc_executor_t *executor = malloc(sizeof(*executor));
if (!executor)
return NULL;
vlc_mutex_init(&executor->lock);
executor->max_threads = max_threads;
executor->nthreads = 0;
executor->unfinished = 0;
vlc_list_init(&executor->threads);
vlc_list_init(&executor->queue);
vlc_cond_init(&executor->queue_wait);
vlc_cond_init(&executor->interrupt_wait);
executor->closing = false;
executor->has_interrupt_thread = false;
return executor;
}
static struct vlc_executor_task *
TaskNew(const struct vlc_runnable *runnable, void *userdata, void *id,
vlc_tick_t timeout)
{
struct vlc_executor_task *task = malloc(sizeof(*task));
if (!task)
return NULL;
task->id = id;
task->timeout = timeout;
task->finished = false;
task->canceled = false;
task->start_date = VLC_TICK_INVALID;
task->runnable = runnable;
task->userdata = userdata;
vlc_atomic_rc_init(&task->rc);
return task;
}
static void
TaskHold(struct vlc_executor_task *task)
{
vlc_atomic_rc_inc(&task->rc);
}
static void
TaskRelease(struct vlc_executor_task *task)
{
if (vlc_atomic_rc_dec(&task->rc))
free(task);
}
static void
TerminateTask(vlc_executor_t *executor, struct vlc_executor_task *task)
{
vlc_mutex_assert(&executor->lock);
assert(!task->finished);
task->finished = true;
if (task->runnable->on_finished)
task->runnable->on_finished(task->userdata);
assert(executor->unfinished > 0);
--executor->unfinished;
TaskRelease(task);
}
static struct vlc_executor_task *
FindFirstTaskToInterrupt(vlc_executor_t *executor)
{
vlc_mutex_assert(&executor->lock);
/* XXX To improve execution complexity, we could keep the interruptible
* tasks in a priority queue. But there are typically only few threads, and
* this is not called more than once per task, so keep things simple, even
* if it's not optimal. */
struct vlc_executor_task *first = NULL;
vlc_tick_t first_deadline = VLC_TICK_INVALID;
struct vlc_executor_thread *thread;
vlc_list_foreach(thread, &executor->threads, node)
{
struct vlc_executor_task *task = thread->current_task;
/* Only consider tasks with a timeout, which are not already canceled */
if (task && task->timeout && !task->canceled)
{
/* If there is a current task, then it is necessarily started */
assert(task->start_date != VLC_TICK_INVALID);
vlc_tick_t deadline = task->start_date + task->timeout;
if (!first || deadline < first_deadline)
{
first = task;
first_deadline = deadline;
}
}
}
return first;
}
static void *
InterruptThreadRun(void *userdata)
{
vlc_executor_t *executor = userdata;
vlc_mutex_lock(&executor->lock);
while (!executor->closing)
{
struct vlc_executor_task *task = FindFirstTaskToInterrupt(executor);
if (!task)
{
vlc_cond_wait(&executor->interrupt_wait, &executor->lock);
continue;
}
assert(task->start_date != VLC_TICK_INVALID);
assert(task->timeout);
vlc_tick_t deadline = task->start_date + task->timeout;
/* The task may be released during the timedwait */
TaskHold(task);
bool timed_out = vlc_cond_timedwait(&executor->interrupt_wait,
&executor->lock, deadline) != 0;
if (timed_out && !task->finished && !task->canceled)
{
task->canceled = true;
/* This requests the task to stop itself. The interrupt() itself
* should be immediate, but the task could terminate later (once
* its run() callback is completed). */
task->runnable->interrupt(task->userdata);
}
TaskRelease(task);
}
vlc_mutex_unlock(&executor->lock);
return NULL;
}
/**
* Make sure that an interrupt thread is running.
*/
static int
RequireInterruptThread(vlc_executor_t *executor)
{
vlc_mutex_assert(&executor->lock);
if (executor->has_interrupt_thread)
return VLC_SUCCESS;
int ret = vlc_clone(&executor->interrupt_thread, InterruptThreadRun,
executor, VLC_THREAD_PRIORITY_LOW);
if (ret == VLC_SUCCESS)
executor->has_interrupt_thread = true;
return ret;
}
static void
QueuePush(vlc_executor_t *executor, struct vlc_executor_task *task)
{
vlc_mutex_assert(&executor->lock);
vlc_list_append(&task->node, &executor->queue);
vlc_cond_signal(&executor->queue_wait);
}
static struct vlc_executor_task *
QueueTake(vlc_executor_t *executor)
{
vlc_mutex_assert(&executor->lock);
while (!executor->closing && vlc_list_is_empty(&executor->queue))
vlc_cond_wait(&executor->queue_wait, &executor->lock);
if (executor->closing)
return NULL;
struct vlc_executor_task *task =
vlc_list_first_entry_or_null(&executor->queue, struct vlc_executor_task,
node);
assert(task);
vlc_list_remove(&task->node);
return task;
}
static void
QueueRemove(vlc_executor_t *executor, struct vlc_executor_task *task)
{
vlc_mutex_assert(&executor->lock);
vlc_list_remove(&task->node);
}
static void
QueueRemoveAll(vlc_executor_t *executor, void *id)
{
vlc_mutex_assert(&executor->lock);
struct vlc_executor_task *task;
vlc_list_foreach(task, &executor->queue, node)
{
if (!id || task->id == id)
{
task->canceled = true;
vlc_list_remove(&task->node);
TerminateTask(executor, task);
}
}
}
static void *
ThreadRun(void *userdata)
{
struct vlc_executor_thread *thread = userdata;
vlc_executor_t *executor = thread->owner;
vlc_mutex_lock(&executor->lock);
for (;;)
{
struct vlc_executor_task *task = QueueTake(executor);
if (!task)
/* The executor is closing, terminate this thread */
break;
thread->current_task = task;
task->start_date = vlc_tick_now();
if (task->timeout)
vlc_cond_signal(&executor->interrupt_wait);
vlc_mutex_unlock(&executor->lock);
/* Execute the user-provided runnable, without the executor lock */
task->runnable->run(task->userdata);
vlc_mutex_lock(&executor->lock);
thread->current_task = NULL;
TerminateTask(executor, task);
}
vlc_mutex_unlock(&executor->lock);
return NULL;
}
static int
SpawnThread(vlc_executor_t *executor)
{
vlc_mutex_assert(&executor->lock);
assert(executor->nthreads < executor->max_threads);
struct vlc_executor_thread *thread = malloc(sizeof(*thread));
if (!thread)
return VLC_ENOMEM;
thread->owner = executor;
thread->current_task = NULL;
if (vlc_clone(&thread->thread, ThreadRun, thread, VLC_THREAD_PRIORITY_LOW))
{
free(thread);
return VLC_EGENERIC;
}
executor->nthreads++;
vlc_list_append(&thread->node, &executor->threads);
return VLC_SUCCESS;
}
int
vlc_executor_Submit(vlc_executor_t *executor,
const struct vlc_runnable *runnable, void *userdata,
void *id, vlc_tick_t timeout)
{
assert(timeout >= 0);
/* An interrupt callback must be provided if a timeout is requested.
* Note that an interrupt callback may also be provided without timeout,
* for explicit calls to vlc_executor_Cancel(). */
assert(timeout == 0 || runnable->interrupt);
assert(runnable->run);
struct vlc_executor_task *task = TaskNew(runnable, userdata, id, timeout);
if (!task)
return VLC_ENOMEM;
vlc_mutex_lock(&executor->lock);