Commit 4d6bca9a authored by François Cartegnie's avatar François Cartegnie 🤞

demux: adaptive: add locks to command queue

parent 54bfaff6
...@@ -197,11 +197,13 @@ CommandsQueue::CommandsQueue() ...@@ -197,11 +197,13 @@ CommandsQueue::CommandsQueue()
{ {
bufferinglevel = VLC_TS_INVALID; bufferinglevel = VLC_TS_INVALID;
b_drop = false; b_drop = false;
vlc_mutex_init(&lock);
} }
CommandsQueue::~CommandsQueue() CommandsQueue::~CommandsQueue()
{ {
Abort( false ); Abort( false );
vlc_mutex_destroy(&lock);
} }
static bool compareCommands( AbstractCommand *a, AbstractCommand *b ) static bool compareCommands( AbstractCommand *a, AbstractCommand *b )
...@@ -211,6 +213,7 @@ static bool compareCommands( AbstractCommand *a, AbstractCommand *b ) ...@@ -211,6 +213,7 @@ static bool compareCommands( AbstractCommand *a, AbstractCommand *b )
void CommandsQueue::Schedule( AbstractCommand *command ) void CommandsQueue::Schedule( AbstractCommand *command )
{ {
vlc_mutex_lock(&lock);
if( b_drop ) if( b_drop )
{ {
delete command; delete command;
...@@ -218,13 +221,14 @@ void CommandsQueue::Schedule( AbstractCommand *command ) ...@@ -218,13 +221,14 @@ void CommandsQueue::Schedule( AbstractCommand *command )
else if( command->getType() == ES_OUT_SET_GROUP_PCR ) else if( command->getType() == ES_OUT_SET_GROUP_PCR )
{ {
bufferinglevel = command->getTime(); bufferinglevel = command->getTime();
Commit(); LockedCommit();
commands.push_back( command ); commands.push_back( command );
} }
else else
{ {
incoming.push_back( command ); incoming.push_back( command );
} }
vlc_mutex_unlock(&lock);
} }
mtime_t CommandsQueue::Process( es_out_t *out, mtime_t barrier ) mtime_t CommandsQueue::Process( es_out_t *out, mtime_t barrier )
...@@ -232,6 +236,7 @@ mtime_t CommandsQueue::Process( es_out_t *out, mtime_t barrier ) ...@@ -232,6 +236,7 @@ mtime_t CommandsQueue::Process( es_out_t *out, mtime_t barrier )
mtime_t lastdts = barrier; mtime_t lastdts = barrier;
bool b_datasent = false; bool b_datasent = false;
vlc_mutex_lock(&lock);
while( !commands.empty() && commands.front()->getTime() <= barrier ) while( !commands.empty() && commands.front()->getTime() <= barrier )
{ {
AbstractCommand *command = commands.front(); AbstractCommand *command = commands.front();
...@@ -253,18 +258,28 @@ mtime_t CommandsQueue::Process( es_out_t *out, mtime_t barrier ) ...@@ -253,18 +258,28 @@ mtime_t CommandsQueue::Process( es_out_t *out, mtime_t barrier )
command->Execute( out ); command->Execute( out );
delete command; delete command;
} }
vlc_mutex_unlock(&lock);
return lastdts; return lastdts;
} }
void CommandsQueue::Commit() void CommandsQueue::LockedCommit()
{ {
/* reorder all blocks by time between 2 PCR and merge with main list */ /* reorder all blocks by time between 2 PCR and merge with main list */
incoming.sort( compareCommands ); incoming.sort( compareCommands );
commands.splice( commands.end(), incoming ); commands.splice( commands.end(), incoming );
} }
void CommandsQueue::Commit()
{
vlc_mutex_lock(&lock);
LockedCommit();
vlc_mutex_unlock(&lock);
}
void CommandsQueue::Abort( bool b_reset ) void CommandsQueue::Abort( bool b_reset )
{ {
vlc_mutex_lock(&lock);
commands.splice( commands.end(), incoming ); commands.splice( commands.end(), incoming );
while( !commands.empty() ) while( !commands.empty() )
{ {
...@@ -274,27 +289,38 @@ void CommandsQueue::Abort( bool b_reset ) ...@@ -274,27 +289,38 @@ void CommandsQueue::Abort( bool b_reset )
if( b_reset ) if( b_reset )
bufferinglevel = VLC_TS_INVALID; bufferinglevel = VLC_TS_INVALID;
vlc_mutex_unlock(&lock);
} }
bool CommandsQueue::isEmpty() const bool CommandsQueue::isEmpty() const
{ {
return commands.empty() && incoming.empty(); vlc_mutex_lock(const_cast<vlc_mutex_t *>(&lock));
bool b_empty = commands.empty() && incoming.empty();
vlc_mutex_unlock(const_cast<vlc_mutex_t *>(&lock));
return b_empty;
} }
void CommandsQueue::setDrop( bool b ) void CommandsQueue::setDrop( bool b )
{ {
vlc_mutex_lock(&lock);
b_drop = b; b_drop = b;
vlc_mutex_unlock(&lock);
} }
mtime_t CommandsQueue::getBufferingLevel() const mtime_t CommandsQueue::getBufferingLevel() const
{ {
return bufferinglevel; mtime_t i_buffer;
vlc_mutex_lock(const_cast<vlc_mutex_t *>(&lock));
i_buffer = bufferinglevel;
vlc_mutex_unlock(const_cast<vlc_mutex_t *>(&lock));
return i_buffer;
} }
mtime_t CommandsQueue::getFirstDTS() const mtime_t CommandsQueue::getFirstDTS() const
{ {
mtime_t i_dts = VLC_TS_INVALID; mtime_t i_dts = VLC_TS_INVALID;
std::list<AbstractCommand *>::const_iterator it; std::list<AbstractCommand *>::const_iterator it;
vlc_mutex_lock(const_cast<vlc_mutex_t *>(&lock));
for( it = commands.begin(); it != commands.end(); ++it ) for( it = commands.begin(); it != commands.end(); ++it )
{ {
if( (*it)->getTime() > VLC_TS_INVALID ) if( (*it)->getTime() > VLC_TS_INVALID )
...@@ -303,5 +329,6 @@ mtime_t CommandsQueue::getFirstDTS() const ...@@ -303,5 +329,6 @@ mtime_t CommandsQueue::getFirstDTS() const
break; break;
} }
} }
vlc_mutex_unlock(const_cast<vlc_mutex_t *>(&lock));
return i_dts; return i_dts;
} }
...@@ -148,6 +148,8 @@ namespace adaptive ...@@ -148,6 +148,8 @@ namespace adaptive
mtime_t getFirstDTS() const; mtime_t getFirstDTS() const;
private: private:
vlc_mutex_t lock;
void LockedCommit();
std::list<AbstractCommand *> incoming; std::list<AbstractCommand *> incoming;
std::list<AbstractCommand *> commands; std::list<AbstractCommand *> commands;
mtime_t bufferinglevel; mtime_t bufferinglevel;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment