Commit 3d8bd06f authored by François Cartegnie's avatar François Cartegnie 🤞

demux: adaptive: have streams own commands queue

parent 4d6bca9a
......@@ -48,6 +48,7 @@ AbstractStream::AbstractStream(demux_t * demux_, const StreamFormat &format_)
segmentTracker = NULL;
pcr = VLC_TS_INVALID;
commandsqueue = NULL;
demuxer = NULL;
fakeesout = NULL;
......@@ -59,10 +60,25 @@ AbstractStream::AbstractStream(demux_t * demux_, const StreamFormat &format_)
if(!demuxersource)
throw VLC_EGENERIC;
CommandsFactory *factory = new CommandsFactory();
fakeesout = new (std::nothrow) FakeESOut(p_realdemux->out, factory);
CommandsFactory *factory = new (std::nothrow) CommandsFactory();
if(!factory)
{
delete demuxersource;
throw VLC_EGENERIC;
}
commandsqueue = new CommandsQueue(factory);
if(!commandsqueue)
{
delete factory;
delete demuxersource;
throw VLC_EGENERIC;
}
fakeesout = new (std::nothrow) FakeESOut(p_realdemux->out, commandsqueue);
if(!fakeesout)
{
delete commandsqueue;
delete demuxersource;
throw VLC_EGENERIC;
}
......@@ -77,6 +93,7 @@ AbstractStream::~AbstractStream()
delete demuxer;
delete demuxersource;
delete fakeesout;
delete commandsqueue;
}
......@@ -96,11 +113,11 @@ void AbstractStream::prepareFormatChange()
/* Enqueue Del Commands for all current ES */
fakeesout->scheduleAllForDeletion();
fakeesout->schedulePCRReset();
fakeesout->commandsqueue.Commit();
commandsqueue->Commit();
/* ignoring demuxer's own Del commands */
fakeesout->commandsqueue.setDrop(true);
commandsqueue->setDrop(true);
delete demuxer;
fakeesout->commandsqueue.setDrop(false);
commandsqueue->setDrop(false);
demuxer = NULL;
}
}
......@@ -134,12 +151,12 @@ mtime_t AbstractStream::getMinAheadTime() const
mtime_t AbstractStream::getBufferingLevel() const
{
return fakeesout->commandsqueue.getBufferingLevel();
return commandsqueue->getBufferingLevel();
}
mtime_t AbstractStream::getFirstDTS() const
{
return fakeesout->commandsqueue.getFirstDTS();
return commandsqueue->getFirstDTS();
}
int AbstractStream::esCount() const
......@@ -201,9 +218,9 @@ bool AbstractStream::restartDemux()
/* Push all ES as recycling candidates */
fakeesout->recycleAll();
/* Restart with ignoring pushes to queue */
return demuxer->restart(fakeesout->commandsqueue);
return demuxer->restart(commandsqueue);
}
fakeesout->commandsqueue.Commit();
commandsqueue->Commit();
return true;
}
......@@ -223,11 +240,11 @@ AbstractStream::status AbstractStream::demux(mtime_t nz_deadline, bool send)
if(!send)
return AbstractStream::status_buffering;
pcr = fakeesout->commandsqueue.Process(p_realdemux->out, VLC_TS_0 + nz_deadline);
if(!fakeesout->commandsqueue.isEmpty())
pcr = commandsqueue->Process(p_realdemux->out, VLC_TS_0 + nz_deadline);
if(!commandsqueue->isEmpty())
return AbstractStream::status_demuxed;
fakeesout->commandsqueue.Abort(true); /* reset buffering level */
commandsqueue->Abort(true); /* reset buffering level */
flushing = false;
pcr = 0;
return AbstractStream::status_dis;
......@@ -270,8 +287,8 @@ AbstractStream::status AbstractStream::demux(mtime_t nz_deadline, bool send)
return AbstractStream::status_buffering;
}
fakeesout->commandsqueue.Commit();
if(fakeesout->commandsqueue.isEmpty())
commandsqueue->Commit();
if(commandsqueue->isEmpty())
return AbstractStream::status_eof;
}
else if(nz_deadline + VLC_TS_0 > getBufferingLevel()) /* need to read more */
......@@ -284,7 +301,7 @@ AbstractStream::status AbstractStream::demux(mtime_t nz_deadline, bool send)
description.c_str(), getPCR(), getFirstDTS(), nz_deadline, getBufferingLevel()));
if(send)
pcr = fakeesout->commandsqueue.Process( p_realdemux->out, VLC_TS_0 + nz_deadline );
pcr = commandsqueue->Process( p_realdemux->out, VLC_TS_0 + nz_deadline );
/* Disable streams that are not selected (alternate streams) */
if(esCount() && !isSelected() && !fakeesout->restarting())
......
......@@ -25,6 +25,7 @@
#include "ChunksSource.hpp"
#include "SegmentTracker.hpp"
#include "plumbing/CommandsQueue.hpp"
#include "plumbing/Demuxer.hpp"
#include "plumbing/SourceStream.hpp"
#include "plumbing/FakeESOut.hpp"
......@@ -111,6 +112,7 @@ namespace adaptive
std::string language;
std::string description;
CommandsQueue *commandsqueue;
AbstractDemuxer *demuxer;
AbstractSourceStream *demuxersource;
FakeESOut *fakeesout; /* to intercept/proxy what is sent from demuxstream */
......
......@@ -193,16 +193,18 @@ EsOutControlResetPCRCommand * CommandsFactory::creatEsOutControlResetPCRCommand(
/*
* Commands Queue management
*/
CommandsQueue::CommandsQueue()
CommandsQueue::CommandsQueue( CommandsFactory *f )
{
bufferinglevel = VLC_TS_INVALID;
b_drop = false;
commandsFactory = f;
vlc_mutex_init(&lock);
}
CommandsQueue::~CommandsQueue()
{
Abort( false );
delete commandsFactory;
vlc_mutex_destroy(&lock);
}
......@@ -231,6 +233,11 @@ void CommandsQueue::Schedule( AbstractCommand *command )
vlc_mutex_unlock(&lock);
}
CommandsFactory * CommandsQueue::factory()
{
return commandsFactory;
}
mtime_t CommandsQueue::Process( es_out_t *out, mtime_t barrier )
{
mtime_t lastdts = barrier;
......
......@@ -136,8 +136,9 @@ namespace adaptive
class CommandsQueue
{
public:
CommandsQueue();
CommandsQueue( CommandsFactory * );
~CommandsQueue();
CommandsFactory * factory();
void Schedule( AbstractCommand * );
mtime_t Process( es_out_t *out, mtime_t );
void Abort( bool b_reset );
......@@ -148,6 +149,7 @@ namespace adaptive
mtime_t getFirstDTS() const;
private:
CommandsFactory *commandsFactory;
vlc_mutex_t lock;
void LockedCommit();
std::list<AbstractCommand *> incoming;
......
......@@ -92,14 +92,14 @@ bool Demuxer::create()
return true;
}
bool Demuxer::restart(CommandsQueue &queue)
bool Demuxer::restart(CommandsQueue *queue)
{
if(p_demux)
{
queue.setDrop(true);
queue->setDrop(true);
demux_Delete(p_demux);
p_demux = NULL;
queue.setDrop(false);
queue->setDrop(false);
}
sourcestream->Reset();
return create();
......
......@@ -36,7 +36,7 @@ namespace adaptive
virtual int demux(mtime_t) = 0;
virtual void drain() = 0;
virtual bool create() = 0;
virtual bool restart(CommandsQueue &) = 0;
virtual bool restart(CommandsQueue *) = 0;
bool alwaysStartsFromZero() const;
bool reinitsOnSeek() const;
......@@ -53,7 +53,7 @@ namespace adaptive
virtual int demux(mtime_t); /* impl */
virtual void drain(); /* impl */
virtual bool create(); /* impl */
virtual bool restart(CommandsQueue &); /* impl */
virtual bool restart(CommandsQueue *); /* impl */
protected:
AbstractSourceStream *sourcestream;
......
......@@ -24,13 +24,14 @@
#include "FakeESOut.hpp"
#include "FakeESOutID.hpp"
#include "CommandsQueue.hpp"
#include <vlc_es_out.h>
#include <vlc_block.h>
#include <cassert>
using namespace adaptive;
FakeESOut::FakeESOut( es_out_t *es, CommandsFactory *factory )
FakeESOut::FakeESOut( es_out_t *es, CommandsQueue *queue )
{
real_es_out = es;
fakeesout = new es_out_t;
......@@ -42,7 +43,8 @@ FakeESOut::FakeESOut( es_out_t *es, CommandsFactory *factory )
fakeesout->pf_send = esOutSend_Callback;
fakeesout->p_sys = (es_out_sys_t*) this;
commandsFactory = factory;
commandsqueue = queue;
timestamps_offset = 0;
extrainfo = NULL;
}
......@@ -54,8 +56,6 @@ es_out_t * FakeESOut::getEsOut()
FakeESOut::~FakeESOut()
{
delete commandsFactory;
recycleAll();
gc();
......@@ -145,9 +145,9 @@ size_t FakeESOut::esCount() const
void FakeESOut::schedulePCRReset()
{
AbstractCommand *command = commandsFactory->creatEsOutControlResetPCRCommand();
AbstractCommand *command = commandsqueue->factory()->creatEsOutControlResetPCRCommand();
if( likely(command) )
commandsqueue.Schedule( command );
commandsqueue->Schedule( command );
}
void FakeESOut::scheduleAllForDeletion()
......@@ -158,10 +158,10 @@ void FakeESOut::scheduleAllForDeletion()
FakeESOutID *es_id = *it;
if(!es_id->scheduledForDeletion())
{
AbstractCommand *command = commandsFactory->createEsOutDelCommand( es_id );
AbstractCommand *command = commandsqueue->factory()->createEsOutDelCommand( es_id );
if( likely(command) )
{
commandsqueue.Schedule( command );
commandsqueue->Schedule( command );
es_id->setScheduledForDeletion();
}
}
......@@ -171,8 +171,8 @@ void FakeESOut::scheduleAllForDeletion()
void FakeESOut::recycleAll()
{
/* Only used when demux is killed and commands queue is cancelled */
commandsqueue.Abort( true );
assert(commandsqueue.isEmpty());
commandsqueue->Abort( true );
assert(commandsqueue->isEmpty());
recycle_candidates.splice( recycle_candidates.end(), fakeesidlist );
}
......@@ -229,10 +229,10 @@ es_out_id_t * FakeESOut::esOutAdd_Callback(es_out_t *fakees, const es_format_t *
if( likely(es_id) )
{
assert(!es_id->scheduledForDeletion());
AbstractCommand *command = me->commandsFactory->createEsOutAddCommand( es_id );
AbstractCommand *command = me->commandsqueue->factory()->createEsOutAddCommand( es_id );
if( likely(command) )
{
me->commandsqueue.Schedule( command );
me->commandsqueue->Schedule( command );
return reinterpret_cast<es_out_id_t *>(es_id);
}
else
......@@ -255,10 +255,10 @@ int FakeESOut::esOutSend_Callback(es_out_t *fakees, es_out_id_t *p_es, block_t *
if( p_block->i_pts > VLC_TS_INVALID )
p_block->i_pts += offset;
}
AbstractCommand *command = me->commandsFactory->createEsOutSendCommand( es_id, p_block );
AbstractCommand *command = me->commandsqueue->factory()->createEsOutSendCommand( es_id, p_block );
if( likely(command) )
{
me->commandsqueue.Schedule( command );
me->commandsqueue->Schedule( command );
return VLC_SUCCESS;
}
return VLC_EGENERIC;
......@@ -268,10 +268,10 @@ void FakeESOut::esOutDel_Callback(es_out_t *fakees, es_out_id_t *p_es)
{
FakeESOut *me = (FakeESOut *) fakees->p_sys;
FakeESOutID *es_id = reinterpret_cast<FakeESOutID *>( p_es );
AbstractCommand *command = me->commandsFactory->createEsOutDelCommand( es_id );
AbstractCommand *command = me->commandsqueue->factory()->createEsOutDelCommand( es_id );
if( likely(command) )
{
me->commandsqueue.Schedule( command );
me->commandsqueue->Schedule( command );
es_id->setScheduledForDeletion();
}
}
......@@ -292,10 +292,10 @@ int FakeESOut::esOutControl_Callback(es_out_t *fakees, int i_query, va_list args
i_group = 0;
int64_t pcr = static_cast<int64_t>(va_arg( args, int64_t ));
pcr += me->getTimestampOffset();
AbstractCommand *command = me->commandsFactory->createEsOutControlPCRCommand( i_group, pcr );
AbstractCommand *command = me->commandsqueue->factory()->createEsOutControlPCRCommand( i_group, pcr );
if( likely(command) )
{
me->commandsqueue.Schedule( command );
me->commandsqueue->Schedule( command );
return VLC_SUCCESS;
}
}
......@@ -322,8 +322,8 @@ int FakeESOut::esOutControl_Callback(es_out_t *fakees, int i_query, va_list args
void FakeESOut::esOutDestroy_Callback(es_out_t *fakees)
{
FakeESOut *me = (FakeESOut *) fakees->p_sys;
AbstractCommand *command = me->commandsFactory->createEsOutDestroyCommand();
AbstractCommand *command = me->commandsqueue->factory()->createEsOutDestroyCommand();
if( likely(command) )
me->commandsqueue.Schedule( command );
me->commandsqueue->Schedule( command );
}
/* !Static callbacks */
......@@ -20,8 +20,8 @@
#ifndef FAKEESOUT_HPP
#define FAKEESOUT_HPP
#include "CommandsQueue.hpp"
#include <vlc_atomic.h>
#include <vlc_common.h>
#include <list>
namespace adaptive
{
......@@ -31,10 +31,13 @@ namespace adaptive
virtual void fillExtraFMTInfo( es_format_t * ) const = 0;
};
class CommandsQueue;
class FakeESOutID;
class FakeESOut
{
public:
FakeESOut( es_out_t *, CommandsFactory * );
FakeESOut( es_out_t *, CommandsQueue * );
~FakeESOut();
es_out_t * getEsOut();
void setTimestampOffset( mtime_t );
......@@ -60,14 +63,12 @@ namespace adaptive
static int esOutControl_Callback( es_out_t *, int, va_list );
static void esOutDestroy_Callback( es_out_t * );
CommandsQueue commandsqueue;
private:
es_out_t *real_es_out;
FakeESOutID * createNewID( const es_format_t * );
ExtraFMTInfoInterface *extrainfo;
mtime_t getTimestampOffset() const;
CommandsFactory *commandsFactory;
CommandsQueue *commandsqueue;
es_out_t *fakeesout;
mtime_t timestamps_offset;
std::list<FakeESOutID *> fakeesidlist;
......
......@@ -66,7 +66,7 @@ AbstractDemuxer * DASHStream::createDemux(const StreamFormat &format)
delete ret;
ret = NULL;
}
else fakeesout->commandsqueue.Commit();
else commandsqueue->Commit();
return ret;
}
......
......@@ -80,7 +80,7 @@ AbstractDemuxer * HLSStream::createDemux(const StreamFormat &format)
delete ret;
ret = NULL;
}
else fakeesout->commandsqueue.Commit();
else commandsqueue->Commit();
return ret;
}
......
......@@ -50,7 +50,7 @@ AbstractDemuxer * SmoothStream::createDemux(const StreamFormat &format)
delete ret;
ret = NULL;
}
else fakeesout->commandsqueue.Commit();
else commandsqueue->Commit();
return ret;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment