Commit 3f410fd9 authored by Ronald S. Bultje

Rewrite flushing logic

The old flushing logic would simply leave frame (and tile) threads
running, without regard for how much latency that adds to the
post-seek time-to-first-frame. This commit adds a 'flush' state that
makes all running frame/tile threads abort decoding of their current
frame, and disposes of all frames in the output queue.

Then, dav1d_close() uses dav1d_flush() to abort running threads on
exit, instead of signaling their respective dependents, which is how
we previously avoided deadlocks. The advantage of this approach is
that we never signal on objects we don't own, so it should prevent
race conditions where the owning thread disposes of an object just as
we're signaling it, which I believe is what causes #193.
parent c1b0808c
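
For context, the caller-side pattern this change speeds up is flush-on-seek. A minimal sketch, assuming a hypothetical player-side Demuxer type and demuxer_seek() helper; only the dav1d_* calls are real library API:

    #include <stdint.h>
    #include <dav1d/dav1d.h>

    typedef struct Demuxer Demuxer;               // hypothetical caller-side type
    void demuxer_seek(Demuxer *dmx, int64_t ts);  // hypothetical, not dav1d API

    static void player_seek(Dav1dContext *c, Demuxer *dmx, int64_t ts) {
        demuxer_seek(dmx, ts); // reposition the input stream
        dav1d_flush(c);        // abort in-flight frames, empty the output queue
        // Decoding resumes with packets fed via dav1d_send_data(); no stale
        // frames are left to delay the first post-seek dav1d_get_picture().
    }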
@@ -2368,6 +2368,8 @@ int dav1d_decode_tile_sbrow(Dav1dTileContext *const t) {
t->a = f->a + col_sb128_start + tile_row * f->sb128w;
t->bx < ts->tiling.col_end; t->bx += sb_step)
{
+ if (atomic_load_explicit(&t->tile_thread.flush, memory_order_acquire))
+     return 1;
if (decode_sb(t, root_bl, c->intra_edge.root[root_bl]))
return 1;
if (t->bx & 16 || f->seq_hdr.sb128)
@@ -2397,6 +2399,8 @@ int dav1d_decode_tile_sbrow(Dav1dTileContext *const t) {
t->lf_mask = f->lf.mask + sb128y * f->sb128w + col_sb128_start;
t->bx < ts->tiling.col_end; t->bx += sb_step)
{
+ if (atomic_load_explicit(&t->tile_thread.flush, memory_order_acquire))
+     return 1;
if (root_bl == BL_128X128) {
t->cur_sb_cdef_idx_ptr = t->lf_mask->cdef_idx;
t->cur_sb_cdef_idx_ptr[0] = -1;
@@ -2974,7 +2978,7 @@ int dav1d_submit_frame(Dav1dContext *const c) {
&f->frame_thread.td.lock);
out_delayed = &c->frame_thread.out_delayed[next];
if (out_delayed->p.data[0]) {
- if (out_delayed->visible && !out_delayed->flushed)
+ if (out_delayed->visible)
dav1d_picture_ref(&c->out, &out_delayed->p);
dav1d_thread_picture_unref(out_delayed);
}
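
The checks added to dav1d_decode_tile_sbrow() above are plain cooperative cancellation: each worker polls an atomic flag once per superblock and bails out of the current frame when it is set. The same pattern in isolation, as a self-contained sketch with illustrative names rather than dav1d internals:

    #include <pthread.h>
    #include <stdatomic.h>
    #include <stdio.h>
    #include <unistd.h>

    static atomic_int flush; // mirrors t->tile_thread.flush

    static void *worker(void *arg) {
        (void)arg;
        for (int unit = 0; unit < 1000000; unit++) {
            // One check per work unit, like the per-superblock check above;
            // the acquire load pairs with the flusher's (seq_cst) store.
            if (atomic_load_explicit(&flush, memory_order_acquire)) {
                puts("worker: aborting current frame");
                return NULL;
            }
            usleep(100); // stand-in for decode_sb()
        }
        return NULL;
    }

    int main(void) {
        pthread_t th;
        atomic_init(&flush, 0);
        pthread_create(&th, NULL, worker, NULL);
        usleep(10000);
        atomic_store(&flush, 1); // request the abort
        pthread_join(th, NULL);
        atomic_store(&flush, 0); // re-arm, as dav1d_flush() does
        return 0;
    }

The cost is one acquire load per superblock, which is negligible next to decode_sb(), so the flag can be polled at fine granularity.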
@@ -291,6 +291,7 @@ struct Dav1dTileContext {
struct thread_data td;
struct FrameTileThreadData *fttd;
int die;
+ atomic_int flush;
} tile_thread;
};
@@ -134,6 +134,7 @@ int dav1d_open(Dav1dContext **const c_out,
t->tile_thread.fttd = &f->tile_thread;
pthread_create(&t->tile_thread.td.thread, NULL, dav1d_tile_task, t);
}
+ atomic_init(&t->tile_thread.flush, 0);
}
f->libaom_cm = av1_alloc_ref_mv_common();
if (!f->libaom_cm) goto error;
@@ -254,11 +255,8 @@ int dav1d_get_picture(Dav1dContext *const c, Dav1dPicture *const out)
if (out_delayed->p.data[0]) {
const unsigned progress = atomic_load_explicit(&out_delayed->progress[1],
memory_order_relaxed);
- if (out_delayed->visible && !out_delayed->flushed &&
-     progress != FRAME_ERROR)
- {
+ if (out_delayed->visible && progress != FRAME_ERROR)
dav1d_picture_ref(&c->out, &out_delayed->p);
- }
dav1d_thread_picture_unref(out_delayed);
if (c->out.data[0])
return output_image(c, out, &c->out);
@@ -292,8 +290,32 @@ void dav1d_flush(Dav1dContext *const c) {
if (c->n_fc == 1) return;
- for (unsigned n = 0; n < c->n_fc; n++)
-     c->frame_thread.out_delayed[n].flushed = 1;
+ // mark each currently-running frame as flushing, so that we
+ // exit as soon as the running thread checks this flag
+ for (unsigned n = 0; n < c->n_fc; n++) {
+     Dav1dFrameContext *const f = &c->fc[n];
+     for (int m = 0; m < f->n_tc; m++)
+         atomic_store(&f->tc[m].tile_thread.flush, 1);
+ }
+ for (unsigned n = 0, next = c->frame_thread.next; n < c->n_fc; n++, next++) {
+     if (next == c->n_fc) next = 0;
+     Dav1dFrameContext *const f = &c->fc[next];
+     pthread_mutex_lock(&f->frame_thread.td.lock);
+     if (f->n_tile_data > 0) {
+         while (f->n_tile_data > 0)
+             pthread_cond_wait(&f->frame_thread.td.cond,
+                               &f->frame_thread.td.lock);
+         assert(!f->cur.data[0]);
+     }
+     pthread_mutex_unlock(&f->frame_thread.td.lock);
+     for (int m = 0; m < f->n_tc; m++)
+         atomic_store(&f->tc[m].tile_thread.flush, 0);
+     Dav1dThreadPicture *const out_delayed = &c->frame_thread.out_delayed[next];
+     if (out_delayed->p.data[0])
+         dav1d_thread_picture_unref(out_delayed);
+ }
+ c->frame_thread.next = 0;
}
void dav1d_close(Dav1dContext **const c_out) {
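
The frame-thread half of the new dav1d_flush() is a standard condition-variable drain: raise the flag, sleep until the worker publishes that it has no pending tile data, then lower the flag again. The shape of that handshake in isolation, with simplified stand-in fields:

    #include <pthread.h>
    #include <stdatomic.h>

    typedef struct {             // stand-in for one frame context's state
        pthread_mutex_t lock;
        pthread_cond_t cond;
        int n_tile_data;         // >0 while a frame is being decoded
        atomic_int flush;
    } Ctx;

    // Worker side: on finishing (or aborting) a frame, publish completion.
    static void frame_done(Ctx *f) {
        pthread_mutex_lock(&f->lock);
        f->n_tile_data = 0;
        pthread_cond_signal(&f->cond);  // wake a flusher waiting for the drain
        pthread_mutex_unlock(&f->lock);
    }

    // Flusher side: request the abort, then wait for the worker to drain.
    static void flush_one(Ctx *f) {
        atomic_store(&f->flush, 1);     // worker polls this per work unit
        pthread_mutex_lock(&f->lock);
        while (f->n_tile_data > 0)      // loop guards against spurious wakeups
            pthread_cond_wait(&f->cond, &f->lock);
        pthread_mutex_unlock(&f->lock);
        atomic_store(&f->flush, 0);     // re-arm for subsequent frames
    }

Note that the flag is cleared only after the wait completes, so a worker can never observe a stale flush request for the next frame it picks up.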
@@ -302,37 +324,17 @@ void dav1d_close(Dav1dContext **const c_out) {
Dav1dContext *const c = *c_out;
if (!c) return;
+ dav1d_flush(c);
for (unsigned n = 0; n < c->n_fc; n++) {
Dav1dFrameContext *const f = &c->fc[n];
// clean-up threading stuff
if (c->n_fc > 1) {
- if (f->frame_hdr.refresh_context)
-     dav1d_cdf_thread_signal(&f->out_cdf);
- dav1d_thread_picture_signal(&f->sr_cur, FRAME_ERROR,
-                             PLANE_TYPE_ALL);
pthread_mutex_lock(&f->frame_thread.td.lock);
f->frame_thread.die = 1;
pthread_cond_signal(&f->frame_thread.td.cond);
pthread_mutex_unlock(&f->frame_thread.td.lock);
pthread_join(f->frame_thread.td.thread, NULL);
// free references from dav1d_submit_frame() usually freed by
// dav1d_decode_frame
for (int i = 0; i < 7; i++) {
if (f->refp[i].p.data[0])
dav1d_thread_picture_unref(&f->refp[i]);
dav1d_ref_dec(&f->ref_mvs_ref[i]);
}
dav1d_picture_unref(&f->cur);
dav1d_thread_picture_unref(&f->sr_cur);
dav1d_cdf_thread_unref(&f->in_cdf);
if (f->frame_hdr.refresh_context)
dav1d_cdf_thread_unref(&f->out_cdf);
dav1d_ref_dec(&f->cur_segmap_ref);
dav1d_ref_dec(&f->prev_segmap_ref);
dav1d_ref_dec(&f->mvs_ref);
for (int i = 0; i < f->n_tile_data; i++)
dav1d_data_unref(&f->tile[i].data);
freep(&f->frame_thread.b);
dav1d_freep_aligned(&f->frame_thread.pal_idx);
dav1d_freep_aligned(&f->frame_thread.cf);
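
With flushing done first, the remaining per-thread teardown is the usual die-flag/join sequence, schematically (illustrative names, not dav1d's):

    #include <pthread.h>

    struct worker {
        pthread_mutex_t lock;
        pthread_cond_t cond;
        int die;
        pthread_t th;
    };

    // The worker's loop re-checks `die` under the lock whenever it wakes.
    static void worker_shutdown(struct worker *w) {
        pthread_mutex_lock(&w->lock);
        w->die = 1;                    // request exit
        pthread_cond_signal(&w->cond); // wake it if parked on the cond
        pthread_mutex_unlock(&w->lock);
        pthread_join(w->th, NULL);     // wait for the thread to terminate
    }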
@@ -1339,17 +1339,13 @@ int dav1d_parse_obus(Dav1dContext *const c, Dav1dData *const in) {
if (out_delayed->p.data[0]) {
const unsigned progress = atomic_load_explicit(&out_delayed->progress[1],
memory_order_relaxed);
- if (out_delayed->visible && !out_delayed->flushed &&
-     progress != FRAME_ERROR)
- {
+ if (out_delayed->visible && progress != FRAME_ERROR)
dav1d_picture_ref(&c->out, &out_delayed->p);
- }
dav1d_thread_picture_unref(out_delayed);
}
dav1d_thread_picture_ref(out_delayed,
&c->refs[c->frame_hdr.existing_frame_idx].p);
out_delayed->visible = 1;
- out_delayed->flushed = 0;
out_delayed->p.m = in->m;
pthread_mutex_unlock(&f->frame_thread.td.lock);
}
@@ -160,7 +160,6 @@ int dav1d_thread_picture_alloc(Dav1dThreadPicture *const p,
if (res) return res;
p->visible = visible;
- p->flushed = 0;
if (t) {
atomic_init(&p->progress[0], 0);
atomic_init(&p->progress[1], 0);
@@ -217,7 +216,6 @@ void dav1d_thread_picture_ref(Dav1dThreadPicture *dst,
dst->t = src->t;
dst->visible = src->visible;
dst->progress = src->progress;
- dst->flushed = src->flushed;
}
void dav1d_picture_unref(Dav1dPicture *const p) {
@@ -273,8 +271,10 @@ void dav1d_thread_picture_signal(const Dav1dThreadPicture *const p,
return;
pthread_mutex_lock(&p->t->lock);
- if (plane_type != PLANE_TYPE_Y) atomic_store(&p->progress[0], y);
- if (plane_type != PLANE_TYPE_BLOCK) atomic_store(&p->progress[1], y);
+ if (plane_type != PLANE_TYPE_Y)
+     atomic_store(&p->progress[0], y);
+ if (plane_type != PLANE_TYPE_BLOCK)
+     atomic_store(&p->progress[1], y);
pthread_cond_broadcast(&p->t->cond);
pthread_mutex_unlock(&p->t->lock);
}
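
dav1d_thread_picture_signal() is the producer half of the per-row progress protocol; consumers sleep on the same condition variable until the row they need has been published. A sketch of the consumer side, with simplified stand-in fields rather than the actual dav1d waiter:

    #include <pthread.h>
    #include <stdatomic.h>

    typedef struct {              // stand-in for Dav1dThreadPicture state
        pthread_mutex_t lock;
        pthread_cond_t cond;
        atomic_uint progress[2];  // [0] block data, [1] pixel data
    } Pic;

    // Block until decode progress for the given plane type reaches row y.
    static unsigned wait_for_row(Pic *p, unsigned y, int idx) {
        unsigned prog = atomic_load_explicit(&p->progress[idx],
                                             memory_order_acquire);
        if (prog >= y) return prog;  // fast path: already far enough
        pthread_mutex_lock(&p->lock);
        while ((prog = atomic_load(&p->progress[idx])) < y)
            pthread_cond_wait(&p->cond, &p->lock); // woken by the broadcast
        pthread_mutex_unlock(&p->lock);
        return prog;                 // may be FRAME_ERROR on decode failure
    }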
@@ -44,7 +44,7 @@ enum PlaneType {
typedef struct Dav1dThreadPicture {
Dav1dPicture p;
- int visible, flushed;
+ int visible;
struct thread_data *t;
// [0] block data (including segmentation map and motion vectors)
// [1] pixel data