Commit e500dede authored by Hugo Beauzée-Luyssen's avatar Hugo Beauzée-Luyssen

Rewrite IMetadataService code

Processing is done by a threadpool (limited to a single thread so far)
This will allow using asynchronous modules one after another
Fixing VLC service cleaup code
parent f7ca3e59
......@@ -6,6 +6,27 @@
#include "Types.h"
class IParserCb
{
public:
virtual ~IParserCb() {}
/**
* @brief onServiceDone will be called after each MetadataService completes
* @param file The file being parsed
* @param status A flag describing the parsing outcome
*/
virtual void onServiceDone( FilePtr file, ServiceStatus status ) = 0;
/**
* @brief onFileDone will be called when all parsing operations on a given file have been completed
*
* This doesn't imply all modules have succeeded
*/
//FIXME: We should probably expose some way of telling if the user should retry later in
// case of tmeporary failure
virtual void onFileDone( FilePtr file ) = 0;
};
class IMediaLibrary
{
public:
......@@ -29,11 +50,13 @@ class IMediaLibrary
/**
* @brief addMetadataService Adds a service to parse media
*
* Use is expected to add all services before calling parse for the first time.
* Once parse has been called, adding another service is an undefined behavior
* User is expected to add all services before calling parse for the first time.
* Once parse has been called, adding another service is an undefined behavior.
* This method will call service->initialize(), therefor the passed ServiceStatus
* is expected to be uninitialized.
*/
virtual void addMetadataService( IMetadataService* service ) = 0;
virtual void parse( FilePtr file ) = 0;
virtual void parse( FilePtr file, IParserCb* cb ) = 0;
};
class MediaLibraryFactory
......
......@@ -7,8 +7,7 @@ class IMetadataServiceCb
{
public:
virtual ~IMetadataServiceCb(){}
virtual void done( FilePtr file ) = 0;
virtual void error( FilePtr file, const std::string& error ) = 0;
virtual void done( FilePtr file, ServiceStatus status, void* data ) = 0;
};
class IMetadataService
......@@ -17,7 +16,7 @@ class IMetadataService
virtual ~IMetadataService() {}
virtual bool initialize( IMetadataServiceCb* callback, IMediaLibrary* ml ) = 0;
virtual unsigned int priority() const = 0;
virtual bool run( FilePtr file ) = 0;
virtual bool run( FilePtr file, void* data ) = 0;
};
#endif // IMETADATASERVICE_H
......@@ -28,5 +28,18 @@ typedef std::shared_ptr<IVideoTrack> VideoTrackPtr;
typedef std::weak_ptr<sqlite3> DBConnection;
enum ServiceStatus
{
/// All good
StatusSuccess,
/// Something failed, but it's not critical (For instance, no internet connection for a
/// module that uses an online database)
StatusError,
/// We can't compute this file for now (for instance the file was on a network drive which
/// isn't connected anymore)
StatusTemporaryUnavailable,
/// Something failed and we won't continue
StatusFatal
};
#endif // TYPES_H
......@@ -33,6 +33,7 @@ list(APPEND SRC_LIST ${HEADERS_LIST}
Movie.cpp
VideoTrack.cpp
AudioTrack.cpp
Parser.cpp
metadata_services/VLCMetadataService.cpp
)
......
......@@ -9,12 +9,14 @@
#include "IMetadataService.h"
#include "Label.h"
#include "Movie.h"
#include "Parser.h"
#include "Show.h"
#include "ShowEpisode.h"
#include "SqliteTools.h"
#include "VideoTrack.h"
MediaLibrary::MediaLibrary()
: m_parser( new Parser )
{
}
......@@ -134,17 +136,12 @@ MoviePtr MediaLibrary::createMovie( const std::string& title )
void MediaLibrary::addMetadataService(IMetadataService* service)
{
typedef std::unique_ptr<IMetadataService> MdsPtr;
std::function<bool(const MdsPtr&, const MdsPtr&)> comp = []( const MdsPtr& a, const MdsPtr& b )
{
// We want higher priority first
return a->priority() > b->priority();
};
m_mdServices.push_back( MdsPtr( service ) );
std::push_heap( m_mdServices.begin(), m_mdServices.end(), comp );
service->initialize( m_parser.get(), this );
m_parser->addService( service );
}
void MediaLibrary::parse( FilePtr file )
void MediaLibrary::parse(FilePtr file , IParserCb* cb)
{
assert(false);
m_parser->parse( file, cb );
}
#ifndef MEDIALIBRARY_H
#define MEDIALIBRARY_H
class Parser;
#include <sqlite3.h>
#include "IMediaLibrary.h"
......@@ -32,9 +34,10 @@ class MediaLibrary : public IMediaLibrary
virtual MoviePtr createMovie( const std::string& title );
virtual void addMetadataService( IMetadataService* service );
virtual void parse( FilePtr file );
virtual void parse( FilePtr file, IParserCb* cb );
private:
std::shared_ptr<sqlite3> m_dbConnection;
std::vector<std::unique_ptr<IMetadataService>> m_mdServices;
std::unique_ptr<Parser> m_parser;
};
#endif // MEDIALIBRARY_H
#include <algorithm>
#include "Parser.h"
Parser::Parser()
{
}
Parser::~Parser()
{
if ( m_thread == nullptr )
return;
{
std::lock_guard<std::mutex> lock( m_lock );
if ( m_tasks.empty() == true )
m_cond.notify_all();
m_stopParser = true;
}
m_thread->join();
}
void Parser::addService(IMetadataService* service)
{
std::function<bool(const ServicePtr&, const ServicePtr&)> comp = []( const ServicePtr& a, const ServicePtr& b )
{
// We want higher priority first
return a->priority() > b->priority();
};
m_services.push_back( ServicePtr( service ) );
std::push_heap( m_services.begin(), m_services.end(), comp );
}
void Parser::parse(FilePtr file, IParserCb* cb)
{
std::lock_guard<std::mutex> lock( m_lock );
m_tasks.push_back( new Task( file, m_services, cb ) );
if ( m_thread == nullptr )
m_thread.reset( new std::thread( &Parser::run, this ) );
}
void Parser::run()
{
while ( m_stopParser == false )
{
Task* task = nullptr;
{
std::unique_lock<std::mutex> lock( m_lock );
if ( m_tasks.empty() == true )
{
m_cond.wait( lock, [this]() { return m_tasks.empty() == false || m_stopParser == true; });
// We might have been woken up because the parser is being destroyed
if ( m_stopParser == true )
return;
}
// Otherwise it's safe to assume we have at least one element.
task = m_tasks.front();
m_tasks.erase(m_tasks.begin());
}
(*task->it)->run( task->file, task );
}
}
Parser::Task::Task( FilePtr file, Parser::ServiceList& serviceList, IParserCb* parserCb )
: file(file)
, it( serviceList.begin() )
, end( serviceList.end() )
, cb( parserCb )
{
}
void Parser::done( FilePtr file, ServiceStatus status, void* data )
{
Task *t = reinterpret_cast<Task*>( data );
t->cb->onServiceDone( file, status );
if ( status == StatusTemporaryUnavailable || status == StatusFatal )
{
delete t;
return ;
}
++t->it;
if (t->it == t->end)
{
t->cb->onFileDone( file );
delete t;
return;
}
std::lock_guard<std::mutex> lock( m_lock );
m_tasks.push_back( t );
}
#ifndef PARSER_H
#define PARSER_H
#include <condition_variable>
#include <mutex>
#include <thread>
#include <memory>
#include <vector>
#include "IMetadataService.h"
#include "IMediaLibrary.h"
class Parser : public IMetadataServiceCb
{
public:
public:
Parser();
~Parser();
void addService( IMetadataService* service );
void parse( FilePtr file, IParserCb* cb );
private:
virtual void done( FilePtr file, ServiceStatus status, void* data );
void run();
private:
typedef std::unique_ptr<IMetadataService> ServicePtr;
typedef std::vector<ServicePtr> ServiceList;
struct Task
{
Task(FilePtr file, ServiceList& serviceList , IParserCb* parserCb);
FilePtr file;
ServiceList::iterator it;
ServiceList::iterator end;
IParserCb* cb;
};
private:
ServiceList m_services;
std::vector<Task*> m_tasks;
std::unique_ptr<std::thread> m_thread;
std::mutex m_lock;
std::condition_variable m_cond;
volatile bool m_stopParser;
};
#endif // PARSER_H
......@@ -17,6 +17,7 @@ VLCMetadataService::VLCMetadataService( libvlc_instance_t* vlc )
VLCMetadataService::~VLCMetadataService()
{
cleanup();
libvlc_release( m_instance );
}
......@@ -33,21 +34,22 @@ unsigned int VLCMetadataService::priority() const
return 100;
}
bool VLCMetadataService::run( FilePtr file )
bool VLCMetadataService::run( FilePtr file , void* data )
{
cleanup();
auto ctx = new Context;
ctx->self = this;
ctx->file = file;
ctx->data = data;
ctx->media = libvlc_media_new_path( m_instance, file->mrl().c_str() );
ctx->m_em = libvlc_media_event_manager( ctx->media );
ctx->mp = libvlc_media_player_new_from_media( ctx->media );
ctx->mp_em = libvlc_media_player_event_manager( ctx->mp );
libvlc_audio_output_set( ctx->mp, "dummy" );
// libvlc_audio_output_set( ctx->mp, "dummy" );
//attach events
libvlc_event_attach( ctx->mp_em, libvlc_MediaPlayerLengthChanged, &eventCallback, ctx );
libvlc_event_attach( ctx->m_em, libvlc_MediaParsedChanged, &eventCallback, ctx );
//libvlc_media_player_play( ctx->mp );
libvlc_media_parse_async( ctx->media );
return true;
}
......@@ -58,19 +60,31 @@ void VLCMetadataService::eventCallback( const libvlc_event_t* e, void* data )
switch ( e->type )
{
case libvlc_MediaParsedChanged:
ctx->self->handleMediaMeta( ctx );
ctx->self->parse( ctx );
return;
default:
return;
}
}
void VLCMetadataService::handleMediaMeta( VLCMetadataService::Context* ctx )
void VLCMetadataService::parse(VLCMetadataService::Context* ctx)
{
auto status = handleMediaMeta( ctx );
ctx->self->m_cb->done( ctx->file, status, ctx->data );
std::lock_guard<std::mutex> lock( m_lock );
m_cleanup.push_back( ctx );
}
ServiceStatus VLCMetadataService::handleMediaMeta( VLCMetadataService::Context* ctx )
{
libvlc_media_track_t** tracks;
auto nbTracks = libvlc_media_tracks_get( ctx->media, &tracks );
if ( nbTracks == 0 )
ctx->self->m_cb->error( ctx->file, "Failed to fetch tracks" );
{
std::cerr << "Failed to fetch tracks" << std::endl;
return StatusFatal;
}
bool isAudio = true;
for ( unsigned int i = 0; i < nbTracks; ++i )
{
......@@ -99,10 +113,7 @@ void VLCMetadataService::handleMediaMeta( VLCMetadataService::Context* ctx )
auto file = std::static_pointer_cast<File>( ctx->file );
file->setReady();
libvlc_media_tracks_release( tracks, nbTracks );
ctx->self->m_cb->done( ctx->file );
libvlc_event_detach( ctx->m_em, libvlc_MediaParsedChanged, &eventCallback, ctx );
libvlc_event_detach( ctx->mp_em, libvlc_MediaPlayerLengthChanged, &eventCallback, ctx );
delete ctx;
return StatusSuccess;
}
void VLCMetadataService::parseAudioFile( VLCMetadataService::Context* ctx )
......@@ -173,6 +184,16 @@ void VLCMetadataService::parseVideoFile( VLCMetadataService::Context* )
}
void VLCMetadataService::cleanup()
{
std::lock_guard<std::mutex> lock( m_lock );
while ( m_cleanup.empty() == false )
{
delete m_cleanup.back();
m_cleanup.pop_back();
}
}
VLCMetadataService::Context::Context()
: self( nullptr )
, media( nullptr )
......@@ -184,6 +205,8 @@ VLCMetadataService::Context::Context()
VLCMetadataService::Context::~Context()
{
libvlc_event_detach( m_em, libvlc_MediaParsedChanged, &VLCMetadataService::eventCallback, this );
libvlc_event_detach( mp_em, libvlc_MediaPlayerLengthChanged, &VLCMetadataService::eventCallback, this );
libvlc_media_release( media );
libvlc_media_player_release( mp );
}
#ifndef VLCMETADATASERVICE_H
#define VLCMETADATASERVICE_H
#include "IMetadataService.h"
#include <vlc/vlc.h>
#include <mutex>
#include "IMetadataService.h"
class VLCMetadataService : public IMetadataService
{
......@@ -13,7 +14,7 @@ class VLCMetadataService : public IMetadataService
virtual bool initialize( IMetadataServiceCb *callback, IMediaLibrary* ml );
virtual unsigned int priority() const;
virtual bool run( FilePtr file );
virtual bool run( FilePtr file, void *data );
private:
struct Context
......@@ -23,6 +24,7 @@ class VLCMetadataService : public IMetadataService
VLCMetadataService* self;
FilePtr file;
void* data;
libvlc_media_t* media;
libvlc_event_manager_t* m_em;
libvlc_media_player_t* mp;
......@@ -31,13 +33,20 @@ class VLCMetadataService : public IMetadataService
private:
static void eventCallback( const libvlc_event_t* e, void* data );
void handleMediaMeta( Context* ctx );
void parse( Context* ctx );
ServiceStatus handleMediaMeta( Context* ctx );
void parseAudioFile( Context* ctx );
void parseVideoFile( Context* ctx );
void cleanup();
libvlc_instance_t* m_instance;
IMetadataServiceCb* m_cb;
IMediaLibrary* m_ml;
// We can't cleanup from the callback since it's called from a VLC thread.
// If the refcounter was to reach 0 from there, it would destroy resources
// that are still held.
std::vector<Context*> m_cleanup;
std::mutex m_lock;
};
#endif // VLCMETADATASERVICE_H
#include "gtest/gtest.h"
#include <iostream>
#include <vlc/vlc.h>
#include <chrono>
#include <condition_variable>
......@@ -12,25 +11,31 @@
#include "IAlbumTrack.h"
#include "metadata_services/VLCMetadataService.h"
class ServiceCb : public IMetadataServiceCb
class ServiceCb : public IParserCb
{
public:
std::condition_variable waitCond;
std::mutex mutex;
volatile bool failed;
ServiceCb()
: failed(false)
{
}
virtual void done( FilePtr )
~ServiceCb()
{
}
virtual void onServiceDone( FilePtr, ServiceStatus status )
{
if ( status != StatusSuccess )
failed = true;
waitCond.notify_all();
}
virtual void error( FilePtr, const std::string& error )
virtual void onFileDone( FilePtr )
{
std::cerr << "Error: " << error << std::endl;
FAIL();
}
};
......@@ -49,12 +54,17 @@ class VLCMetadataServices : public testing::Test
virtual void SetUp()
{
cb->failed = false;
ml.reset( MediaLibraryFactory::create() );
vlcInstance = libvlc_new( 0, NULL );
if ( vlcInstance != nullptr )
libvlc_release( vlcInstance );
const char* args[] = {
"-vv"
};
vlcInstance = libvlc_new( sizeof(args) /sizeof(args[0]), args );
ASSERT_NE(vlcInstance, nullptr);
auto vlcService = new VLCMetadataService( vlcInstance );
vlcService->initialize( cb.get(), ml.get() );
// This takes ownership of vlcService
ml->addMetadataService( vlcService );
bool res = ml->initialize( "test.db" );
......@@ -64,6 +74,7 @@ class VLCMetadataServices : public testing::Test
virtual void TearDown()
{
libvlc_release( vlcInstance );
vlcInstance = nullptr;
ml.reset();
unlink("test.db");
}
......@@ -77,13 +88,17 @@ TEST_F( VLCMetadataServices, ParseAudio )
{
std::unique_lock<std::mutex> lock( cb->mutex );
auto file = ml->addFile( "mr-zebra.mp3" );
ml->parse(file);
ml->parse( file, cb.get() );
std::vector<AudioTrackPtr> tracks;
cb->waitCond.wait( lock, [&]{ return file->audioTracks( tracks ) == true && tracks.size() > 0; } );
bool res = cb->waitCond.wait_for( lock, std::chrono::seconds( 5 ), [&]{
return cb->failed == true || ( file->audioTracks( tracks ) == true && tracks.size() > 0 );
} );
ASSERT_TRUE( res );
ASSERT_FALSE( cb->failed );
SetUp();
file = ml->file( "mr-zebra.mp3" );
bool res = file->audioTracks( tracks );
res = file->audioTracks( tracks );
ASSERT_TRUE( res );
ASSERT_EQ( tracks.size(), 1u );
auto track = tracks[0];
......@@ -97,9 +112,10 @@ TEST_F( VLCMetadataServices, ParseAlbum )
{
std::unique_lock<std::mutex> lock( cb->mutex );
auto file = ml->addFile( "mr-zebra.mp3" );
ml->parse(file);
bool res = cb->waitCond.wait_for( lock, std::chrono::seconds( 2 ),
[&]{ return file->albumTrack() != nullptr; } );
ml->parse( file, cb.get() );
bool res = cb->waitCond.wait_for( lock, std::chrono::seconds( 5 ), [&]{
return cb->failed == true || file->albumTrack() != nullptr;
} );
ASSERT_TRUE( res );
SetUp();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment