Commit a490aecf authored by Hugo Beauzée-Luyssen's avatar Hugo Beauzée-Luyssen

MediaLibrary: Trigger a callback when background service state changes

parent 888d2c24
...@@ -172,6 +172,11 @@ public: ...@@ -172,6 +172,11 @@ public:
* *
*/ */
virtual void onParsingStatsUpdated( uint32_t percent) = 0; virtual void onParsingStatsUpdated( uint32_t percent) = 0;
/**
* @brief onBackgroundTasksIdleChanged Called when background tasks idle state change
* @param isIdle true when all background tasks are idle, false otherwise
*/
virtual void onBackgroundTasksIdleChanged( bool isIdle ) = 0;
}; };
class IMediaLibrary class IMediaLibrary
......
...@@ -88,6 +88,8 @@ MediaLibrary::MediaLibrary() ...@@ -88,6 +88,8 @@ MediaLibrary::MediaLibrary()
: m_callback( nullptr ) : m_callback( nullptr )
, m_verbosity( LogLevel::Error ) , m_verbosity( LogLevel::Error )
, m_initialized( false ) , m_initialized( false )
, m_discovererIdle( true )
, m_parserIdle( true )
{ {
Log::setLogLevel( m_verbosity ); Log::setLogLevel( m_verbosity );
} }
...@@ -699,6 +701,26 @@ void MediaLibrary::resumeBackgroundOperations() ...@@ -699,6 +701,26 @@ void MediaLibrary::resumeBackgroundOperations()
m_parser->resume(); m_parser->resume();
} }
void MediaLibrary::onDiscovererIdleChanged( bool idle )
{
bool expected = !idle;
if ( m_discovererIdle.compare_exchange_strong( expected, idle ) == true )
{
if ( m_parserIdle == idle )
m_callback->onBackgroundTasksIdleChanged( idle );
}
}
void MediaLibrary::onParserIdleChanged( bool idle )
{
bool expected = !idle;
if ( m_parserIdle.compare_exchange_strong( expected, idle ) == true )
{
if ( m_discovererIdle == idle )
m_callback->onBackgroundTasksIdleChanged( idle );
}
}
DBConnection MediaLibrary::getConn() const DBConnection MediaLibrary::getConn() const
{ {
return m_dbConnection.get(); return m_dbConnection.get();
......
...@@ -130,6 +130,8 @@ class MediaLibrary : public IMediaLibrary, public IDeviceListerCb ...@@ -130,6 +130,8 @@ class MediaLibrary : public IMediaLibrary, public IDeviceListerCb
virtual void pauseBackgroundOperations() override; virtual void pauseBackgroundOperations() override;
virtual void resumeBackgroundOperations() override; virtual void resumeBackgroundOperations() override;
void onDiscovererIdleChanged( bool idle );
void onParserIdleChanged( bool idle );
DBConnection getConn() const; DBConnection getConn() const;
IMediaLibraryCb* getCb() const; IMediaLibraryCb* getCb() const;
...@@ -185,6 +187,8 @@ class MediaLibrary : public IMediaLibrary, public IDeviceListerCb ...@@ -185,6 +187,8 @@ class MediaLibrary : public IMediaLibrary, public IDeviceListerCb
LogLevel m_verbosity; LogLevel m_verbosity;
Settings m_settings; Settings m_settings;
bool m_initialized; bool m_initialized;
std::atomic_bool m_discovererIdle;
std::atomic_bool m_parserIdle;
}; };
} }
......
...@@ -119,6 +119,7 @@ void DiscovererWorker::enqueue( const std::string& entryPoint, Task::Type type ) ...@@ -119,6 +119,7 @@ void DiscovererWorker::enqueue( const std::string& entryPoint, Task::Type type )
void DiscovererWorker::run() void DiscovererWorker::run()
{ {
LOG_INFO( "Entering DiscovererWorker thread" ); LOG_INFO( "Entering DiscovererWorker thread" );
m_ml->onDiscovererIdleChanged( false );
while ( m_run == true ) while ( m_run == true )
{ {
Task task; Task task;
...@@ -126,9 +127,11 @@ void DiscovererWorker::run() ...@@ -126,9 +127,11 @@ void DiscovererWorker::run()
std::unique_lock<compat::Mutex> lock( m_mutex ); std::unique_lock<compat::Mutex> lock( m_mutex );
if ( m_tasks.size() == 0 ) if ( m_tasks.size() == 0 )
{ {
m_ml->onDiscovererIdleChanged( true );
m_cond.wait( lock, [this]() { return m_tasks.size() > 0 || m_run == false; } ); m_cond.wait( lock, [this]() { return m_tasks.size() > 0 || m_run == false; } );
if ( m_run == false ) if ( m_run == false )
break; break;
m_ml->onDiscovererIdleChanged( false );
} }
task = m_tasks.front(); task = m_tasks.front();
m_tasks.pop(); m_tasks.pop();
...@@ -155,6 +158,7 @@ void DiscovererWorker::run() ...@@ -155,6 +158,7 @@ void DiscovererWorker::run()
} }
} }
LOG_INFO( "Exiting DiscovererWorker thread" ); LOG_INFO( "Exiting DiscovererWorker thread" );
m_ml->onDiscovererIdleChanged( true );
} }
void DiscovererWorker::runReload( const std::string& entryPoint ) void DiscovererWorker::runReload( const std::string& entryPoint )
......
...@@ -159,4 +159,24 @@ void Parser::done( std::unique_ptr<parser::Task> t, parser::Task::Status status ...@@ -159,4 +159,24 @@ void Parser::done( std::unique_ptr<parser::Task> t, parser::Task::Status status
m_services[serviceIdx]->parse( std::move( t ) ); m_services[serviceIdx]->parse( std::move( t ) );
} }
void Parser::onIdleChanged( bool idle )
{
// If any parser service is not idle, then the global parser state is active
if ( idle == false )
{
m_ml->onParserIdleChanged( false );
return;
}
// Otherwise the parser is idle when all services are idle
for ( const auto& s : m_services )
{
// We're switching a service from "not idle" to "idle" here, so as far as the medialibrary
// is concerned the parser is still "not idle". In case a single parser service isn't
// idle, no need to trigger a change to the medialibrary.
if ( s->isIdle() == false )
return;
}
m_ml->onParserIdleChanged( true );
}
} }
...@@ -39,6 +39,7 @@ class IParserCb ...@@ -39,6 +39,7 @@ class IParserCb
public: public:
virtual ~IParserCb() = default; virtual ~IParserCb() = default;
virtual void done( std::unique_ptr<parser::Task> task, parser::Task::Status status ) = 0; virtual void done( std::unique_ptr<parser::Task> task, parser::Task::Status status ) = 0;
virtual void onIdleChanged( bool isIdle ) = 0;
}; };
class Parser : IParserCb class Parser : IParserCb
...@@ -60,6 +61,7 @@ private: ...@@ -60,6 +61,7 @@ private:
void restore(); void restore();
void updateStats(); void updateStats();
virtual void done( std::unique_ptr<parser::Task> task, parser::Task::Status status ) override; virtual void done( std::unique_ptr<parser::Task> task, parser::Task::Status status ) override;
virtual void onIdleChanged( bool idle ) override;
private: private:
typedef std::vector<ServicePtr> ServiceList; typedef std::vector<ServicePtr> ServiceList;
......
...@@ -102,6 +102,11 @@ void ParserService::initialize( MediaLibrary* ml, IParserCb* parserCb ) ...@@ -102,6 +102,11 @@ void ParserService::initialize( MediaLibrary* ml, IParserCb* parserCb )
initialize(); initialize();
} }
bool ParserService::isIdle() const
{
return m_idle;
}
uint8_t ParserService::nbNativeThreads() const uint8_t ParserService::nbNativeThreads() const
{ {
auto nbProcs = compat::Thread::hardware_concurrency(); auto nbProcs = compat::Thread::hardware_concurrency();
...@@ -122,6 +127,7 @@ void ParserService::mainloop() ...@@ -122,6 +127,7 @@ void ParserService::mainloop()
// that the underlying service has been deleted already. // that the underlying service has been deleted already.
std::string serviceName = name(); std::string serviceName = name();
LOG_INFO("Entering ParserService [", serviceName, "] thread"); LOG_INFO("Entering ParserService [", serviceName, "] thread");
setIdle( false );
while ( m_stopParser == false ) while ( m_stopParser == false )
{ {
...@@ -131,6 +137,7 @@ void ParserService::mainloop() ...@@ -131,6 +137,7 @@ void ParserService::mainloop()
if ( m_tasks.empty() == true || m_paused == true ) if ( m_tasks.empty() == true || m_paused == true )
{ {
LOG_INFO( "Halting ParserService [", serviceName, "] mainloop" ); LOG_INFO( "Halting ParserService [", serviceName, "] mainloop" );
setIdle( true );
m_cond.wait( lock, [this]() { m_cond.wait( lock, [this]() {
return ( m_tasks.empty() == false && m_paused == false ) return ( m_tasks.empty() == false && m_paused == false )
|| m_stopParser == true; || m_stopParser == true;
...@@ -139,6 +146,7 @@ void ParserService::mainloop() ...@@ -139,6 +146,7 @@ void ParserService::mainloop()
// We might have been woken up because the parser is being destroyed // We might have been woken up because the parser is being destroyed
if ( m_stopParser == true ) if ( m_stopParser == true )
break; break;
setIdle( false );
} }
// Otherwise it's safe to assume we have at least one element. // Otherwise it's safe to assume we have at least one element.
LOG_INFO('[', serviceName, "] has ", m_tasks.size(), " tasks remaining" ); LOG_INFO('[', serviceName, "] has ", m_tasks.size(), " tasks remaining" );
...@@ -175,6 +183,15 @@ void ParserService::mainloop() ...@@ -175,6 +183,15 @@ void ParserService::mainloop()
m_parserCb->done( std::move( task ), status ); m_parserCb->done( std::move( task ), status );
} }
LOG_INFO("Exiting ParserService [", serviceName, "] thread"); LOG_INFO("Exiting ParserService [", serviceName, "] thread");
setIdle( true );
}
void ParserService::setIdle(bool isIdle)
{
// Calling the idleChanged callback will trigger a call to isIdle, so set the value before
// invoking it, otherwise we have an incoherent state.
m_idle = isIdle;
m_parserCb->onIdleChanged( isIdle );
} }
} }
...@@ -62,6 +62,7 @@ public: ...@@ -62,6 +62,7 @@ public:
void stop(); void stop();
void parse( std::unique_ptr<parser::Task> t ); void parse( std::unique_ptr<parser::Task> t );
void initialize( MediaLibrary* mediaLibrary, IParserCb* parserCb ); void initialize( MediaLibrary* mediaLibrary, IParserCb* parserCb );
bool isIdle() const;
protected: protected:
uint8_t nbNativeThreads() const; uint8_t nbNativeThreads() const;
...@@ -75,6 +76,7 @@ protected: ...@@ -75,6 +76,7 @@ protected:
private: private:
// Thread(s) entry point // Thread(s) entry point
void mainloop(); void mainloop();
void setIdle( bool isIdle );
protected: protected:
MediaLibrary* m_ml; MediaLibrary* m_ml;
...@@ -85,6 +87,7 @@ private: ...@@ -85,6 +87,7 @@ private:
IParserCb* m_parserCb; IParserCb* m_parserCb;
bool m_stopParser; bool m_stopParser;
bool m_paused; bool m_paused;
bool m_idle;
compat::ConditionVariable m_cond; compat::ConditionVariable m_cond;
std::queue<std::unique_ptr<parser::Task>> m_tasks; std::queue<std::unique_ptr<parser::Task>> m_tasks;
std::vector<compat::Thread> m_threads; std::vector<compat::Thread> m_threads;
......
...@@ -54,6 +54,7 @@ class NoopCallback : public IMediaLibraryCb ...@@ -54,6 +54,7 @@ class NoopCallback : public IMediaLibraryCb
virtual void onEntryPointRemoved( const std::string&, bool ) override {} virtual void onEntryPointRemoved( const std::string&, bool ) override {}
virtual void onEntryPointBanned( const std::string&, bool ) override {} virtual void onEntryPointBanned( const std::string&, bool ) override {}
virtual void onEntryPointUnbanned( const std::string&, bool ) override {} virtual void onEntryPointUnbanned( const std::string&, bool ) override {}
virtual void onBackgroundTasksIdleChanged( bool ) override {}
}; };
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment