Commit dbfeff43 authored by Hugo Beauzée-Luyssen's avatar Hugo Beauzée-Luyssen

Batch media added&modified notifications through the new notifier thread

parent 1ca51b48
......@@ -56,21 +56,23 @@ class IMediaLibraryCb
public:
virtual ~IMediaLibraryCb() = default;
/**
* @brief onFileAdded Will be called when a media gets added.
* @brief onFileAdded Will be called when some media get added.
* Depending if the media is being restored or was just discovered,
* the media type might be a best effort guess. If the media was freshly
* discovered, it is extremely likely that no metadata will be
* available yet.
* The number of media is undefined, but is guaranteed to be at least 1.
*/
virtual void onMediaAdded( MediaPtr media ) = 0;
virtual void onMediaAdded( std::vector<MediaPtr> media ) = 0;
/**
* @brief onFileUpdated Will be called when a file metadata gets updated.
*/
virtual void onMediaUpdated( MediaPtr media ) = 0;
virtual void onMediaUpdated( std::vector<MediaPtr> media ) = 0;
virtual void onMediaDeleted( std::vector<int64_t> ids ) = 0;
virtual void onArtistAdded( ArtistPtr artist ) = 0;
virtual void onAlbumAdded( AlbumPtr album ) = 0;
/**
* @brief onTrackAdded Called when a media gets detected as an album track
......
......@@ -150,7 +150,7 @@ bool MediaLibrary::createAllTables()
void MediaLibrary::registerEntityHooks()
{
if ( m_deletionNotifier == nullptr )
if ( m_modificationNotifier == nullptr )
return;
m_dbConnection->registerUpdateHook( policy::MediaTable::Name,
......@@ -158,7 +158,7 @@ void MediaLibrary::registerEntityHooks()
if ( reason != SqliteConnection::HookReason::Delete )
return;
Media::removeFromCache( rowId );
m_deletionNotifier->notifyMediaRemoval( rowId );
m_modificationNotifier->notifyMediaRemoval( rowId );
});
}
......@@ -259,7 +259,7 @@ std::shared_ptr<Media> MediaLibrary::addFile( const fs::IFile& fileFs, Folder& p
Media::destroy( this, mptr->id() );
return nullptr;
}
m_callback->onMediaAdded( mptr );
m_modificationNotifier->notifyMediaCreation( mptr );
if ( m_parser != nullptr )
m_parser->parse( mptr, file );
return mptr;
......@@ -488,8 +488,8 @@ void MediaLibrary::startDiscoverer()
void MediaLibrary::startDeletionNotifier()
{
m_deletionNotifier.reset( new DeletionNotifier( this ) );
m_deletionNotifier->start();
m_modificationNotifier.reset( new ModificationNotifier( this ) );
m_modificationNotifier->start();
}
bool MediaLibrary::updateDatabaseModel( unsigned int previousVersion )
......@@ -549,6 +549,11 @@ IMediaLibraryCb* MediaLibrary::getCb() const
return m_callback;
}
std::shared_ptr<ModificationNotifier> MediaLibrary::getNotifier() const
{
return m_modificationNotifier;
}
void MediaLibrary::discover( const std::string &entryPoint )
{
if ( m_discoverer != nullptr )
......
......@@ -23,7 +23,7 @@
#ifndef MEDIALIBRARY_H
#define MEDIALIBRARY_H
class DeletionNotifier;
class ModificationNotifier;
class DiscovererWorker;
class Parser;
class ParserService;
......@@ -114,6 +114,7 @@ class MediaLibrary : public IMediaLibrary
DBConnection getConn() const;
IMediaLibraryCb* getCb() const;
std::shared_ptr<ModificationNotifier> getNotifier() const;
public:
static const uint32_t DbModelVersion;
......@@ -146,7 +147,7 @@ class MediaLibrary : public IMediaLibrary
// Same reasoning applies here.
//FIXME: Having to maintain a specific ordering sucks, let's use shared_ptr or something
std::unique_ptr<DiscovererWorker> m_discoverer;
std::unique_ptr<DeletionNotifier> m_deletionNotifier;
std::shared_ptr<ModificationNotifier> m_modificationNotifier;
LogLevel m_verbosity;
Settings m_settings;
};
......
......@@ -27,10 +27,12 @@
#include "IMediaLibrary.h"
#include "Media.h"
#include "File.h"
#include "utils/DeletionNotifier.h"
Parser::Parser(MediaLibrary* ml )
Parser::Parser( MediaLibrary* ml )
: m_ml( ml )
, m_callback( ml->getCb() )
, m_notifier( ml->getNotifier() )
, m_opToDo( 0 )
, m_opDone( 0 )
, m_percent( 0 )
......@@ -109,7 +111,7 @@ void Parser::done( std::unique_ptr<parser::Task> t, parser::Task::Status status
}
if ( status == parser::Task::Status::Success )
{
m_callback->onMediaUpdated( t->media );
m_notifier->notifyMediaModification( t->media );
}
auto serviceIdx = ++t->currentService;
......
......@@ -65,6 +65,7 @@ private:
MediaLibrary* m_ml;
IMediaLibraryCb* m_callback;
std::shared_ptr<ModificationNotifier> m_notifier;
std::atomic_uint m_opToDo;
std::atomic_uint m_opDone;
std::atomic_uint m_percent;
......
......@@ -25,13 +25,13 @@
#include "DeletionNotifier.h"
#include "MediaLibrary.h"
DeletionNotifier::DeletionNotifier( MediaLibraryPtr ml )
ModificationNotifier::ModificationNotifier( MediaLibraryPtr ml )
: m_ml( ml )
, m_cb( ml->getCb() )
{
}
DeletionNotifier::~DeletionNotifier()
ModificationNotifier::~ModificationNotifier()
{
if ( m_notifierThread.joinable() == true )
{
......@@ -41,40 +41,36 @@ DeletionNotifier::~DeletionNotifier()
}
}
void DeletionNotifier::start()
void ModificationNotifier::start()
{
assert( m_notifierThread.get_id() == std::thread::id{} );
m_stop = false;
m_notifierThread = std::thread{ &DeletionNotifier::run, this };
m_notifierThread = std::thread{ &ModificationNotifier::run, this };
}
void DeletionNotifier::notifyMediaRemoval( int64_t mediaId )
void ModificationNotifier::notifyMediaCreation( MediaPtr media )
{
notifyRemoval( mediaId, m_media );
notifyCreation( std::move( media ), m_media );
}
void DeletionNotifier::notifyRemoval( int64_t rowId, DeletionNotifier::Queue& queue )
void ModificationNotifier::notifyMediaModification( MediaPtr media )
{
std::lock_guard<std::mutex> lock( m_lock );
queue.entities.push_back( rowId );
queue.timeout = std::chrono::steady_clock::now() + std::chrono::milliseconds{ 500 };
if ( m_timeout == std::chrono::time_point<std::chrono::steady_clock>{} )
{
// If no wake up has been scheduled, schedule one now
m_timeout = queue.timeout;
m_cond.notify_all();
}
notifyModification( std::move( media ), m_media );
}
void ModificationNotifier::notifyMediaRemoval( int64_t mediaId )
{
notifyRemoval( mediaId, m_media );
}
void DeletionNotifier::run()
void ModificationNotifier::run()
{
constexpr auto ZeroTimeout = std::chrono::time_point<std::chrono::steady_clock>{};
// Create some other queue to swap with the ones that are used
// by other threads. That way we can release those early and allow
// more insertions to proceed
Queue media;
Queue<IMedia> media;
while ( m_stop == false )
{
......@@ -90,24 +86,10 @@ void DeletionNotifier::run()
checkQueue( m_media, media, nextTimeout, now );
m_timeout = nextTimeout;
}
notify( std::move( media ), &IMediaLibraryCb::onMediaDeleted );
notify( std::move( media ), &IMediaLibraryCb::onMediaAdded, &IMediaLibraryCb::onMediaUpdated, &IMediaLibraryCb::onMediaDeleted );
}
}
void DeletionNotifier::checkQueue( DeletionNotifier::Queue& input, DeletionNotifier::Queue& output,
std::chrono::time_point<std::chrono::steady_clock>& nextTimeout,
std::chrono::time_point<std::chrono::steady_clock> now)
{
constexpr auto ZeroTimeout = std::chrono::time_point<std::chrono::steady_clock>{};
// LOG_ERROR( "Input timeout: ", input.timeout.time_since_epoch(), " - Now: ", now.time_since_epoch() );
if ( input.timeout <= now && input.entities.size() > 0 )
{
using std::swap;
swap( input, output );
}
// Or is scheduled for timeout soon:
else if ( input.timeout != ZeroTimeout && ( nextTimeout == ZeroTimeout || input.timeout < nextTimeout ) )
{
nextTimeout = input.timeout;
}
}
......@@ -33,13 +33,15 @@
class IMediaLibraryCb;
class DeletionNotifier
class ModificationNotifier
{
public:
DeletionNotifier( MediaLibraryPtr ml );
~DeletionNotifier();
ModificationNotifier( MediaLibraryPtr ml );
~ModificationNotifier();
void start();
void notifyMediaCreation(MediaPtr media );
void notifyMediaModification(MediaPtr media );
void notifyMediaRemoval( int64_t media );
private:
......@@ -47,31 +49,86 @@ private:
void notify();
private:
template <typename T>
struct Queue
{
std::vector<int64_t> entities;
std::vector<std::shared_ptr<T>> added;
std::vector<std::shared_ptr<T>> modified;
std::vector<int64_t> removed;
std::chrono::time_point<std::chrono::steady_clock> timeout;
};
template <typename Func>
void notify( Queue&& queue, Func f )
template <typename T, typename AddedCb, typename ModifiedCb, typename RemovedCb>
void notify( Queue<T>&& queue, AddedCb addedCb, ModifiedCb modifiedCb, RemovedCb removedCb )
{
if ( queue.entities.size() == 0 )
return;
(*m_cb.*f)( std::move( queue.entities ) );
if ( queue.added.size() > 0 )
(*m_cb.*addedCb)( std::move( queue.added ) );
if ( queue.modified.size() > 0 )
(*m_cb.*modifiedCb)( std::move( queue.modified ) );
if ( queue.removed.size() > 0 )
(*m_cb.*removedCb)( std::move( queue.removed ) );
}
void notifyRemoval( int64_t rowId, Queue& queue );
template <typename T>
void notifyCreation( std::shared_ptr<T> entity, Queue<T>& queue )
{
std::lock_guard<std::mutex> lock( m_lock );
queue.added.push_back( std::move( entity ) );
updateTimeout( queue );
}
void checkQueue( Queue& input, Queue& output, std::chrono::time_point<std::chrono::steady_clock>& nextTimeout,
std::chrono::time_point<std::chrono::steady_clock> now );
template <typename T>
void notifyModification( std::shared_ptr<T> entity, Queue<T>& queue )
{
std::lock_guard<std::mutex> lock( m_lock );
queue.modified.push_back( std::move( entity ) );
updateTimeout( queue );
}
template <typename T>
void notifyRemoval( int64_t rowId, Queue<T>& queue )
{
std::lock_guard<std::mutex> lock( m_lock );
queue.removed.push_back( rowId );
updateTimeout( m_media );
}
template <typename T>
void updateTimeout( Queue<T>& queue )
{
queue.timeout = std::chrono::steady_clock::now() + std::chrono::milliseconds{ 500 };
if ( m_timeout == std::chrono::time_point<std::chrono::steady_clock>{} )
{
// If no wake up has been scheduled, schedule one now
m_timeout = queue.timeout;
m_cond.notify_all();
}
}
template <typename T>
void checkQueue( Queue<T>& input, Queue<T>& output, std::chrono::time_point<std::chrono::steady_clock>& nextTimeout,
std::chrono::time_point<std::chrono::steady_clock> now )
{
constexpr auto ZeroTimeout = std::chrono::time_point<std::chrono::steady_clock>{};
// LOG_ERROR( "Input timeout: ", input.timeout.time_since_epoch(), " - Now: ", now.time_since_epoch() );
if ( input.timeout <= now )
{
using std::swap;
swap( input, output );
}
// Or is scheduled for timeout soon:
else if ( input.timeout != ZeroTimeout && ( nextTimeout == ZeroTimeout || input.timeout < nextTimeout ) )
{
nextTimeout = input.timeout;
}
}
private:
MediaLibraryPtr m_ml;
IMediaLibraryCb* m_cb;
// Queues
Queue m_media;
Queue<IMedia> m_media;
// Notifier thread
std::mutex m_lock;
......
......@@ -29,8 +29,8 @@ namespace mock
class NoopCallback : public IMediaLibraryCb
{
virtual void onMediaAdded(MediaPtr) override {}
virtual void onMediaUpdated(MediaPtr) override {}
virtual void onMediaAdded( std::vector<MediaPtr> ) override {}
virtual void onMediaUpdated( std::vector<MediaPtr> ) override {}
virtual void onMediaDeleted( std::vector<int64_t> ) override {}
virtual void onDiscoveryStarted(const std::string&) override {}
virtual void onDiscoveryCompleted(const std::string&) override {}
......
......@@ -59,7 +59,6 @@ class MediaLibraryWithoutBackground : public MediaLibraryTester
{
virtual void startDiscoverer() override {}
virtual void startParser() override {}
virtual void startDeletionNotifier() override {}
};
class MediaLibraryWithNotifier : public MediaLibraryTester
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment