Streams.cpp 15.6 KB
Newer Older
1 2 3
/*
 * Streams.cpp
 *****************************************************************************
4
 * Copyright (C) 2014 - VideoLAN and VLC authors
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
 *
 * This program is free software; you can redistribute it and/or modify it
 * under the terms of the GNU Lesser General Public License as published
 * by the Free Software Foundation; either version 2.1 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
 *****************************************************************************/
20 21 22 23 24

#ifdef HAVE_CONFIG_H
# include "config.h"
#endif

25
#include "Streams.hpp"
26
#include "logic/AbstractAdaptationLogic.h"
27
#include "http/HTTPConnection.hpp"
28
#include "http/HTTPConnectionManager.h"
29
#include "playlist/BaseRepresentation.h"
30
#include "playlist/SegmentChunk.hpp"
31 32 33
#include "plumbing/SourceStream.hpp"
#include "plumbing/CommandsQueue.hpp"
#include "tools/Debug.hpp"
34
#include <vlc_demux.h>
35

36 37
#include <algorithm>

38 39
using namespace adaptive;
using namespace adaptive::http;
40

41
AbstractStream::AbstractStream(demux_t * demux_)
42
{
43
    p_realdemux = demux_;
44
    format = StreamFormat::UNSUPPORTED;
45 46
    currentChunk = NULL;
    eof = false;
47
    dead = false;
48
    disabled = false;
49
    discontinuity = false;
50
    needrestart = false;
51
    inrestart = false;
52
    segmentTracker = NULL;
53
    demuxersource = NULL;
54
    commandsqueue = NULL;
55 56
    demuxer = NULL;
    fakeesout = NULL;
57
    last_buffer_status = buffering_lessthanmin;
58
    vlc_mutex_init(&lock);
59
}
60

61
bool AbstractStream::init(const StreamFormat &format_, SegmentTracker *tracker, AbstractConnectionManager *conn)
62 63 64 65
{
    /* Don't even try if not supported or already init */
    if((unsigned)format_ == StreamFormat::UNSUPPORTED || demuxersource)
        return false;
66 67

    demuxersource = new (std::nothrow) ChunksSourceStream( VLC_OBJECT(p_realdemux), this );
68
    if(demuxersource)
69
    {
70 71 72 73 74 75 76 77 78 79 80 81 82 83
        CommandsFactory *factory = new (std::nothrow) CommandsFactory();
        if(factory)
        {
            commandsqueue = new (std::nothrow) CommandsQueue(factory);
            if(commandsqueue)
            {
                fakeesout = new (std::nothrow) FakeESOut(p_realdemux->out, commandsqueue);
                if(fakeesout)
                {
                    /* All successfull */
                    fakeesout->setExtraInfoProvider( this );
                    format = format_;
                    segmentTracker = tracker;
                    segmentTracker->registerListener(this);
84
                    segmentTracker->notifyBufferingState(true);
85 86 87 88 89 90 91 92 93 94 95
                    connManager = conn;
                    return true;
                }
                delete commandsqueue;
                commandsqueue = NULL;
            }
            else
            {
                delete factory;
            }
        }
96 97 98
        delete demuxersource;
    }

99
    return false;
100 101
}

102
AbstractStream::~AbstractStream()
103 104
{
    delete currentChunk;
105 106
    if(segmentTracker)
        segmentTracker->notifyBufferingState(false);
107
    delete segmentTracker;
108 109 110 111

    delete demuxer;
    delete demuxersource;
    delete fakeesout;
112
    delete commandsqueue;
113 114

    vlc_mutex_destroy(&lock);
115 116
}

117
void AbstractStream::prepareRestart(bool b_discontinuity)
118
{
119 120 121 122
    if(demuxer)
    {
        /* Enqueue Del Commands for all current ES */
        demuxer->drain();
123
        setTimeOffset(true);
124 125
        /* Enqueue Del Commands for all current ES */
        fakeesout->scheduleAllForDeletion();
126 127
        if(b_discontinuity)
            fakeesout->schedulePCRReset();
128
        commandsqueue->Commit();
129
        /* ignoring demuxer's own Del commands */
130
        commandsqueue->setDrop(true);
131
        delete demuxer;
132
        commandsqueue->setDrop(false);
133 134
        demuxer = NULL;
    }
135 136
}

137
void AbstractStream::setLanguage(const std::string &lang)
138 139
{
    language = lang;
140 141
}

142
void AbstractStream::setDescription(const std::string &desc)
143 144 145 146
{
    description = desc;
}

147
mtime_t AbstractStream::getPCR() const
148
{
149
    vlc_mutex_lock(const_cast<vlc_mutex_t *>(&lock));
150
    mtime_t pcr = isDisabled() ? VLC_TS_INVALID : commandsqueue->getPCR();
151 152
    vlc_mutex_unlock(const_cast<vlc_mutex_t *>(&lock));
    return pcr;
153 154
}

155 156 157 158 159 160 161
mtime_t AbstractStream::getMinAheadTime() const
{
    if(!segmentTracker)
        return 0;
    return segmentTracker->getMinAheadTime();
}

162
mtime_t AbstractStream::getFirstDTS() const
163
{
164 165
    mtime_t dts;
    vlc_mutex_lock(const_cast<vlc_mutex_t *>(&lock));
166
    if(isDisabled())
167 168 169 170 171 172 173 174 175 176 177
    {
        dts = VLC_TS_INVALID;
    }
    else
    {
        dts = commandsqueue->getFirstDTS();
        if(dts == VLC_TS_INVALID)
            dts = commandsqueue->getPCR();
    }
    vlc_mutex_unlock(const_cast<vlc_mutex_t *>(&lock));
    return dts;
178 179
}

180
int AbstractStream::esCount() const
181
{
182
    return fakeesout->esCount();
183 184
}

185
bool AbstractStream::seekAble() const
186
{
187 188 189
    return (demuxer &&
            !fakeesout->restarting() &&
            !discontinuity &&
190
            !commandsqueue->isDraining() );
191 192
}

193
bool AbstractStream::isSelected() const
194
{
195
    return fakeesout->hasSelectedEs();
196 197
}

198
bool AbstractStream::reactivate(mtime_t basetime)
199 200 201
{
    if(setPosition(basetime, false))
    {
202
        setDisabled(false);
203 204 205 206 207 208 209 210 211
        return true;
    }
    else
    {
        eof = true; /* can't reactivate */
        return false;
    }
}

212 213 214 215 216
bool AbstractStream::startDemux()
{
    if(demuxer)
        return false;

217 218
    demuxersource->Reset();
    demuxer = createDemux(format);
219
    if(!demuxer && format != StreamFormat())
220 221
        msg_Err(p_realdemux, "Failed to create demuxer %p %s", (void *)demuxer,
                format.str().c_str());
222 223 224 225 226 227

    return !!demuxer;
}

bool AbstractStream::restartDemux()
{
228
    bool b_ret = true;
229 230
    if(!demuxer)
    {
231
        b_ret = startDemux();
232
    }
233
    else if(demuxer->needsRestartOnSeek())
234
    {
235
        inrestart = true;
236 237
        /* Push all ES as recycling candidates */
        fakeesout->recycleAll();
238 239 240 241
        /* Restart with ignoring es_Del pushes to queue when terminating demux */
        commandsqueue->setDrop(true);
        demuxer->destroy();
        commandsqueue->setDrop(false);
242 243
        b_ret = demuxer->create();
        inrestart = false;
244
    }
245 246 247 248 249
    else
    {
        commandsqueue->Commit();
    }
    return b_ret;
250 251
}

252 253
void AbstractStream::setDisabled(bool b)
{
254 255
    if(disabled != b)
        segmentTracker->notifyBufferingState(!b);
256 257 258
    disabled = b;
}

259
bool AbstractStream::isDisabled() const
260
{
261 262 263 264 265 266
    return dead || disabled;
}

bool AbstractStream::canActivate() const
{
    return !dead;
267 268
}

269
bool AbstractStream::decodersDrained()
270
{
271
    return fakeesout->decodersDrained();
272 273
}

274 275 276 277 278
AbstractStream::buffering_status AbstractStream::getLastBufferStatus() const
{
    return last_buffer_status;
}

279 280 281 282 283
mtime_t AbstractStream::getDemuxedAmount() const
{
    return commandsqueue->getDemuxedAmount();
}

284 285
AbstractStream::buffering_status AbstractStream::bufferize(mtime_t nz_deadline,
                                                           unsigned i_min_buffering, unsigned i_extra_buffering)
286 287 288 289 290 291 292
{
    last_buffer_status = doBufferize(nz_deadline, i_min_buffering, i_extra_buffering);
    return last_buffer_status;
}

AbstractStream::buffering_status AbstractStream::doBufferize(mtime_t nz_deadline,
                                                             unsigned i_min_buffering, unsigned i_extra_buffering)
