Commit 2dcf76ce authored by François Cartegnie's avatar François Cartegnie 🤞

demux: adaptive: refactor and split buffering thread

parent 64f08bdf
......@@ -36,6 +36,7 @@
#include "tools/Debug.hpp"
#include <vlc_stream.h>
#include <vlc_demux.h>
#include <vlc_threads.h>
#include <ctime>
......@@ -52,13 +53,24 @@ PlaylistManager::PlaylistManager( demux_t *p_demux_,
logic ( NULL ),
playlist ( pl ),
streamFactory ( factory ),
p_demux ( p_demux_ ),
nextPlaylistupdate ( 0 ),
i_nzpcr ( VLC_TS_INVALID )
p_demux ( p_demux_ )
{
currentPeriod = playlist->getFirstPeriod();
failedupdates = 0;
i_firstpcr = i_nzpcr;
thread = 0;
b_buffering = false;
nextPlaylistupdate = 0;
demux.i_nzpcr = VLC_TS_INVALID;
demux.i_firstpcr = VLC_TS_INVALID;
vlc_mutex_init(&demux.lock);
vlc_cond_init(&demux.cond);
vlc_mutex_init(&lock);
vlc_cond_init(&waitcond);
vlc_mutex_init(&cached.lock);
cached.b_live = false;
cached.i_length = 0;
cached.f_position = 0.0;
cached.i_time = VLC_TS_INVALID;
}
PlaylistManager::~PlaylistManager ()
......@@ -68,6 +80,11 @@ PlaylistManager::~PlaylistManager ()
delete playlist;
delete conManager;
delete logic;
vlc_cond_destroy(&waitcond);
vlc_mutex_destroy(&lock);
vlc_mutex_destroy(&demux.lock);
vlc_cond_destroy(&demux.cond);
vlc_mutex_destroy(&cached.lock);
}
void PlaylistManager::unsetPeriod()
......@@ -139,14 +156,32 @@ bool PlaylistManager::start()
playlist->playbackStart.Set(time(NULL));
nextPlaylistupdate = playlist->playbackStart.Get();
updateControlsContentType();
updateControlsPosition();
if(vlc_clone(&thread, managerThread, reinterpret_cast<void *>(this), VLC_THREAD_PRIORITY_INPUT))
return false;
setBufferingRunState(true);
return true;
}
AbstractStream::status PlaylistManager::demux(mtime_t *pi_nzbarrier, bool send)
void PlaylistManager::stop()
{
AbstractStream::status i_return = AbstractStream::status_eof;
if(thread)
{
vlc_cancel(thread);
vlc_join(thread, NULL);
thread = 0;
}
}
AbstractStream::buffering_status PlaylistManager::bufferize(mtime_t i_nzdeadline,
unsigned i_min_buffering, unsigned i_extra_buffering)
{
AbstractStream::buffering_status i_return = AbstractStream::buffering_end;
const mtime_t i_nzdeadline = *pi_nzbarrier;
std::vector<AbstractStream *>::iterator it;
for(it=streams.begin(); it!=streams.end(); ++it)
{
......@@ -154,42 +189,51 @@ AbstractStream::status PlaylistManager::demux(mtime_t *pi_nzbarrier, bool send)
if (st->isDisabled())
{
if(st->isSelected() && !st->isEOF())
if(st->isSelected() && !st->isDead())
reactivateStream(st);
else
continue;
}
AbstractStream::status i_ret = st->demux(i_nzdeadline, send);
if(i_ret == AbstractStream::status_buffering_ahead ||
i_return == AbstractStream::status_buffering_ahead)
{
i_return = AbstractStream::status_buffering_ahead;
}
else if(i_ret == AbstractStream::status_buffering)
{
i_return = AbstractStream::status_buffering;
}
else if(i_ret == AbstractStream::status_demuxed &&
i_return != AbstractStream::status_buffering)
{
i_return = AbstractStream::status_demuxed;
}
else if(i_ret == AbstractStream::status_dis)
AbstractStream::buffering_status i_ret = st->bufferize(i_nzdeadline, i_min_buffering, i_extra_buffering);
if(i_return != AbstractStream::buffering_ongoing) /* Buffering streams need to keep going */
{
i_return = AbstractStream::status_dis;
if(i_ret > i_return)
i_return = i_ret;
}
}
*pi_nzbarrier = std::min( *pi_nzbarrier, st->getPCR() - VLC_TS_0 );
*pi_nzbarrier = std::max( (mtime_t)0, *pi_nzbarrier );
vlc_mutex_lock(&demux.lock);
if(demux.i_nzpcr == VLC_TS_INVALID &&
i_return != AbstractStream::buffering_lessthanmin /* prevents starting before buffering is reached */ )
{
demux.i_nzpcr = getFirstDTS();
if(demux.i_nzpcr == VLC_TS_INVALID)
demux.i_nzpcr = getPCR();
}
vlc_mutex_unlock(&demux.lock);
return i_return;
}
AbstractStream::status PlaylistManager::dequeue(mtime_t *pi_nzbarrier)
{
AbstractStream::status i_return = AbstractStream::status_eof;
/* might be end of current period */
if(i_return == AbstractStream::status_eof && currentPeriod)
const mtime_t i_nzdeadline = *pi_nzbarrier;
std::vector<AbstractStream *>::iterator it;
for(it=streams.begin(); it!=streams.end(); ++it)
{
unsetPeriod();
currentPeriod = playlist->getNextPeriod(currentPeriod);
i_return = (setupPeriod()) ? AbstractStream::status_eop : AbstractStream::status_eof;
AbstractStream *st = *it;
mtime_t i_pcr;
AbstractStream::status i_ret = st->dequeue(i_nzdeadline, &i_pcr);
if( i_ret > i_return )
i_return = i_ret;
if( i_pcr > VLC_TS_INVALID )
*pi_nzbarrier = std::min( *pi_nzbarrier, i_pcr - VLC_TS_0 );
}
return i_return;
......@@ -225,7 +269,7 @@ mtime_t PlaylistManager::getPCR() const
std::vector<AbstractStream *>::const_iterator it;
for(it=streams.begin(); it!=streams.end(); ++it)
{
if ((*it)->isDisabled() || (*it)->isEOF())
if ((*it)->isDisabled() || (*it)->isDead())
continue;
if(pcr == VLC_TS_INVALID || pcr > (*it)->getPCR())
pcr = (*it)->getPCR();
......@@ -239,7 +283,7 @@ mtime_t PlaylistManager::getFirstDTS() const
std::vector<AbstractStream *>::const_iterator it;
for(it=streams.begin(); it!=streams.end(); ++it)
{
if ((*it)->isDisabled() || (*it)->isEOF())
if ((*it)->isDisabled() || (*it)->isDead())
continue;
if(dts == VLC_TS_INVALID || dts > (*it)->getFirstDTS())
dts = (*it)->getFirstDTS();
......@@ -247,17 +291,6 @@ mtime_t PlaylistManager::getFirstDTS() const
return dts;
}
int PlaylistManager::esCount() const
{
int es = 0;
std::vector<AbstractStream *>::const_iterator it;
for(it=streams.begin(); it!=streams.end(); ++it)
{
es += (*it)->esCount();
}
return es;
}
mtime_t PlaylistManager::getDuration() const
{
if (playlist->isLive())
......@@ -301,6 +334,8 @@ bool PlaylistManager::updatePlaylist()
for(it=streams.begin(); it!=streams.end(); ++it)
(*it)->runUpdates();
updateControlsContentType();
updateControlsPosition();
return true;
}
......@@ -311,7 +346,7 @@ mtime_t PlaylistManager::getFirstPlaybackTime() const
mtime_t PlaylistManager::getCurrentPlaybackTime() const
{
return i_nzpcr;
return demux.i_nzpcr;
}
void PlaylistManager::pruneLiveStream()
......@@ -321,7 +356,7 @@ void PlaylistManager::pruneLiveStream()
for(it=streams.begin(); it!=streams.end(); it++)
{
const AbstractStream *st = *it;
if(st->isDisabled() || !st->isSelected() || st->isEOF())
if(st->isDisabled() || !st->isSelected() || st->isDead())
continue;
const mtime_t t = st->getPlaybackTime();
if(minValidPos == 0 || t < minValidPos)
......@@ -346,57 +381,73 @@ int PlaylistManager::demux_callback(demux_t *p_demux)
int PlaylistManager::doDemux(int64_t increment)
{
mtime_t i_nzbarrier = i_nzpcr + increment;
if(i_nzpcr == VLC_TS_INVALID)
vlc_mutex_lock(&demux.lock);
if(demux.i_nzpcr == VLC_TS_INVALID)
{
if( AbstractStream::status_eof == demux(&i_nzbarrier, false) )
{
return VLC_DEMUXER_EOF;
}
i_nzpcr = getFirstDTS();
if(i_nzpcr == VLC_TS_INVALID)
i_nzpcr = getPCR();
if(i_firstpcr == VLC_TS_INVALID)
i_firstpcr = i_nzpcr;
i_nzbarrier = i_nzpcr + increment;
vlc_cond_timedwait(&demux.cond, &demux.lock, mdate() + CLOCK_FREQ / 20);
vlc_mutex_unlock(&demux.lock);
return AbstractStream::status_buffering;
}
AbstractStream::status status = demux(&i_nzbarrier, true);
AdvDebug(msg_Dbg( p_demux, "doDemux() status %d dts %ld pcr %ld", status, getFirstDTS(), getPCR() ));
if(demux.i_firstpcr == VLC_TS_INVALID)
demux.i_firstpcr = demux.i_nzpcr;
mtime_t i_nzbarrier = demux.i_nzpcr + increment;
vlc_mutex_unlock(&demux.lock);
AbstractStream::status status = dequeue(&i_nzbarrier);
updateControlsContentType();
updateControlsPosition();
switch(status)
{
case AbstractStream::status_eof:
return VLC_DEMUXER_EOF;
{
/* might be end of current period */
if(currentPeriod)
{
setBufferingRunState(false);
BasePeriod *nextPeriod = playlist->getNextPeriod(currentPeriod);
if(!nextPeriod)
return VLC_DEMUXER_EOF;
unsetPeriod();
currentPeriod = nextPeriod;
if (!setupPeriod())
return VLC_DEMUXER_EOF;
demux.i_nzpcr = VLC_TS_INVALID;
demux.i_firstpcr = VLC_TS_INVALID;
es_out_Control(p_demux->out, ES_OUT_RESET_PCR);
setBufferingRunState(true);
}
}
break;
case AbstractStream::status_buffering:
case AbstractStream::status_buffering_ahead:
vlc_mutex_lock(&demux.lock);
vlc_cond_timedwait(&demux.cond, &demux.lock, mdate() + CLOCK_FREQ / 20);
vlc_mutex_unlock(&demux.lock);
break;
case AbstractStream::status_dis:
case AbstractStream::status_eop:
i_nzpcr = VLC_TS_INVALID;
i_firstpcr = VLC_TS_INVALID;
case AbstractStream::status_discontinuity:
vlc_mutex_lock(&demux.lock);
demux.i_nzpcr = VLC_TS_INVALID;
demux.i_firstpcr = VLC_TS_INVALID;
es_out_Control(p_demux->out, ES_OUT_RESET_PCR);
vlc_mutex_unlock(&demux.lock);
break;
case AbstractStream::status_demuxed:
if( i_nzpcr != VLC_TS_INVALID && i_nzbarrier != i_nzpcr )
vlc_mutex_lock(&demux.lock);
if( demux.i_nzpcr != VLC_TS_INVALID && i_nzbarrier != demux.i_nzpcr )
{
i_nzpcr = i_nzbarrier;
es_out_Control(p_demux->out, ES_OUT_SET_GROUP_PCR, 0, VLC_TS_0 + i_nzpcr);
demux.i_nzpcr = i_nzbarrier;
mtime_t pcr = VLC_TS_0 + std::max(INT64_C(0), demux.i_nzpcr - CLOCK_FREQ / 10);
es_out_Control(p_demux->out, ES_OUT_SET_GROUP_PCR, 0, pcr);
}
vlc_mutex_unlock(&demux.lock);
break;
}
if(needsUpdate())
{
if(updatePlaylist())
scheduleNextUpdate();
else
failedupdates++;
}
/* Live starved and update still not there ? */
if(status == AbstractStream::status_buffering_ahead && needsUpdate())
msleep(CLOCK_FREQ / 20); /* Ugly */
return VLC_DEMUXER_SUCCESS;
}
......@@ -411,72 +462,97 @@ int PlaylistManager::doControl(int i_query, va_list args)
switch (i_query)
{
case DEMUX_CAN_SEEK:
*(va_arg (args, bool *)) = !playlist->isLive();
{
vlc_mutex_locker locker(&cached.lock);
*(va_arg (args, bool *)) = ! cached.b_live;
break;
}
case DEMUX_CAN_CONTROL_PACE:
*(va_arg (args, bool *)) = true;
break;
case DEMUX_CAN_PAUSE:
*(va_arg (args, bool *)) = !playlist->isLive();
{
vlc_mutex_locker locker(&cached.lock);
*(va_arg (args, bool *)) = ! cached.b_live;
break;
}
case DEMUX_SET_PAUSE_STATE:
return (playlist->isLive()) ? VLC_EGENERIC : VLC_SUCCESS;
{
vlc_mutex_locker locker(&cached.lock);
return cached.b_live ? VLC_EGENERIC : VLC_SUCCESS;
}
case DEMUX_GET_TIME:
{
mtime_t i_time = getCurrentPlaybackTime();
if(!playlist->isLive())
i_time -= getFirstPlaybackTime();
*(va_arg (args, int64_t *)) = i_time;
vlc_mutex_locker locker(&cached.lock);
*(va_arg (args, int64_t *)) = cached.i_time;
break;
}
case DEMUX_GET_LENGTH:
if(playlist->isLive())
{
vlc_mutex_locker locker(&cached.lock);
if(cached.b_live)
return VLC_EGENERIC;
*(va_arg (args, int64_t *)) = getDuration();
*(va_arg (args, int64_t *)) = cached.i_length;
break;
}
case DEMUX_GET_POSITION:
{
const mtime_t i_duration = getDuration();
if(i_duration == 0) /* == playlist->isLive() */
vlc_mutex_locker locker(&cached.lock);
if(cached.b_live)
return VLC_EGENERIC;
const mtime_t i_length = getCurrentPlaybackTime() - getFirstPlaybackTime();
*(va_arg (args, double *)) = (double) i_length / i_duration;
*(va_arg (args, double *)) = cached.f_position;
break;
}
case DEMUX_SET_POSITION:
{
setBufferingRunState(false); /* stop downloader first */
const mtime_t i_duration = getDuration();
if(i_duration == 0) /* == playlist->isLive() */
{
setBufferingRunState(true);
return VLC_EGENERIC;
}
int64_t time = i_duration * va_arg(args, double);
time += getFirstPlaybackTime();
if(!setPosition(time))
{
setBufferingRunState(true);
return VLC_EGENERIC;
}
i_nzpcr = VLC_TS_INVALID;
demux.i_nzpcr = VLC_TS_INVALID;
setBufferingRunState(true);
break;
}
case DEMUX_SET_TIME:
{
setBufferingRunState(false); /* stop downloader first */
if(playlist->isLive())
{
setBufferingRunState(true);
return VLC_EGENERIC;
}
int64_t time = va_arg(args, int64_t);// + getFirstPlaybackTime();
if(!setPosition(time))
{
setBufferingRunState(true);
return VLC_EGENERIC;
}
i_nzpcr = VLC_TS_INVALID;
demux.i_nzpcr = VLC_TS_INVALID;
setBufferingRunState(true);
break;
}
......@@ -498,6 +574,109 @@ int PlaylistManager::doControl(int i_query, va_list args)
return VLC_SUCCESS;
}
void PlaylistManager::setBufferingRunState(bool b)
{
vlc_mutex_lock(&lock);
b_buffering = b;
if(b_buffering)
vlc_cond_signal(&waitcond);
vlc_mutex_unlock(&lock);
}
void PlaylistManager::Run()
{
vlc_mutex_lock(&lock);
const unsigned i_min_buffering = playlist->getMinBuffering();
const unsigned i_extra_buffering = playlist->getMaxBuffering() - i_min_buffering;
while(1)
{
mutex_cleanup_push(&lock);
while(!b_buffering)
vlc_cond_wait(&waitcond, &lock);
vlc_cleanup_pop();
if(needsUpdate())
{
if(updatePlaylist())
scheduleNextUpdate();
else
failedupdates++;
}
vlc_mutex_lock(&demux.lock);
mtime_t i_nzpcr = demux.i_nzpcr;
vlc_mutex_unlock(&demux.lock);
int canc = vlc_savecancel();
AbstractStream::buffering_status i_return = bufferize(i_nzpcr, i_min_buffering, i_extra_buffering);
vlc_restorecancel( canc );
if(i_return != AbstractStream::buffering_lessthanmin)
{
mtime_t i_deadline = mdate();
if (i_return == AbstractStream::buffering_ongoing)
i_deadline += (CLOCK_FREQ / 20);
if (i_return == AbstractStream::buffering_full)
i_deadline += (CLOCK_FREQ / 10);
else if(i_return == AbstractStream::buffering_end)
i_deadline += (CLOCK_FREQ);
else /*if(i_return == AbstractStream::buffering_suspended)*/
i_deadline += (CLOCK_FREQ / 4);
vlc_mutex_lock(&demux.lock);
vlc_cond_signal(&demux.cond);
vlc_mutex_unlock(&demux.lock);
mutex_cleanup_push(&lock);
while(vlc_cond_timedwait(&waitcond, &lock, i_deadline) == 0
&& i_deadline < mdate());
vlc_cleanup_pop();
}
}
vlc_mutex_unlock(&lock);
}
void * PlaylistManager::managerThread(void *opaque)
{
(reinterpret_cast<PlaylistManager *>(opaque))->Run();
return NULL;
}
void PlaylistManager::updateControlsPosition()
{
vlc_mutex_locker locker(&cached.lock);
const mtime_t i_duration = cached.i_length;
if(i_duration == 0)
{
cached.f_position = 0.0;
}
else
{
const mtime_t i_length = getCurrentPlaybackTime() - getFirstPlaybackTime();
cached.f_position = (double) i_length / i_duration;
}
mtime_t i_time = getCurrentPlaybackTime();
if(!playlist->isLive())
i_time -= getFirstPlaybackTime();
cached.i_time = i_time;
}
void PlaylistManager::updateControlsContentType()
{
vlc_mutex_locker locker(&cached.lock);
if(playlist->isLive())
{
cached.b_live = true;
cached.i_length = 0;
}
else
{
cached.b_live = false;
cached.i_length = getDuration();
}
}
AbstractAdaptationLogic *PlaylistManager::createLogic(AbstractAdaptationLogic::LogicType type, HTTPConnectionManager *conn)
{
switch(type)
......
......@@ -52,8 +52,10 @@ namespace adaptive
virtual ~PlaylistManager ();
bool start();
void stop();
AbstractStream::status demux(mtime_t *, bool);
AbstractStream::buffering_status bufferize(mtime_t, unsigned, unsigned);
AbstractStream::status dequeue(mtime_t *);
void drain();
virtual bool needsUpdate() const;
......@@ -77,11 +79,14 @@ namespace adaptive
virtual mtime_t getFirstPlaybackTime() const;
mtime_t getCurrentPlaybackTime() const;
int esCount() const;
void pruneLiveStream();
virtual bool reactivateStream(AbstractStream *);
bool setupPeriod();
void unsetPeriod();
void updateControlsPosition();
void updateControlsContentType();
/* local factories */
virtual AbstractAdaptationLogic *createLogic(AbstractAdaptationLogic::LogicType,
HTTPConnectionManager *);
......@@ -93,11 +98,39 @@ namespace adaptive
AbstractStreamFactory *streamFactory;
demux_t *p_demux;
std::vector<AbstractStream *> streams;
time_t nextPlaylistupdate;
mtime_t i_nzpcr;
mtime_t i_firstpcr;
BasePeriod *currentPeriod;
/* shared with demux/buffering */
struct
{
mtime_t i_nzpcr;
mtime_t i_firstpcr;
vlc_mutex_t lock;
vlc_cond_t cond;
} demux;
/* buffering process */
time_t nextPlaylistupdate;
int failedupdates;
/* Controls */
struct
{
bool b_live;
mtime_t i_length;
mtime_t i_time;
double f_position;
vlc_mutex_t lock;
} cached;
private:
void setBufferingRunState(bool);
void Run();
static void * managerThread(void *);
vlc_mutex_t lock;
vlc_thread_t thread;
vlc_cond_t waitcond;
bool b_buffering;
};
}
......
......@@ -49,6 +49,7 @@ AbstractStream::AbstractStream(demux_t * demux_)
commandsqueue = NULL;
demuxer = NULL;
fakeesout = NULL;
vlc_mutex_init(&lock);
}
bool AbstractStream::init(const StreamFormat &format_, SegmentTracker *tracker, HTTPConnectionManager *conn)
......@@ -100,6 +101,8 @@ AbstractStream::~AbstractStream()
delete demuxersource;
delete fakeesout;
delete commandsqueue;
vlc_mutex_destroy(&lock);
}
void AbstractStream::prepareFormatChange()
......@@ -130,7 +133,7 @@ void AbstractStream::setDescription(const std::string &desc)
description = desc;
}
bool AbstractStream::isEOF() const
bool AbstractStream::isDead() const
{
return dead;
}
......@@ -147,11 +150,6 @@ mtime_t AbstractStream::getMinAheadTime() const
return segmentTracker->getMinAheadTime();
}
mtime_t AbstractStream::getBufferingLevel() const
{
return commandsqueue->getBufferingLevel();
}
mtime_t AbstractStream::getFirstDTS() const
{
return commandsqueue->getFirstDTS();
......@@ -229,23 +227,33 @@ bool AbstractStream::drain()
return fakeesout->drain();
}
AbstractStream::status AbstractStream::demux(mtime_t nz_deadline, bool send)
AbstractStream::buffering_status AbstractStream::bufferize(mtime_t nz_deadline,
unsigned i_min_buffering, unsigned i_extra_buffering)
{
vlc_mutex_lock(&lock);
/* Ensure it is configured */
if(!segmentTracker || !connManager || dead)
return AbstractStream::status_eof;
if(commandsqueue->isFlushing())
{
if(!send)
return AbstractStream::status_buffering;
vlc_mutex_unlock(&lock);
return AbstractStream::buffering_end;
}
(void) commandsqueue->Process(p_realdemux->out, VLC_TS_0 + nz_deadline);
if(!commandsqueue->isEmpty())
return AbstractStream::status_demuxed;
/* Disable streams that are not selected (alternate streams) */
if(esCount() && !isSelected() && !fakeesout->restarting())
{
disabled = true;
segmentTracker->reset();
commandsqueue->Abort(false);
msg_Dbg(p_realdemux, "deactivating stream %s", format.str().c_str());
vlc_mutex_unlock(&lock);
return AbstractStream::buffering_end;
}
commandsqueue->Abort(true); /* reset buffering level */
return AbstractStream::status_dis;
if(commandsqueue->isFlushing())
{
vlc_mutex_unlock(&lock);
return AbstractStream::buffering_suspended;
}
if(!demuxer)
......@@ -260,21 +268,36 @@ AbstractStream::status AbstractStream::demux(mtime_t nz_deadline, bool send)
prepareFormatChange();