Commit c5db60a7 authored by François Cartegnie's avatar François Cartegnie 🤞

demux: adaptative: rewrite using synchronous demuxers

Can now flush buffers on demux restart.
Do align pcr after sending to decoders instead of always incrementing
by the target value (avoid dropping blocks across segments on restart).
Always issue a fakees to demuxer, then recycle on execution.
Avoids double deletion with duplicate Del commands (demuxer 0..n+self n).
Can now handle HLS discontinuities.
Drops the streamoutput layer.
parent d666b6e5
......@@ -309,8 +309,6 @@ libadaptative_plugin_la_SOURCES = \
demux/adaptative/plumbing/FakeESOutID.hpp \
demux/adaptative/plumbing/SourceStream.cpp \
demux/adaptative/plumbing/SourceStream.hpp \
demux/adaptative/plumbing/StreamOutput.cpp \
demux/adaptative/plumbing/StreamOutput.hpp \
demux/adaptative/ChunksSource.hpp \
demux/adaptative/PlaylistManager.cpp \
demux/adaptative/PlaylistManager.h \
......@@ -364,6 +362,8 @@ libadaptative_dash_SOURCES = \
demux/dash/xml/Node.h \
demux/dash/DASHManager.cpp \
demux/dash/DASHManager.h \
demux/dash/DASHStream.cpp \
demux/dash/DASHStream.hpp \
demux/dash/DASHStreamFormat.hpp
libadaptative_hls_SOURCES = \
......
......@@ -33,7 +33,7 @@
#include "logic/AlwaysBestAdaptationLogic.h"
#include "logic/RateBasedAdaptationLogic.h"
#include "logic/AlwaysLowestAdaptationLogic.hpp"
#include "plumbing/StreamOutput.hpp"
#include "tools/Debug.hpp"
#include <vlc_stream.h>
#include <vlc_demux.h>
......@@ -45,12 +45,12 @@ using namespace adaptative;
PlaylistManager::PlaylistManager( demux_t *p_demux_,
AbstractPlaylist *pl,
AbstractStreamOutputFactory *factory,
AbstractStreamFactory *factory,
AbstractAdaptationLogic::LogicType type ) :
conManager ( NULL ),
logicType ( type ),
playlist ( pl ),
streamOutputFactory( factory ),
streamFactory ( factory ),
p_demux ( p_demux_ ),
nextPlaylistupdate ( 0 ),
i_nzpcr ( 0 )
......@@ -61,14 +61,14 @@ PlaylistManager::PlaylistManager( demux_t *p_demux_,
PlaylistManager::~PlaylistManager ()
{
delete conManager;
delete streamOutputFactory;
delete streamFactory;
unsetPeriod();
delete playlist;
}
void PlaylistManager::unsetPeriod()
{
std::vector<Stream *>::iterator it;
std::vector<AbstractStream *>::iterator it;
for(it=streams.begin(); it!=streams.end(); ++it)
delete *it;
streams.clear();
......@@ -84,50 +84,46 @@ bool PlaylistManager::setupPeriod()
for(it=sets.begin();it!=sets.end();++it)
{
BaseAdaptationSet *set = *it;
if(set)
if(set && streamFactory)
{
Stream *st = new (std::nothrow) Stream(p_demux, set->getStreamFormat());
if(!st)
continue;
AbstractAdaptationLogic *logic = createLogic(logicType);
if(!logic)
continue;
SegmentTracker *tracker = new (std::nothrow) SegmentTracker(logic, set);
if(!tracker)
{
delete st;
delete logic;
continue;
}
SegmentTracker *tracker = new (std::nothrow) SegmentTracker(logic, set);
try
AbstractStream *st = streamFactory->create(p_demux, set->getStreamFormat(),
logic, tracker, conManager);
if(!st)
{
delete tracker;
delete logic;
continue;
}
streams.push_back(st);
/* Generate stream description */
std::list<std::string> languages;
if(!set->getLang().empty())
{
languages = set->getLang();
}
else if(!set->getRepresentations().empty())
{
if(!tracker || !streamOutputFactory)
{
delete tracker;
delete logic;
throw VLC_ENOMEM;
}
std::list<std::string> languages;
if(!set->getLang().empty())
{
languages = set->getLang();
}
else if(!set->getRepresentations().empty())
{
languages = set->getRepresentations().front()->getLang();
}
if(!languages.empty())
st->setLanguage(languages.front());
if(!set->description.Get().empty())
st->setDescription(set->description.Get());
st->create(logic, tracker, streamOutputFactory);
streams.push_back(st);
} catch (int) {
delete st;
languages = set->getRepresentations().front()->getLang();
}
if(!languages.empty())
st->setLanguage(languages.front());
if(!set->description.Get().empty())
st->setDescription(set->description.Get());
}
}
return true;
......@@ -135,11 +131,10 @@ bool PlaylistManager::setupPeriod()
bool PlaylistManager::start()
{
if(!setupPeriod())
if(!conManager && !(conManager = new (std::nothrow) HTTPConnectionManager(VLC_OBJECT(p_demux->s))))
return false;
conManager = new (std::nothrow) HTTPConnectionManager(VLC_OBJECT(p_demux->s));
if(!conManager)
if(!setupPeriod())
return false;
playlist->playbackStart.Set(time(NULL));
......@@ -148,14 +143,14 @@ bool PlaylistManager::start()
return true;
}
Stream::status PlaylistManager::demux(mtime_t nzdeadline, bool send)
AbstractStream::status PlaylistManager::demux(mtime_t nzdeadline, bool send)
{
Stream::status i_return = Stream::status_eof;
AbstractStream::status i_return = AbstractStream::status_eof;
std::vector<Stream *>::iterator it;
std::vector<AbstractStream *>::iterator it;
for(it=streams.begin(); it!=streams.end(); ++it)
{
Stream *st = *it;
AbstractStream *st = *it;
if (st->isDisabled())
{
......@@ -165,25 +160,28 @@ Stream::status PlaylistManager::demux(mtime_t nzdeadline, bool send)
continue;
}
Stream::status i_ret = st->demux(conManager, nzdeadline, send);
if(i_ret == Stream::status_buffering)
AbstractStream::status i_ret = st->demux(nzdeadline, send);
if(i_ret == AbstractStream::status_buffering)
{
i_return = Stream::status_buffering;
i_return = AbstractStream::status_buffering;
}
else if(i_ret == Stream::status_demuxed &&
i_return != Stream::status_buffering)
else if(i_ret == AbstractStream::status_demuxed &&
i_return != AbstractStream::status_buffering)
{
i_return = Stream::status_demuxed;
i_return = AbstractStream::status_demuxed;
}
else if(i_ret == AbstractStream::status_dis)
{
i_return = AbstractStream::status_dis;
}
}
/* might be end of current period */
if(i_return == Stream::status_eof && currentPeriod)
if(i_return == AbstractStream::status_eof && currentPeriod)
{
unsetPeriod();
currentPeriod = playlist->getNextPeriod(currentPeriod);
i_return = (setupPeriod()) ? Stream::status_eop : Stream::status_eof;
i_return = (setupPeriod()) ? AbstractStream::status_eop : AbstractStream::status_eof;
}
return i_return;
......@@ -192,10 +190,10 @@ Stream::status PlaylistManager::demux(mtime_t nzdeadline, bool send)
mtime_t PlaylistManager::getPCR() const
{
mtime_t pcr = VLC_TS_INVALID;
std::vector<Stream *>::const_iterator it;
std::vector<AbstractStream *>::const_iterator it;
for(it=streams.begin(); it!=streams.end(); ++it)
{
if ((*it)->isDisabled())
if ((*it)->isDisabled() || (*it)->isEOF())
continue;
if(pcr == VLC_TS_INVALID || pcr > (*it)->getPCR())
pcr = (*it)->getPCR();
......@@ -206,10 +204,10 @@ mtime_t PlaylistManager::getPCR() const
mtime_t PlaylistManager::getFirstDTS() const
{
mtime_t dts = VLC_TS_INVALID;
std::vector<Stream *>::const_iterator it;
std::vector<AbstractStream *>::const_iterator it;
for(it=streams.begin(); it!=streams.end(); ++it)
{
if ((*it)->isDisabled())
if ((*it)->isDisabled() || (*it)->isEOF())
continue;
if(dts == VLC_TS_INVALID || dts > (*it)->getFirstDTS())
dts = (*it)->getFirstDTS();
......@@ -220,7 +218,7 @@ mtime_t PlaylistManager::getFirstDTS() const
int PlaylistManager::esCount() const
{
int es = 0;
std::vector<Stream *>::const_iterator it;
std::vector<AbstractStream *>::const_iterator it;
for(it=streams.begin(); it!=streams.end(); ++it)
{
es += (*it)->esCount();
......@@ -242,7 +240,7 @@ bool PlaylistManager::setPosition(mtime_t time)
for(int real = 0; real < 2; real++)
{
/* Always probe if we can seek first */
std::vector<Stream *>::iterator it;
std::vector<AbstractStream *>::iterator it;
for(it=streams.begin(); it!=streams.end(); ++it)
{
ret &= (*it)->setPosition(time, !real);
......@@ -258,7 +256,7 @@ bool PlaylistManager::seekAble() const
if(playlist->isLive())
return false;
std::vector<Stream *>::const_iterator it;
std::vector<AbstractStream *>::const_iterator it;
for(it=streams.begin(); it!=streams.end(); ++it)
{
if(!(*it)->seekAble())
......@@ -269,7 +267,7 @@ bool PlaylistManager::seekAble() const
bool PlaylistManager::updatePlaylist()
{
std::vector<Stream *>::const_iterator it;
std::vector<AbstractStream *>::const_iterator it;
for(it=streams.begin(); it!=streams.end(); ++it)
(*it)->runUpdates();
......@@ -287,7 +285,7 @@ int PlaylistManager::doDemux(int64_t increment)
{
if(i_nzpcr == VLC_TS_INVALID)
{
if( Stream::status_eof == demux(i_nzpcr + increment, false) )
if( AbstractStream::status_eof == demux(i_nzpcr + increment, false) )
{
return VLC_DEMUXER_EOF;
}
......@@ -296,19 +294,20 @@ int PlaylistManager::doDemux(int64_t increment)
i_nzpcr = getPCR();
}
Stream::status status = demux(i_nzpcr + increment, true);
AbstractStream::status status = demux(i_nzpcr + increment, true);
AdvDebug(msg_Dbg( p_demux, "doDemux() status %d dts %ld pcr %ld", status, getFirstDTS(), getPCR() ));
switch(status)
{
case Stream::status_eof:
case AbstractStream::status_eof:
return VLC_DEMUXER_EOF;
case Stream::status_buffering:
case AbstractStream::status_buffering:
break;
case Stream::status_eop:
case AbstractStream::status_dis:
case AbstractStream::status_eop:
i_nzpcr = VLC_TS_INVALID;
es_out_Control(p_demux->out, ES_OUT_RESET_PCR);
break;
case Stream::status_demuxed:
case AbstractStream::status_demuxed:
if( i_nzpcr != VLC_TS_INVALID )
{
i_nzpcr += increment;
......
......@@ -43,20 +43,17 @@ namespace adaptative
using namespace logic;
using namespace http;
class AbstractStreamFactory;
class AbstractStreamOutputFactory;
class PlaylistManager
{
public:
PlaylistManager( demux_t *, AbstractPlaylist *,
AbstractStreamOutputFactory *,
AbstractStreamFactory *,
AbstractAdaptationLogic::LogicType type );
virtual ~PlaylistManager ();
bool start();
Stream::status demux(mtime_t, bool);
AbstractStream::status demux(mtime_t, bool);
mtime_t getDuration() const;
mtime_t getPCR() const;
mtime_t getFirstDTS() const;
......@@ -82,9 +79,9 @@ namespace adaptative
HTTPConnectionManager *conManager;
AbstractAdaptationLogic::LogicType logicType;
AbstractPlaylist *playlist;
AbstractStreamOutputFactory *streamOutputFactory;
AbstractStreamFactory *streamFactory;
demux_t *p_demux;
std::vector<Stream *> streams;
std::vector<AbstractStream *> streams;
time_t nextPlaylistupdate;
mtime_t i_nzpcr;
BasePeriod *currentPeriod;
......
......@@ -22,37 +22,65 @@
#include "http/HTTPConnectionManager.h"
#include "logic/AbstractAdaptationLogic.h"
#include "playlist/SegmentChunk.hpp"
#include "plumbing/StreamOutput.hpp"
#include "SegmentTracker.hpp"
#include "plumbing/SourceStream.hpp"
#include "plumbing/CommandsQueue.hpp"
#include "tools/Debug.hpp"
#include <vlc_demux.h>
using namespace adaptative;
using namespace adaptative::http;
using namespace adaptative::logic;
Stream::Stream(demux_t * demux_, const StreamFormat &format_)
AbstractStream::AbstractStream(demux_t * demux_, const StreamFormat &format_)
{
p_demux = demux_;
p_realdemux = demux_;
type = UNKNOWN;
format = format_;
output = NULL;
adaptationLogic = NULL;
currentChunk = NULL;
eof = false;
dead = false;
disabled = false;
flushing = false;
restarting_output = false;
discontinuity = false;
segmentTracker = NULL;
streamOutputFactory = NULL;
pcr = VLC_TS_INVALID;
demuxer = NULL;
fakeesout = NULL;
/* Don't even try if not supported */
if((unsigned)format == StreamFormat::UNSUPPORTED)
throw VLC_EGENERIC;
demuxersource = new (std::nothrow) ChunksSourceStream( VLC_OBJECT(p_realdemux), this );
if(!demuxersource)
throw VLC_EGENERIC;
CommandsFactory *factory = new CommandsFactory();
fakeesout = new (std::nothrow) FakeESOut(p_realdemux->out, factory);
if(!fakeesout)
{
delete demuxersource;
throw VLC_EGENERIC;
}
fakeesout->setExtraInfoProvider( this );
}
Stream::~Stream()
AbstractStream::~AbstractStream()
{
delete currentChunk;
delete adaptationLogic;
delete output;
delete segmentTracker;
delete demuxer;
delete demuxersource;
delete fakeesout;
}
StreamType Stream::mimeToType(const std::string &mime)
StreamType AbstractStream::mimeToType(const std::string &mime)
{
StreamType mimetype;
if (!mime.compare(0, 6, "video/"))
......@@ -66,97 +94,103 @@ StreamType Stream::mimeToType(const std::string &mime)
return mimetype;
}
void Stream::create(AbstractAdaptationLogic *logic, SegmentTracker *tracker,
const AbstractStreamOutputFactory *factory)
void AbstractStream::bind(AbstractAdaptationLogic *logic, SegmentTracker *tracker,
HTTPConnectionManager *conn)
{
adaptationLogic = logic;
segmentTracker = tracker;
streamOutputFactory = factory;
updateFormat(format);
connManager = conn;
}
void Stream::updateFormat(StreamFormat &newformat)
void AbstractStream::prepareFormatChange()
{
if( format == newformat && output )
return;
output = streamOutputFactory->create(p_demux, newformat, output);
format = newformat;
if(!output)
throw VLC_EGENERIC;
output->setLanguage(language);
output->setDescription(description);
if(demuxer)
{
/* Enqueue Del Commands for all current ES */
demuxer->drain();
/* Enqueue Del Commands for all current ES */
fakeesout->scheduleAllForDeletion();
fakeesout->schedulePCRReset();
fakeesout->commandsqueue.Commit();
/* ignoring demuxer's own Del commands */
fakeesout->commandsqueue.setDrop(true);
delete demuxer;
fakeesout->commandsqueue.setDrop(false);
demuxer = NULL;
}
}
void Stream::setLanguage(const std::string &lang)
void AbstractStream::setLanguage(const std::string &lang)
{
language = lang;
}
void Stream::setDescription(const std::string &desc)
void AbstractStream::setDescription(const std::string &desc)
{
description = desc;
}
bool Stream::isEOF() const
bool AbstractStream::isEOF() const
{
return dead;
}
mtime_t AbstractStream::getPCR() const
{
return false;
return pcr;
}
mtime_t Stream::getPCR() const
mtime_t AbstractStream::getBufferingLevel() const
{
if(!output)
return 0;
return output->getPCR();
return fakeesout->commandsqueue.getBufferingLevel();
}
mtime_t Stream::getFirstDTS() const
mtime_t AbstractStream::getFirstDTS() const
{
if(!output)
return 0;
return output->getFirstDTS();
return fakeesout->commandsqueue.getFirstDTS();
}
int Stream::esCount() const
int AbstractStream::esCount() const
{
if(!output)
return 0;
return output->esCount();
return fakeesout->esCount();
}
bool Stream::operator ==(const Stream &stream) const
bool AbstractStream::operator ==(const AbstractStream &stream) const
{
return stream.type == type;
}
SegmentChunk * Stream::getChunk()
SegmentChunk * AbstractStream::getChunk()
{
if (currentChunk == NULL && output && !eof)
if (currentChunk == NULL && !eof)
{
if(esCount() && !isSelected())
{
disabled = true;
return NULL;
}
currentChunk = segmentTracker->getNextChunk(output->switchAllowed());
currentChunk = segmentTracker->getNextChunk(!fakeesout->restarting());
if (currentChunk == NULL)
eof = true;
}
return currentChunk;
}
bool Stream::seekAble() const
bool AbstractStream::seekAble() const
{
return (output && output->seekAble());
return (demuxer &&
!fakeesout->restarting() &&
!restarting_output &&
!discontinuity &&
!flushing );
}
bool Stream::isSelected() const
bool AbstractStream::isSelected() const
{
return output && output->isSelected();
return fakeesout->hasSelectedEs();
}
bool Stream::reactivate(mtime_t basetime)
bool AbstractStream::reactivate(mtime_t basetime)
{
if(setPosition(basetime, false))
{
......@@ -170,46 +204,144 @@ bool Stream::reactivate(mtime_t basetime)
}
}
bool Stream::isDisabled() const
bool AbstractStream::startDemux()
{
if(demuxer)
return false;
try
{
demuxersource->Reset();
demuxer = createDemux(format);
} catch(int) {
msg_Err(p_realdemux, "Failed to create demuxer");
}
return !!demuxer;
}
bool AbstractStream::restartDemux()
{
if(!demuxer)
{
return startDemux();
}
else if(demuxer->reinitsOnSeek())
{
/* Push all ES as recycling candidates */
fakeesout->recycleAll();
/* Restart with ignoring pushes to queue */
return demuxer->restart(fakeesout->commandsqueue);
}
return true;
}
bool AbstractStream::isDisabled() const
{
return disabled;
}
Stream::status Stream::demux(HTTPConnectionManager *connManager, mtime_t nz_deadline, bool send)
AbstractStream::status AbstractStream::demux(mtime_t nz_deadline, bool send)
{
if(!output)
return Stream::status_eof;
/* Ensure it is configured */
if(!adaptationLogic || !segmentTracker || !connManager || dead)
return AbstractStream::status_eof;
if(nz_deadline + VLC_TS_0 > output->getPCR()) /* not already demuxed */
if(flushing)
{
if(!send)
return AbstractStream::status_buffering;
pcr = fakeesout->commandsqueue.Process(p_realdemux->out, VLC_TS_0 + nz_deadline);
if(!fakeesout->commandsqueue.isEmpty())
return AbstractStream::status_demuxed;
fakeesout->commandsqueue.Abort(true); /* reset buffering level */
flushing = false;
pcr = 0;
return AbstractStream::status_dis;
}
if(!demuxer && !startDemux())
{
/* If demux fails because of probing failure / wrong format*/
if(restarting_output)
{
msg_Dbg( p_realdemux, "Flushing on format change" );
prepareFormatChange();
restarting_output = false;
discontinuity = false;
flushing = true;
return AbstractStream::status_buffering;
}
dead = true; /* Prevent further retries */
return AbstractStream::status_eof;
}
if(nz_deadline + VLC_TS_0 > getBufferingLevel()) /* not already demuxed */
{
/* need to read, demuxer still buffering, ... */
if(read(connManager) <= 0)
if(demuxer->demux() != VLC_DEMUXER_SUCCESS)
{
if(output->isEmpty())
return Stream::status_eof;
if(restarting_output || discontinuity)
{
msg_Dbg( p_realdemux, "Flushing on discontinuity" );
prepareFormatChange();
restarting_output = false;
discontinuity = false;
flushing = true;
return AbstractStream::status_buffering;
}
fakeesout->commandsqueue.Commit();
if(fakeesout->commandsqueue.isEmpty())
return AbstractStream::status_eof;
}
else if(nz_deadline + VLC_TS_0 > output->getPCR()) /* need to read more */