293
{
294 295
    vlc_mutex_lock(&lock);

296
    /* Ensure it is configured */
297
    if(!segmentTracker || !connManager || dead)
298
    {
299 300 301
        vlc_mutex_unlock(&lock);
        return AbstractStream::buffering_end;
    }
302

303 304 305
    /* Disable streams that are not selected (alternate streams) */
    if(esCount() && !isSelected() && !fakeesout->restarting())
    {
306
        setDisabled(true);
307 308 309 310 311 312
        segmentTracker->reset();
        commandsqueue->Abort(false);
        msg_Dbg(p_realdemux, "deactivating stream %s", format.str().c_str());
        vlc_mutex_unlock(&lock);
        return AbstractStream::buffering_end;
    }
313

314
    if(commandsqueue->isDraining())
315 316 317
    {
        vlc_mutex_unlock(&lock);
        return AbstractStream::buffering_suspended;
318 319
    }

320
    if(!demuxer)
321
    {
322
        format = segmentTracker->getCurrentFormat();
323
        if(!startDemux())
324
        {
325 326 327
            /* If demux fails because of probing failure / wrong format*/
            if(discontinuity)
            {
328
                msg_Dbg( p_realdemux, "Draining on format change" );
329
                prepareRestart();
330
                discontinuity = false;
331
                commandsqueue->setDraining();
332 333
                vlc_mutex_unlock(&lock);
                return AbstractStream::buffering_ongoing;
334 335
            }
            dead = true; /* Prevent further retries */
336 337 338
            commandsqueue->setEOF();
            vlc_mutex_unlock(&lock);
            return AbstractStream::buffering_end;
339 340 341
        }
    }

342 343 344
    const int64_t i_total_buffering = i_min_buffering + i_extra_buffering;

    mtime_t i_demuxed = commandsqueue->getDemuxedAmount();
345
    segmentTracker->notifyBufferingLevel(i_min_buffering, i_demuxed, i_total_buffering);
346
    if(i_demuxed < i_total_buffering) /* not already demuxed */
347
    {
348
        if(!segmentTracker->segmentsListReady()) /* Live Streams */
349 350 351 352 353
        {
            vlc_mutex_unlock(&lock);
            return AbstractStream::buffering_suspended;
        }

354 355 356
        mtime_t nz_extdeadline = commandsqueue->getBufferingLevel() +
                                (i_total_buffering - commandsqueue->getDemuxedAmount()) / (CLOCK_FREQ/4);
        nz_deadline = std::max(nz_deadline, nz_extdeadline);
357

358
        /* need to read, demuxer still buffering, ... */
359 360 361 362
        vlc_mutex_unlock(&lock);
        int i_ret = demuxer->demux(nz_deadline);
        vlc_mutex_lock(&lock);
        if(i_ret != VLC_DEMUXER_SUCCESS)
363
        {
364
            if(discontinuity || needrestart)
365
            {
366 367 368 369
                msg_Dbg(p_realdemux, "Restarting demuxer");
                prepareRestart(discontinuity);
                if(discontinuity)
                {
370 371
                    msg_Dbg(p_realdemux, "Draining on discontinuity");
                    commandsqueue->setDraining();
372 373 374
                    discontinuity = false;
                }
                needrestart = false;
375 376
                vlc_mutex_unlock(&lock);
                return AbstractStream::buffering_ongoing;
377
            }
378 379 380
            commandsqueue->setEOF();
            vlc_mutex_unlock(&lock);
            return AbstractStream::buffering_end;
381
        }
382
        i_demuxed = commandsqueue->getDemuxedAmount();
383
        segmentTracker->notifyBufferingLevel(i_min_buffering, i_demuxed, i_total_buffering);
384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401
    }
    vlc_mutex_unlock(&lock);

    if(i_demuxed < i_total_buffering) /* need to read more */
    {
        if(i_demuxed < i_min_buffering)
            return AbstractStream::buffering_lessthanmin; /* high prio */
        return AbstractStream::buffering_ongoing;
    }
    return AbstractStream::buffering_full;
}

AbstractStream::status AbstractStream::dequeue(mtime_t nz_deadline, mtime_t *pi_pcr)
{
    vlc_mutex_locker locker(&lock);

    *pi_pcr = nz_deadline;

402
    if(commandsqueue->isDraining())
403 404 405 406 407 408
    {
        *pi_pcr = commandsqueue->Process(p_realdemux->out, VLC_TS_0 + nz_deadline);
        if(!commandsqueue->isEmpty())
            return AbstractStream::status_demuxed;

        if(!commandsqueue->isEOF())
409
        {
410 411
            commandsqueue->Abort(true); /* reset buffering level and flags */
            return AbstractStream::status_discontinuity;
412
        }
413 414
    }

415
    if(isDisabled() || commandsqueue->isEOF())
416 417 418 419
    {
        *pi_pcr = nz_deadline;
        return AbstractStream::status_eof;
    }
420

421 422 423
    AdvDebug(msg_Dbg(p_realdemux, "Stream %s pcr %ld dts %ld deadline %ld buflevel %ld",
                     description.c_str(), commandsqueue->getPCR(), commandsqueue->getFirstDTS(),
                     nz_deadline, commandsqueue->getBufferingLevel()));
424

425
    if(nz_deadline + VLC_TS_0 <= commandsqueue->getBufferingLevel()) /* demuxed */
426
    {
427 428
        *pi_pcr = commandsqueue->Process( p_realdemux->out, VLC_TS_0 + nz_deadline );
        return AbstractStream::status_demuxed;
429 430
    }

431
    return AbstractStream::status_buffering;
432 433
}

