ParserWorker.cpp 7.6 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
/*****************************************************************************
 * Media Library
 *****************************************************************************
 * Copyright (C) 2015 Hugo Beauzée-Luyssen, Videolabs
 *
 * Authors: Hugo Beauzée-Luyssen<hugo@beauzee.fr>
 *
 * This program is free software; you can redistribute it and/or modify it
 * under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
 *****************************************************************************/

23 24 25 26
#if HAVE_CONFIG_H
# include "config.h"
#endif

27
#include "ParserWorker.h"
28
#include "Parser.h"
29
#include "Media.h"
30

31 32 33
namespace medialibrary
{

34
ParserWorker::ParserWorker()
35
    : m_parserCb( nullptr )
36
    , m_stopParser( false )
37
    , m_paused( false )
38
    , m_idle( true )
39 40 41
{
}

42
void ParserWorker::start()
43 44
{
    // Ensure we don't start multiple times.
45
    assert( m_threads.size() == 0 );
46
    for ( auto i = 0u; i < m_service->nbThreads(); ++i )
47
        m_threads.emplace_back( &ParserWorker::mainloop, this );
48 49
}

50
void ParserWorker::pause()
51
{
52
    std::lock_guard<compat::Mutex> lock( m_lock );
53 54 55
    m_paused = true;
}

56
void ParserWorker::resume()
57
{
58
    std::lock_guard<compat::Mutex> lock( m_lock );
59 60 61 62
    m_paused = false;
    m_cond.notify_all();
}

63
void ParserWorker::signalStop()
64 65 66 67 68
{
    for ( auto& t : m_threads )
    {
        if ( t.joinable() )
        {
69 70 71
            std::lock_guard<compat::Mutex> lock( m_lock );
            m_cond.notify_all();
            m_stopParser = true;
72 73 74 75
        }
    }
}

76
void ParserWorker::stop()
77 78 79 80 81 82 83 84
{
    for ( auto& t : m_threads )
    {
        if ( t.joinable() )
            t.join();
    }
}

85
void ParserWorker::parse( std::shared_ptr<parser::Task> t )
86
{
87 88 89 90 91 92 93 94 95 96 97 98
    if ( m_threads.size() == 0 )
    {
        // Since the thread isn't started, no need to lock the mutex before pushing the task
        m_tasks.push( std::move( t ) );
        start();
    }
    else
    {
        std::lock_guard<compat::Mutex> lock( m_lock );
        m_tasks.push( std::move( t ) );
        m_cond.notify_all();
    }
99 100
}

101 102
bool ParserWorker::initialize( MediaLibrary* ml, parser::IParserCb* parserCb,
                               std::unique_ptr<IParserService> service )
103
{
104
    m_ml = ml;
105
    m_service = std::move( service );
106 107
    m_parserCb = parserCb;
    // Run the service specific initializer
108
    return m_service->initialize( ml );
109 110
}

111
bool ParserWorker::isIdle() const
112 113 114 115
{
    return m_idle;
}

116
void ParserWorker::flush()
117 118 119 120 121 122 123 124
{
    std::unique_lock<compat::Mutex> lock( m_lock );
    assert( m_paused == true || m_threads.empty() == true );
    m_idleCond.wait( lock, [this]() {
        return m_idle == true;
    });
    while ( m_tasks.empty() == false )
        m_tasks.pop();
125
    m_service->onFlushing();
126 127
}

128
void ParserWorker::restart()
129
{
130
    m_service->onRestarted();
131 132
}

133
void ParserWorker::mainloop()
134
{
135 136 137
    // It would be unsafe to call name() at the end of this function, since
    // we might stop the thread during ParserService destruction. This implies
    // that the underlying service has been deleted already.
138
    std::string serviceName = m_service->name();
139
    LOG_INFO("Entering ParserService [", serviceName, "] thread");
140
    setIdle( false );
141 142 143

    while ( m_stopParser == false )
    {
144
        std::shared_ptr<parser::Task> task;
145
        {
146
            std::unique_lock<compat::Mutex> lock( m_lock );
147 148
            if ( m_tasks.empty() == true || m_paused == true )
            {
149
                LOG_INFO( "Halting ParserService [", serviceName, "] mainloop" );
150
                setIdle( true );
151
                m_idleCond.notify_all();
152 153 154 155
                m_cond.wait( lock, [this]() {
                    return ( m_tasks.empty() == false && m_paused == false )
                            || m_stopParser == true;
                });
156
                LOG_INFO( "Resuming ParserService [", serviceName, "] mainloop" );
157 158 159
                // We might have been woken up because the parser is being destroyed
                if ( m_stopParser  == true )
                    break;
160
                setIdle( false );
161 162
            }
            // Otherwise it's safe to assume we have at least one element.
163
            LOG_INFO('[', serviceName, "] has ", m_tasks.size(), " tasks remaining" );
164 165 166
            task = std::move( m_tasks.front() );
            m_tasks.pop();
        }
167
        if ( task->isStepCompleted( m_service->targetedStep() ) == true )
168
        {
169
            LOG_INFO( "Skipping completed task [", serviceName, "] on ", task->item().mrl() );
170
            m_parserCb->done( std::move( task ), parser::Status::Success );
171 172
            continue;
        }
173
        parser::Status status;
174 175
        try
        {
176
            LOG_INFO( "Executing ", serviceName, " task on ", task->item().mrl() );
177
            auto chrono = std::chrono::steady_clock::now();
178 179 180 181
            auto file = std::static_pointer_cast<File>( task->item().file() );
            auto media = std::static_pointer_cast<Media>( task->item().media() );
            if ( ( file != nullptr && file->isDeleted() )
                 || ( media != nullptr && media->isDeleted() ) )
182
                status = parser::Status::Fatal;
183 184
            else
            {
185
                task->startParserStep();
186
                status = m_service->run( task->item() );
187
                auto duration = std::chrono::steady_clock::now() - chrono;
188
                LOG_INFO( "Done executing ", serviceName, " task on ", task->item().mrl(), " in ",
189 190
                          std::chrono::duration_cast<std::chrono::milliseconds>( duration ).count(), "ms" );
            }
191
        }
192
        catch ( const std::exception& ex )
193
        {
194
            LOG_ERROR( "Caught an exception during ", task->item().mrl(), " [", serviceName, "] parsing: ", ex.what() );
195
            status = parser::Status::Fatal;
196
        }
197
        if ( handleServiceResult( *task, status ) == false )
198
            status = parser::Status::Fatal;
199
        m_parserCb->done( std::move( task ), status );
200
    }
201
    LOG_INFO("Exiting ParserService [", serviceName, "] thread");
202 203 204
    setIdle( true );
}

205
void ParserWorker::setIdle(bool isIdle)
206 207 208 209 210
{
    // Calling the idleChanged callback will trigger a call to isIdle, so set the value before
    // invoking it, otherwise we have an incoherent state.
    m_idle = isIdle;
    m_parserCb->onIdleChanged( isIdle );
211 212
}

213
bool ParserWorker::handleServiceResult( parser::Task& task, parser::Status status )
214
{
215
    if ( status == parser::Status::Success )
216 217 218 219 220
    {
        task.markStepCompleted( m_service->targetedStep() );
        // We don't want to save the extraction step in database, as restarting a
        // task with extraction completed but analysis uncompleted wouldn't run
        // the extraction again, causing the analysis to run with no info.
221
        if ( m_service->targetedStep() != parser::Step::MetadataExtraction )
222 223 224
            return task.saveParserStep();
        return true;
    }
225
    else if ( status == parser::Status::Completed )
226
    {
227
        task.markStepCompleted( parser::Step::Completed );
228 229
        return task.saveParserStep();
    }
230
    else if ( status == parser::Status::Discarded )
231 232 233 234 235 236
    {
        return parser::Task::destroy( m_ml, task.id() );
    }
    return true;
}

237
}