Commit 0f9b4324 authored by Hugo Beauzée-Luyssen's avatar Hugo Beauzée-Luyssen

Refactor Parser & Services

This cleans the way we signal task completion, reduces contention, and
will allow us to handle multithreading better
parent b37a0da0
......@@ -70,6 +70,7 @@ list(APPEND SRC_LIST ${HEADERS_LIST}
Playlist.cpp
parser/Parser.cpp
parser/ParserService.cpp
factory/FileSystem.cpp
filesystem/common/CommonFile.cpp
......
......@@ -35,7 +35,6 @@
#include "Folder.h"
#include "Media.h"
#include "MediaLibrary.h"
#include "IMetadataService.h"
#include "Label.h"
#include "logging/Logger.h"
#include "Movie.h"
......@@ -365,20 +364,9 @@ bool MediaLibrary::deletePlaylist( unsigned int playlistId )
return Playlist::destroy( m_dbConnection.get(), playlistId );
}
void MediaLibrary::addMetadataService(std::unique_ptr<IMetadataService> service)
{
if ( service->initialize( m_parser.get(), this ) == false )
{
//FIXME: This is missing a name or something more specific
LOG_ERROR( "Failed to initialize service" );
return;
}
m_parser->addService( std::move( service ) );
}
void MediaLibrary::startParser()
{
m_parser.reset( new Parser( m_dbConnection.get(), m_callback ) );
m_parser.reset( new Parser( m_dbConnection.get(), this, m_callback ) );
const char* args[] = {
"-vv",
......@@ -397,8 +385,8 @@ void MediaLibrary::startParser()
auto vlcService = std::unique_ptr<VLCMetadataService>( new VLCMetadataService( m_vlcInstance, m_dbConnection.get(), m_fsFactory ) );
auto thumbnailerService = std::unique_ptr<VLCThumbnailer>( new VLCThumbnailer( m_vlcInstance ) );
addMetadataService( std::move( vlcService ) );
addMetadataService( std::move( thumbnailerService ) );
m_parser->addService( std::move( vlcService ) );
m_parser->addService( std::move( thumbnailerService ) );
m_parser->start();
}
......
......@@ -24,6 +24,7 @@
#define MEDIALIBRARY_H
class Parser;
class ParserService;
class DiscovererWorker;
class SqliteConnection;
......@@ -100,7 +101,6 @@ class MediaLibrary : public IMediaLibrary
static const std::vector<std::string> supportedAudioExtensions;
private:
void addMetadataService( std::unique_ptr<IMetadataService> service );
virtual void startParser();
virtual void startDiscoverer();
bool updateDatabaseModel( unsigned int previousVersion );
......
......@@ -31,37 +31,30 @@
#include "Show.h"
#include "utils/Filename.h"
VLCMetadataService::VLCMetadataService(const VLC::Instance& vlc, DBConnection dbConnection, std::shared_ptr<factory::IFileSystem> fsFactory )
VLCMetadataService::VLCMetadataService( const VLC::Instance& vlc, DBConnection dbConnection, std::shared_ptr<factory::IFileSystem> fsFactory )
: m_instance( vlc )
, m_cb( nullptr )
, m_ml( nullptr )
, m_dbConn( dbConnection )
, m_fsFactory( fsFactory )
{
}
bool VLCMetadataService::initialize( IMetadataServiceCb* callback, MediaLibrary* ml )
bool VLCMetadataService::initialize()
{
m_cb = callback;
m_ml = ml;
m_ml = mediaLibrary();
m_unknownArtist = Artist::fetch( m_dbConn, medialibrary::UnknownArtistID );
if ( m_unknownArtist == nullptr )
LOG_ERROR( "Failed to cache unknown artist" );
return m_unknownArtist != nullptr;
}
unsigned int VLCMetadataService::priority() const
{
return 100;
}
void VLCMetadataService::run(std::shared_ptr<Media> media, std::shared_ptr<File> file, void* data )
parser::Task::Status VLCMetadataService::run( parser::Task& task )
{
auto media = task.media;
auto file = task.file;
if ( media->duration() != -1 )
{
LOG_INFO( file->mrl(), " was already parsed" );
m_cb->done( media, file, Status::Success, data );
return;
return parser::Task::Status::Success;
}
LOG_INFO( "Parsing ", file->mrl() );
auto chrono = std::chrono::steady_clock::now();
......@@ -86,23 +79,20 @@ void VLCMetadataService::run(std::shared_ptr<Media> media, std::shared_ptr<File>
ctx->vlcMedia.parseAsync();
auto success = m_cond.wait_for( lock, std::chrono::seconds( 5 ), [&done]() { return done == true; } );
if ( success == false )
m_cb->done( ctx->media, file, Status::Fatal, data );
else
{
auto status = handleMediaMeta( media, file, ctx->vlcMedia );
m_cb->done( ctx->media, file, status, data );
}
return parser::Task::Status::Fatal;
auto status = handleMediaMeta( media, file, ctx->vlcMedia );
auto duration = std::chrono::steady_clock::now() - chrono;
LOG_DEBUG( "Parsed ", file->mrl(), " in ", std::chrono::duration_cast<std::chrono::milliseconds>( duration ).count(), "ms" );
return status;
}
IMetadataService::Status VLCMetadataService::handleMediaMeta( std::shared_ptr<Media> media, std::shared_ptr<File> file, VLC::Media& vlcMedia ) const
parser::Task::Status VLCMetadataService::handleMediaMeta( std::shared_ptr<Media> media, std::shared_ptr<File> file, VLC::Media& vlcMedia ) const
{
const auto tracks = vlcMedia.tracks();
if ( tracks.size() == 0 )
{
LOG_ERROR( "Failed to fetch tracks" );
return Status::Fatal;
return parser::Task::Status::Fatal;
}
auto t = m_dbConn->newTransaction();
......@@ -127,18 +117,18 @@ IMetadataService::Status VLCMetadataService::handleMediaMeta( std::shared_ptr<Me
if ( isAudio == true )
{
if ( parseAudioFile( *media, *file, vlcMedia ) == false )
return Status::Fatal;
return parser::Task::Status::Fatal;
}
else
{
if (parseVideoFile( media, vlcMedia ) == false )
return Status::Fatal;
return parser::Task::Status::Fatal;
}
auto duration = vlcMedia.duration();
media->setDuration( duration );
if ( media->save() == false )
return Status::Error;
return Status::Success;
return parser::Task::Status::Error;
return parser::Task::Status::Success;
}
/* Video files */
......
......@@ -27,15 +27,16 @@
#include <vlcpp/vlc.hpp>
#include <mutex>
#include "IMetadataService.h"
#include "parser/ParserService.h"
#include "MediaLibrary.h"
#include "AlbumTrack.h"
class VLCMetadataService : public IMetadataService
class VLCMetadataService : public ParserService
{
struct Context
{
explicit Context( std::shared_ptr<Media> media_ )
: media( media_ )
explicit Context( std::shared_ptr<Media> media )
: media( media )
{
}
......@@ -44,14 +45,13 @@ class VLCMetadataService : public IMetadataService
};
public:
explicit VLCMetadataService(const VLC::Instance& vlc, DBConnection dbConnection, std::shared_ptr<factory::IFileSystem> fsFactory);
explicit VLCMetadataService( const VLC::Instance& vlc, DBConnection dbConnection, std::shared_ptr<factory::IFileSystem> fsFactory );
virtual bool initialize( IMetadataServiceCb *callback, MediaLibrary* ml ) override;
virtual unsigned int priority() const override;
virtual void run( std::shared_ptr<Media> media, std::shared_ptr<File> file, void *data ) override;
virtual bool initialize() override;
virtual parser::Task::Status run( parser::Task& task ) override;
private:
Status handleMediaMeta(std::shared_ptr<Media> media , std::shared_ptr<File> file, VLC::Media &vlcMedia ) const;
parser::Task::Status handleMediaMeta(std::shared_ptr<Media> media , std::shared_ptr<File> file, VLC::Media &vlcMedia ) const;
std::shared_ptr<Album> findAlbum(File& file, VLC::Media& vlcMedia, const std::string& title, Artist* albumArtist ) const;
bool parseAudioFile(Media& media, File& file, VLC::Media &vlcMedia ) const;
bool parseVideoFile( std::shared_ptr<Media> media, VLC::Media& vlcMedia ) const;
......@@ -61,14 +61,13 @@ private:
std::shared_ptr<Album> handleAlbum(Media& media, File& file, VLC::Media& vlcMedia, std::shared_ptr<Artist> albumArtist, std::shared_ptr<Artist> artist ) const;
VLC::Instance m_instance;
IMetadataServiceCb* m_cb;
MediaLibrary* m_ml;
std::vector<Context*> m_keepAlive;
std::mutex m_mutex;
std::condition_variable m_cond;
DBConnection m_dbConn;
std::shared_ptr<factory::IFileSystem> m_fsFactory;
std::shared_ptr<Artist> m_unknownArtist;
MediaLibrary* m_ml;
};
#endif // VLCMETADATASERVICE_H
......@@ -42,9 +42,8 @@
#include "logging/Logger.h"
#include "MediaLibrary.h"
VLCThumbnailer::VLCThumbnailer(const VLC::Instance &vlc)
VLCThumbnailer::VLCThumbnailer( const VLC::Instance &vlc )
: m_instance( vlc )
, m_cb( nullptr )
, m_ml( nullptr )
#ifdef WITH_EVAS
, m_canvas( nullptr, &evas_free )
......@@ -86,40 +85,33 @@ VLCThumbnailer::~VLCThumbnailer()
#endif
}
bool VLCThumbnailer::initialize(IMetadataServiceCb *callback, MediaLibrary* ml)
bool VLCThumbnailer::initialize()
{
m_cb = callback;
m_ml = ml;
m_ml = mediaLibrary();
return true;
}
unsigned int VLCThumbnailer::priority() const
parser::Task::Status VLCThumbnailer::run( parser::Task& task )
{
// This needs to be lower than the VLCMetadataService, since we want to know the media type.
return 50;
}
auto media = task.media;
auto file = task.file;
void VLCThumbnailer::run( std::shared_ptr<Media> media, std::shared_ptr<File> file, void *data )
{
if ( media->type() == IMedia::Type::UnknownType )
{
// If we don't know the media type yet, it actually looks more like a bug
// since this should run after media type deduction, and not run in case
// that step fails.
m_cb->done( media, file, Status::Fatal, data );
return;
return parser::Task::Status::Fatal;
}
else if ( media->type() != IMedia::Type::VideoType )
{
// There's no point in generating a thumbnail for a non-video media.
m_cb->done( media, file, Status::Success, data );
return;
return parser::Task::Status::Success;
}
else if ( media->thumbnail().empty() == false )
{
LOG_INFO(media->thumbnail(), " already has a thumbnail" );
m_cb->done( media, file, Status::Success, data );
return;
return parser::Task::Status::Success;
}
LOG_INFO( "Generating ", file->mrl(), " thumbnail..." );
......@@ -130,22 +122,23 @@ void VLCThumbnailer::run( std::shared_ptr<Media> media, std::shared_ptr<File> fi
setupVout( mp );
if ( startPlayback( media, file, mp, data ) == false )
auto res = startPlayback( mp );
if ( res != parser::Task::Status::Success )
{
LOG_WARN( "Failed to generate ", file->mrl(), " thumbnail" );
return;
return res;
}
// Seek ahead to have a significant preview
if ( seekAhead( media, file, mp, data ) == false )
res = seekAhead( mp );
if ( res != parser::Task::Status::Success )
{
LOG_WARN( "Failed to generate ", file->mrl(), " thumbnail" );
return;
return res;
}
takeThumbnail( media, file, mp, data );
LOG_INFO( "Done generating ", file->mrl(), " thumbnail" );
return takeThumbnail( media, file, mp );
}
bool VLCThumbnailer::startPlayback(std::shared_ptr<Media> media, std::shared_ptr<File> file, VLC::MediaPlayer &mp, void* data )
parser::Task::Status VLCThumbnailer::startPlayback( VLC::MediaPlayer &mp )
{
mp.eventManager().onPlaying([this]() {
......@@ -164,13 +157,12 @@ bool VLCThumbnailer::startPlayback(std::shared_ptr<Media> media, std::shared_ptr
if ( success == false || s == libvlc_Error || s == libvlc_Ended )
{
// In case of timeout or error, don't go any further
m_cb->done( media, file, Status::Error, data );
return false;
return parser::Task::Status::Error;
}
return true;
return parser::Task::Status::Success;
}
bool VLCThumbnailer::seekAhead(std::shared_ptr<Media> media, std::shared_ptr<File> file, VLC::MediaPlayer& mp, void* data )
parser::Task::Status VLCThumbnailer::seekAhead( VLC::MediaPlayer& mp )
{
std::unique_lock<std::mutex> lock( m_mutex );
float pos = .0f;
......@@ -186,11 +178,8 @@ bool VLCThumbnailer::seekAhead(std::shared_ptr<Media> media, std::shared_ptr<Fil
// Since we're locking a mutex for each position changed, let's unregister ASAP
event->unregister();
if ( success == false )
{
m_cb->done( media, file, Status::Error, data );
return false;
}
return true;
return parser::Task::Status::Error;
return parser::Task::Status::Success;
}
void VLCThumbnailer::setupVout( VLC::MediaPlayer& mp )
......@@ -245,7 +234,7 @@ void VLCThumbnailer::setupVout( VLC::MediaPlayer& mp )
);
}
bool VLCThumbnailer::takeThumbnail(std::shared_ptr<Media> media, std::shared_ptr<File> file, VLC::MediaPlayer &mp, void *data)
parser::Task::Status VLCThumbnailer::takeThumbnail( std::shared_ptr<Media> media, std::shared_ptr<File> file, VLC::MediaPlayer &mp )
{
// lock, signal that we want a thumbnail, and wait.
{
......@@ -256,13 +245,10 @@ bool VLCThumbnailer::takeThumbnail(std::shared_ptr<Media> media, std::shared_ptr
return m_thumbnailRequired == false;
});
if ( success == false )
{
m_cb->done( media, file, Status::Error, data );
return false;
}
return parser::Task::Status::Error;
}
mp.stop();
return compress( media, file, data );
return compress( media, file );
}
#ifdef WITH_JPEG
......@@ -282,7 +268,7 @@ struct jpegError : public jpeg_error_mgr
#endif
bool VLCThumbnailer::compress( std::shared_ptr<Media> media, std::shared_ptr<File> file, void *data )
parser::Task::Status VLCThumbnailer::compress( std::shared_ptr<Media> media, std::shared_ptr<File> file )
{
auto path = m_ml->thumbnailPath();
path += "/";
......@@ -303,8 +289,7 @@ bool VLCThumbnailer::compress( std::shared_ptr<Media> media, std::shared_ptr<Fil
if ( fOut == nullptr )
{
LOG_ERROR("Failed to open thumbnail file ", path);
m_cb->done( media, file, Status::Error, data );
return false;
return parser::Task::Status::Error;
}
jpeg_compress_struct compInfo;
......@@ -320,8 +305,7 @@ bool VLCThumbnailer::compress( std::shared_ptr<Media> media, std::shared_ptr<Fil
{
LOG_ERROR("JPEG failure: ", err.message);
jpeg_destroy_compress(&compInfo);
m_cb->done( media, file, Status::Error, data );
return false;
return parser::Task::Status::Error;
}
jpeg_create_compress(&compInfo);
......@@ -346,7 +330,7 @@ bool VLCThumbnailer::compress( std::shared_ptr<Media> media, std::shared_ptr<Fil
#elif defined(WITH_EVAS)
auto evas_obj = std::unique_ptr<Evas_Object, void(*)(Evas_Object*)>( evas_object_image_add( m_canvas.get() ), evas_object_del );
if ( evas_obj == nullptr )
return false;
return parser::Task::Status::Error;
uint8_t *p_buff = m_buff.get();
if ( DesiredWidth != m_width )
......@@ -370,9 +354,8 @@ bool VLCThumbnailer::compress( std::shared_ptr<Media> media, std::shared_ptr<Fil
#endif
media->setThumbnail( path );
LOG_INFO( "Done generating ", file->mrl(), " thumbnail" );
if ( media->save() == false )
m_cb->done( media, file, Status::Error, data );
else
m_cb->done( media, file, Status::Success, data );
return true;
return parser::Task::Status::Error;
return parser::Task::Status::Success;
}
......@@ -30,7 +30,7 @@
#include <Evas.h>
#endif
#include "IMetadataService.h"
#include "parser/ParserService.h"
#ifdef WITH_JPEG
#define BPP 3
......@@ -42,21 +42,20 @@
#error No compression strategy
#endif
class VLCThumbnailer : public IMetadataService
class VLCThumbnailer : public ParserService
{
public:
explicit VLCThumbnailer( const VLC::Instance& vlc );
virtual ~VLCThumbnailer();
virtual bool initialize(IMetadataServiceCb *callback, MediaLibrary *ml) override;
virtual unsigned int priority() const override;
virtual void run(std::shared_ptr<Media> media, std::shared_ptr<File> file, void *data ) override;
virtual parser::Task::Status run( parser::Task& task ) override;
virtual bool initialize() override;
private:
bool startPlayback(std::shared_ptr<Media> media, std::shared_ptr<File> file, VLC::MediaPlayer& mp, void *data);
bool seekAhead( std::shared_ptr<Media> media, std::shared_ptr<File> file, VLC::MediaPlayer &mp, void *data);
void setupVout(VLC::MediaPlayer &mp);
bool takeThumbnail(std::shared_ptr<Media> media, std::shared_ptr<File> file, VLC::MediaPlayer &mp, void* data);
bool compress(std::shared_ptr<Media> media, std::shared_ptr<File> file, void* data );
parser::Task::Status startPlayback( VLC::MediaPlayer& mp );
parser::Task::Status seekAhead( VLC::MediaPlayer &mp );
void setupVout( VLC::MediaPlayer &mp );
parser::Task::Status takeThumbnail( std::shared_ptr<Media> media, std::shared_ptr<File> file, VLC::MediaPlayer &mp );
parser::Task::Status compress( std::shared_ptr<Media> media, std::shared_ptr<File> file );
private:
// Force a base width, let height be computed depending on A/R
......@@ -66,7 +65,6 @@ private:
private:
VLC::Instance m_instance;
IMetadataServiceCb* m_cb;
MediaLibrary* m_ml;
std::mutex m_mutex;
std::condition_variable m_cond;
......
......@@ -24,120 +24,52 @@
#include <algorithm>
#include "IMediaLibrary.h"
#include "Media.h"
#include "File.h"
Parser::Parser( DBConnection dbConnection, IMediaLibraryCb* cb )
: m_stopParser( false )
, m_dbConnection( dbConnection )
Parser::Parser( DBConnection dbConnection, MediaLibrary* ml, IMediaLibraryCb* cb )
: m_dbConnection( dbConnection )
, m_ml( ml )
, m_callback( cb )
, m_opToDo( 0 )
, m_opDone( 0 )
, m_percent( 0 )
, m_paused( false )
{
}
Parser::~Parser()
void Parser::addService( ServicePtr service )
{
if ( m_thread.joinable() )
{
{
std::lock_guard<std::mutex> lock( m_lock );
if ( m_tasks.empty() == true )
m_cond.notify_all();
m_stopParser = true;
}
m_thread.join();
}
while ( m_tasks.empty() == false )
{
delete m_tasks.front();
m_tasks.pop();
}
}
void Parser::addService( std::unique_ptr<IMetadataService> service )
{
m_services.emplace_back( std::move( service ) );
std::push_heap( m_services.begin(), m_services.end(), []( const ServicePtr& a, const ServicePtr& b )
{
// We want higher priority first
return a->priority() < b->priority();
});
service->initialize( m_ml, this );
m_services.push_back( std::move( service ) );
}
void Parser::parse( std::shared_ptr<Media> media, std::shared_ptr<File> file )
{
std::lock_guard<std::mutex> lock( m_lock );
if ( m_services.size() == 0 )
return;
m_tasks.push( new Task( media, file, m_services, m_callback ) );
m_services[0]->parse( std::unique_ptr<parser::Task>( new parser::Task( media, file ) ) );
m_opToDo += m_services.size();
updateStats();
if ( m_paused == false )
m_cond.notify_all();
}
void Parser::start()
{
// Ensure we don't start multiple times.
assert( m_thread.joinable() == false );
m_thread = std::thread{ &Parser::run, this };
restore();
for ( auto& s : m_services )
s->start();
}
void Parser::pause()
{
std::lock_guard<std::mutex> lock( m_lock );
m_paused = true;
for ( auto& s : m_services )
s->pause();
}
void Parser::resume()
{
std::lock_guard<std::mutex> lock( m_lock );
m_paused = false;
m_cond.notify_all();
}
void Parser::run()
{
LOG_INFO("Starting Parser thread");
restore();
while ( m_stopParser == false )
{
Task* task = nullptr;
{
std::unique_lock<std::mutex> lock( m_lock );
if ( m_tasks.empty() == true || m_paused == true )
{
m_cond.wait( lock, [this]() {
return ( m_tasks.empty() == false && m_paused == false )
|| m_stopParser == true;
});
// We might have been woken up because the parser is being destroyed
if ( m_stopParser == true )
break;
}
// Otherwise it's safe to assume we have at least one element.
task = m_tasks.front();
m_tasks.pop();
}
try
{
(*task->it)->run( task->media, task->file, task );
// Consider the task invalid starting from this point. If it completed
// it cleared itself afterward.
}
catch (const std::exception& ex)
{
LOG_ERROR( "Caught an exception during ", task->file->mrl(), " parsing: ", ex.what() );
done( task->media, task->file, IMetadataService::Status::Fatal, task );
}
}
LOG_INFO("Exiting Parser thread");
for ( auto& s : m_services )
s->resume();
}
void Parser::restore()
......@@ -149,11 +81,10 @@ void Parser::restore()
+ " WHERE parsed = 0 AND is_present = 1";
auto files = File::fetchAll<File>( m_dbConnection, req );
std::lock_guard<std::mutex> lock( m_lock );
for ( auto& f : files )
{
auto m = f->media();
m_tasks.push( new Task( m, f, m_services, m_callback ) );
parse( m, f );
}
}
......@@ -170,41 +101,27 @@ void Parser::updateStats()
}
}
Parser::Task::Task( std::shared_ptr<Media> media, std::shared_ptr<File> file, Parser::ServiceList& serviceList, IMediaLibraryCb* metadataCb )
: media( media )
, file( file )
, it( serviceList.begin() )
, end( serviceList.end() )
, cb( metadataCb )
{
}
void Parser::done( std::shared_ptr<Media> media, std::shared_ptr<File> file, IMetadataService::Status status, void* data )
void Parser::done( std::unique_ptr<parser::Task> t, parser::Task::Status status )
{
++m_opDone;
updateStats();
Task *t = reinterpret_cast<Task*>( data );
if ( status == IMetadataService::Status::TemporaryUnavailable ||
status == IMetadataService::Status::Fatal )
if ( status == parser::Task::Status::TemporaryUnavailable ||
status == parser::Task::Status::Fatal )
{
delete t;
return;
}
else if ( status == IMetadataService::Status::Success )
if ( status == parser::Task::Status::Success )
{
if ( t->cb != nullptr )
t->cb->onFileUpdated( media );
if ( m_callback != nullptr )
m_callback->onFileUpdated( t->media );
}
++t->it;
if (t->it == t->end)
auto serviceIdx = ++t->currentService;
if ( serviceIdx == m_services.size() )
{
file->markParsed();
delete t;
t->file->markParsed();
return;
}
std::lock_guard<std::mutex> lock( m_lock );
m_tasks.push( t );
m_cond.notify_all();
m_services[serviceIdx]->parse( std::move( t ) );
}
......@@ -22,60 +22,51 @@
#pragma once
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <memory>
#include <queue>
#include "IMetadataService.h"
#include "IMediaLibrary.h"
#include "ParserService.h"
#include "File.h"
class Parser : public IMetadataServiceCb
class IMediaLibraryCb;
// Use an interface to expose only the "done" method
class IParserCb
{
public:
Parser( DBConnection dbConnection, IMediaLibraryCb* cb );
~Parser();
void addService( std::unique_ptr<IMetadataService> service );
void parse(std::shared_ptr<Media> media , std::shared_ptr<File> file);
virtual ~IParserCb() = default;
virtual void done( std::unique_ptr<parser::Task> task, parser::Task::Status status ) = 0;
};
class Parser : IParserCb
{
public:
using ServicePtr = std::unique_ptr<ParserService>;
Parser( DBConnection dbConnection, MediaLibrary* ml, IMediaLibraryCb* cb );
void addService( ServicePtr service );