434
block_t * AbstractStream::readNextBlock()
435
{
436 437
    if (currentChunk == NULL && !eof)
        currentChunk = segmentTracker->getNextChunk(!fakeesout->restarting(), connManager);
438

439
    if(discontinuity || needrestart)
440
    {
441 442
        msg_Info(p_realdemux, "Encountered discontinuity");
        /* Force stream/demuxer to end for this call */
443 444 445
        return NULL;
    }

446
    if(currentChunk == NULL)
447
    {
448
        eof = true;
449 450
        return NULL;
    }
451

452
    const bool b_segment_head_chunk = (currentChunk->getBytesRead() == 0);
453

454
    block_t *block = currentChunk->readBlock();
455
    if(block == NULL)
456
    {
457
        delete currentChunk;
458
        currentChunk = NULL;
459
        return NULL;
460 461
    }

462
    if (currentChunk->isEmpty())
463
    {
464
        delete currentChunk;
465
        currentChunk = NULL;
466 467
    }

468
    block = checkBlock(block, b_segment_head_chunk);
469

470
    return block;
471 472
}

473
bool AbstractStream::setPosition(mtime_t time, bool tryonly)
474
{
475
    if(!seekAble())
476 477
        return false;

478
    bool ret = segmentTracker->setPositionByTime(time, demuxer->needsRestartOnSeek(), tryonly);
479
    if(!tryonly && ret)
480
    {
481
        if(demuxer->needsRestartOnSeek())
482 483 484 485
        {
            if(currentChunk)
                delete currentChunk;
            currentChunk = NULL;
486
            needrestart = false;
487

488 489 490
            setTimeOffset(-1);
            setTimeOffset(segmentTracker->getPlaybackTime());

491 492
            if( !restartDemux() )
                dead = true;
493
        }
494
        else commandsqueue->Abort( true );
495 496 497

        es_out_Control(p_realdemux->out, ES_OUT_SET_NEXT_DISPLAY_TIME,
                       VLC_TS_0 + time);
498
    }
499 500 501
    return ret;
}

502
mtime_t AbstractStream::getPlaybackTime() const
503
{
504
    return segmentTracker->getPlaybackTime();
505
}
506

507
void AbstractStream::runUpdates()
508 509 510 511
{
    if(!isDisabled())
        segmentTracker->updateSelected();
}
512 513 514 515 516 517 518 519 520

void AbstractStream::fillExtraFMTInfo( es_format_t *p_fmt ) const
{
    if(!p_fmt->psz_language && !language.empty())
        p_fmt->psz_language = strdup(language.c_str());
    if(!p_fmt->psz_description && !description.empty())
        p_fmt->psz_description = strdup(description.c_str());
}

521
void AbstractStream::setTimeOffset(mtime_t i_offset)
522 523 524
{
    /* Check if we need to set an offset as the demuxer
     * will start from zero from seek point */
525 526
    if(i_offset < 0) /* reset */
    {
527
        fakeesout->setExpectedTimestampOffset(0);
528 529 530
    }
    else if(demuxer)
    {
531
        fakeesout->setExpectedTimestampOffset(i_offset);
532
    }
533 534
}

535 536 537 538 539 540 541 542 543 544 545 546 547
void AbstractStream::trackerEvent(const SegmentTrackerEvent &event)
{
    switch(event.type)
    {
        case SegmentTrackerEvent::DISCONTINUITY:
            discontinuity = true;
            break;

        case SegmentTrackerEvent::FORMATCHANGE:
            /* Check if our current demux is still valid */
            if(*event.u.format.f != format)
            {
                /* Format has changed between segments, we need to drain and change demux */
548 549
                msg_Info(p_realdemux, "Changing stream format %s -> %s",
                         format.str().c_str(), event.u.format.f->str().c_str());
550 551 552 553 554 555 556 557
                format = *event.u.format.f;

                /* This is an implict discontinuity */
                discontinuity = true;
            }
            break;

        case SegmentTrackerEvent::SWITCHING:
558
            if(demuxer && demuxer->needsRestartOnSwitch() && !inrestart)
559 560 561
            {
                needrestart = true;
            }
562 563 564 565
        default:
            break;
    }
}