Streams.cpp 14.9 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/*
 * Streams.cpp
 *****************************************************************************
 * Copyright (C) 2014 - VideoLAN authors
 *
 * This program is free software; you can redistribute it and/or modify it
 * under the terms of the GNU Lesser General Public License as published
 * by the Free Software Foundation; either version 2.1 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
 *****************************************************************************/
#include "Streams.hpp"
21
#include "StreamsType.hpp"
22
#include "http/HTTPConnection.hpp"
23
24
25
#include "http/HTTPConnectionManager.h"
#include "http/Chunk.h"
#include "logic/AbstractAdaptationLogic.h"
26
#include "SegmentTracker.hpp"
27
28
#include <vlc_stream.h>
#include <vlc_demux.h>
29

30
using namespace adaptative;
31
32
33
using namespace adaptative::http;
using namespace adaptative::logic;

34
Stream::Stream(const StreamType type, const StreamFormat &format)
35
{
36
    init(type, format);
37
38
}

39
void Stream::init(const StreamType type_, const StreamFormat &format_)
40
41
{
    type = type_;
42
    format = format_;
43
    output = NULL;
44
    adaptationLogic = NULL;
45
46
    currentChunk = NULL;
    eof = false;
47
    segmentTracker = NULL;
48
49
50
51
52
53
54
}

Stream::~Stream()
{
    delete currentChunk;
    delete adaptationLogic;
    delete output;
55
    delete segmentTracker;
56
57
}

58
StreamType Stream::mimeToType(const std::string &mime)
59
{
60
    StreamType mimetype;
61
    if (!mime.compare(0, 6, "video/"))
62
        mimetype = VIDEO;
63
    else if (!mime.compare(0, 6, "audio/"))
64
        mimetype = AUDIO;
65
    else if (!mime.compare(0, 12, "application/"))
66
        mimetype = APPLICATION;
67
    else /* unknown of unsupported */
68
        mimetype = UNKNOWN;
69
70
71
    return mimetype;
}

72
void Stream::create(demux_t *demux, AbstractAdaptationLogic *logic,
73
                    SegmentTracker *tracker, const AbstractStreamOutputFactory *factory)
74
{
75
    updateFormat(demux, format, factory);
76
77
    adaptationLogic = logic;
    segmentTracker = tracker;
78
79
}

80
81
82
83
84
85
86
87
88
89
90
91
void Stream::updateFormat(demux_t *demux, StreamFormat &newformat, const AbstractStreamOutputFactory *factory)
{
    if( format == newformat && output )
        return;

    delete output;
    format = newformat;
    output = factory->create(demux, format);
    if(!output)
        throw VLC_EGENERIC;
}

92
93
94
95
96
97
98
bool Stream::isEOF() const
{
    return false;
}

mtime_t Stream::getPCR() const
{
99
100
    if(!output)
        return 0;
101
102
103
    return output->getPCR();
}

104
105
mtime_t Stream::getFirstDTS() const
{
106
107
    if(!output)
        return 0;
108
109
110
    return output->getFirstDTS();
}

111
112
int Stream::getGroup() const
{
113
114
    if(!output)
        return 0;
115
116
117
118
119
    return output->getGroup();
}

int Stream::esCount() const
{
120
121
    if(!output)
        return 0;
122
123
124
    return output->esCount();
}

125
126
127
128
bool Stream::operator ==(const Stream &stream) const
{
    return stream.type == type;
}
129
130
131

Chunk * Stream::getChunk()
{
132
    if (currentChunk == NULL && output)
133
    {
134
        currentChunk = segmentTracker->getNextChunk(type, output->switchAllowed());
135
136
137
138
139
140
        if (currentChunk == NULL)
            eof = true;
    }
    return currentChunk;
}

141
142
143
144
145
bool Stream::seekAble() const
{
    return (output && output->seekAble());
}

146
Stream::status Stream::demux(HTTPConnectionManager *connManager, mtime_t nz_deadline, bool send)
147
{
148
149
150
    if(!output)
        return Stream::status_eof;

151
152
153
154
155
156
157
158
159
160
    if(nz_deadline + VLC_TS_0 > output->getPCR()) /* not already demuxed */
    {
        /* need to read, demuxer still buffering, ... */
        if(read(connManager) <= 0)
            return Stream::status_eof;

        if(nz_deadline + VLC_TS_0 > output->getPCR()) /* need to read more */
            return Stream::status_buffering;
    }

161
162
    if(send)
        output->sendToDecoder(nz_deadline);
163
164
165
    return Stream::status_demuxed;
}

166
167
168
169
170
171
172
173
174
175
176
177
178
179
size_t Stream::read(HTTPConnectionManager *connManager)
{
    Chunk *chunk = getChunk();
    if(!chunk)
        return 0;

    if(!chunk->getConnection())
    {
       if(!connManager->connectChunk(chunk))
        return 0;
    }

    size_t readsize = 0;

180
    /* New chunk, do query */
181
182
    if(chunk->getBytesRead() == 0)
    {
183
184
185
186
187
188
189
        if(chunk->getConnection()->query(chunk->getPath()) != VLC_SUCCESS)
        {
            chunk->getConnection()->releaseChunk();
            currentChunk = NULL;
            delete chunk;
            return 0;
        }
190
191
    }

192
193
194
195
    /* Because we don't know Chunk size at start, we need to get size
       from content length */
    readsize = chunk->getBytesToRead();
    if (readsize > 32768)
196
197
198
199
200
201
202
203
204
205
        readsize = 32768;

    block_t *block = block_Alloc(readsize);
    if(!block)
        return 0;

    mtime_t time = mdate();
    ssize_t ret = chunk->getConnection()->read(block->p_buffer, readsize);
    time = mdate() - time;

206
    if(ret < 0)
207
208
209
210
211
212
213
214
215
216
217
218
    {
        block_Release(block);
        chunk->getConnection()->releaseChunk();
        currentChunk = NULL;
        delete chunk;
        return 0;
    }
    else
    {
        block->i_buffer = (size_t)ret;

        adaptationLogic->updateDownloadRate(block->i_buffer, time);
219
        chunk->onDownload(&block);
220
221
222
223
224
225
226
227
228
229
230

        if (chunk->getBytesToRead() == 0)
        {
            chunk->getConnection()->releaseChunk();
            currentChunk = NULL;
            delete chunk;
        }
    }

    readsize = block->i_buffer;

231
232
233
234
    if(output)
        output->pushBlock(block);
    else
        block_Release(block);
235
236
237
238

    return readsize;
}

239
240
bool Stream::setPosition(mtime_t time, bool tryonly)
{
241
242
243
    if(!output)
        return false;

244
    bool ret = segmentTracker->setPosition(time, output->reinitsOnSeek(), tryonly);
245
    if(!tryonly && ret)
246
    {
247
        output->setPosition(time);
248
249
250
251
252
253
254
255
256
257
        if(output->reinitsOnSeek())
        {
            if(currentChunk)
            {
                currentChunk->getConnection()->releaseChunk();
                delete currentChunk;
            }
            currentChunk = NULL;
        }
    }
258
259
260
    return ret;
}

261
262
263
264
265
mtime_t Stream::getPosition() const
{
    return segmentTracker->getSegmentStart();
}

266
267
268
269
270
void Stream::prune()
{
    segmentTracker->pruneFromCurrent();
}

271
272
273
274
AbstractStreamOutput::AbstractStreamOutput(demux_t *demux)
{
    realdemux = demux;
    pcr = VLC_TS_0;
275
    group = 0;
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
}

AbstractStreamOutput::~AbstractStreamOutput()
{
}

mtime_t AbstractStreamOutput::getPCR() const
{
    return pcr;
}

int AbstractStreamOutput::getGroup() const
{
    return group;
}

BaseStreamOutput::BaseStreamOutput(demux_t *demux, const std::string &name) :
    AbstractStreamOutput(demux)
{
    this->name = name;
296
    seekable = true;
297
    restarting = false;
298
    demuxstream = NULL;
299
    b_drop = false;
300
301
302
303
304

    fakeesout = new es_out_t;
    if (!fakeesout)
        throw VLC_ENOMEM;

305
306
    vlc_mutex_init(&lock);

307
308
309
310
311
312
    fakeesout->pf_add = esOutAdd;
    fakeesout->pf_control = esOutControl;
    fakeesout->pf_del = esOutDel;
    fakeesout->pf_destroy = esOutDestroy;
    fakeesout->pf_send = esOutSend;
    fakeesout->p_sys = (es_out_sys_t*) this;
313

314
315
    demuxstream = stream_DemuxNew(realdemux, name.c_str(), fakeesout);
    if(!demuxstream)
316
        throw VLC_EGENERIC;
317
318
}

319
BaseStreamOutput::~BaseStreamOutput()
320
321
322
{
    if (demuxstream)
        stream_Delete(demuxstream);
323
324
325
326
327

    /* shouldn't be any */
    std::list<Demuxed *>::const_iterator it;
    for(it=queues.begin(); it!=queues.end();++it)
        delete *it;
328
329
330

    delete fakeesout;
    vlc_mutex_destroy(&lock);
331
332
}

333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
mtime_t BaseStreamOutput::getFirstDTS() const
{
    mtime_t ret = VLC_TS_INVALID;
    vlc_mutex_lock(const_cast<vlc_mutex_t *>(&lock));
    std::list<Demuxed *>::const_iterator it;
    for(it=queues.begin(); it!=queues.end();++it)
    {
        const Demuxed *pair = *it;
        const block_t *p_block = pair->p_queue;
        while( p_block && p_block->i_dts == VLC_TS_INVALID )
        {
            p_block = p_block->p_next;
        }

        if(p_block)
        {
            ret = p_block->i_dts;
            break;
        }
    }
    vlc_mutex_unlock(const_cast<vlc_mutex_t *>(&lock));
    return ret;
}

357
int BaseStreamOutput::esCount() const
358
{
359
    return queues.size();
360
361
}

362
void BaseStreamOutput::pushBlock(block_t *block)
363
364
365
366
{
    stream_DemuxSend(demuxstream, block);
}

367
bool BaseStreamOutput::seekAble() const
368
{
369
370
    bool b_canswitch = switchAllowed();
    return (demuxstream && seekable && b_canswitch);
371
372
}

373
void BaseStreamOutput::setPosition(mtime_t nztime)
374
{
375
    vlc_mutex_lock(&lock);
376
377
378
379
380
381
382
    std::list<Demuxed *>::const_iterator it;
    for(it=queues.begin(); it!=queues.end();++it)
    {
        Demuxed *pair = *it;
        if(pair->p_queue && pair->p_queue->i_dts > VLC_TS_0 + nztime)
            pair->drop();
    }
383
384
385
386
387
388
389
390
391
    /* disable appending until restarted */
    b_drop = true;
    vlc_mutex_unlock(&lock);

    if(reinitsOnSeek())
        restart();

    vlc_mutex_lock(&lock);
    b_drop = false;
392
    pcr = VLC_TS_INVALID;
393
    vlc_mutex_unlock(&lock);
394

395
396
397
398
    es_out_Control(realdemux->out, ES_OUT_SET_NEXT_DISPLAY_TIME,
                   VLC_TS_0 + nztime);
}

399
400
401
402
403
404
405
bool BaseStreamOutput::restart()
{
    stream_t *newdemuxstream = stream_DemuxNew(realdemux, name.c_str(), fakeesout);
    if(!newdemuxstream)
        return false;

    vlc_mutex_lock(&lock);
406
    restarting = true;
407
408
409
410
411
412
413
414
415
416
417
418
    stream_t *olddemuxstream = demuxstream;
    demuxstream = newdemuxstream;
    vlc_mutex_unlock(&lock);

    if(olddemuxstream)
        stream_Delete(olddemuxstream);

    return true;
}

bool BaseStreamOutput::reinitsOnSeek() const
{
419
420
421
422
423
424
425
426
427
428
    return true;
}

bool BaseStreamOutput::switchAllowed() const
{
    bool b_allowed;
    vlc_mutex_lock(const_cast<vlc_mutex_t *>(&lock));
    b_allowed = !restarting;
    vlc_mutex_unlock(const_cast<vlc_mutex_t *>(&lock));
    return b_allowed;
429
430
431
}

void BaseStreamOutput::sendToDecoder(mtime_t nzdeadline)
432
{
433
    vlc_mutex_lock(&lock);
434
435
436
437
    sendToDecoderUnlocked(nzdeadline);
    vlc_mutex_unlock(&lock);
}

438
void BaseStreamOutput::sendToDecoderUnlocked(mtime_t nzdeadline)
439
{
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
    std::list<Demuxed *>::const_iterator it;
    for(it=queues.begin(); it!=queues.end();++it)
    {
        Demuxed *pair = *it;
        while(pair->p_queue && pair->p_queue->i_dts <= VLC_TS_0 + nzdeadline)
        {
            block_t *p_block = pair->p_queue;
            pair->p_queue = pair->p_queue->p_next;
            p_block->p_next = NULL;

            if(pair->pp_queue_last == &p_block->p_next)
                pair->pp_queue_last = &pair->p_queue;

            realdemux->out->pf_send(realdemux->out, pair->es_id, p_block);
        }
    }
}

458
BaseStreamOutput::Demuxed::Demuxed(es_out_id_t *id, const es_format_t *fmt)
459
460
461
{
    p_queue = NULL;
    pp_queue_last = &p_queue;
462
463
464
    es_id = id;
    es_format_Init(&fmtcpy, UNKNOWN_ES, 0);
    es_format_Copy(&fmtcpy, fmt);
465
466
}

467
BaseStreamOutput::Demuxed::~Demuxed()
468
{
469
    es_format_Clean(&fmtcpy);
470
471
472
    drop();
}

473
void BaseStreamOutput::Demuxed::drop()
474
475
476
477
478
479
{
    block_ChainRelease(p_queue);
    p_queue = NULL;
    pp_queue_last = &p_queue;
}

480
/* Static callbacks */
481
es_out_id_t * BaseStreamOutput::esOutAdd(es_out_t *fakees, const es_format_t *p_fmt)
482
{
483
    BaseStreamOutput *me = (BaseStreamOutput *) fakees->p_sys;
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518

    es_out_id_t *p_es = NULL;

    vlc_mutex_lock(&me->lock);

    std::list<Demuxed *>::iterator it;
    bool b_hasestorecyle = false;
    for(it=me->queues.begin(); it!=me->queues.end();++it)
    {
        Demuxed *pair = *it;
        b_hasestorecyle |= pair->recycle;

        if( p_es )
            continue;

        if( me->restarting )
        {
            /* If we're recycling from same format */
            if( es_format_IsSimilar(p_fmt, &pair->fmtcpy) &&
                    p_fmt->i_extra == pair->fmtcpy.i_extra &&
                    !memcmp(p_fmt->p_extra, pair->fmtcpy.p_extra, p_fmt->i_extra) )
            {
                msg_Err(me->realdemux, "using recycled");
                pair->recycle = false;
                p_es = pair->es_id;
            }
        }
    }

    if(!b_hasestorecyle)
    {
        me->restarting = false;
    }

    if(!p_es)
519
    {
520
521
        p_es = me->realdemux->out->pf_add(me->realdemux->out, p_fmt);
        if(p_es)
522
        {
523
524
525
            Demuxed *pair = new (std::nothrow) Demuxed(p_es, p_fmt);
            if(pair)
                me->queues.push_back(pair);
526
527
        }
    }
528
529
    vlc_mutex_unlock(&me->lock);

530
    return p_es;
531
532
}

533
int BaseStreamOutput::esOutSend(es_out_t *fakees, es_out_id_t *p_es, block_t *p_block)
534
{
535
    BaseStreamOutput *me = (BaseStreamOutput *) fakees->p_sys;
536
    vlc_mutex_lock(&me->lock);
537
    if(me->b_drop)
538
    {
539
540
541
542
543
544
        block_ChainRelease( p_block );
    }
    else
    {
        std::list<Demuxed *>::const_iterator it;
        for(it=me->queues.begin(); it!=me->queues.end();++it)
545
        {
546
547
548
549
550
551
            Demuxed *pair = *it;
            if(pair->es_id == p_es)
            {
                block_ChainLastAppend(&pair->pp_queue_last, p_block);
                break;
            }
552
553
        }
    }
554
    vlc_mutex_unlock(&me->lock);
555
    return VLC_SUCCESS;
556
557
}

558
void BaseStreamOutput::esOutDel(es_out_t *fakees, es_out_id_t *p_es)
559
{
560
    BaseStreamOutput *me = (BaseStreamOutput *) fakees->p_sys;
561
    vlc_mutex_lock(&me->lock);
562
563
564
565
566
    std::list<Demuxed *>::iterator it;
    for(it=me->queues.begin(); it!=me->queues.end();++it)
    {
        if((*it)->es_id == p_es)
        {
567
            me->sendToDecoderUnlocked(INT64_MAX - VLC_TS_0);
568
569
570
571
572
573
574
575
576
577
578
579
            break;
        }
    }

    if(it != me->queues.end())
    {
        if(me->restarting)
        {
            (*it)->recycle = true;
        }
        else
        {
580
581
582
583
            delete *it;
            me->queues.erase(it);
        }
    }
584
585
586
587

    if(!me->restarting)
        me->realdemux->out->pf_del(me->realdemux->out, p_es);

588
    vlc_mutex_unlock(&me->lock);
589
590
}

591
int BaseStreamOutput::esOutControl(es_out_t *fakees, int i_query, va_list args)
592
{
593
    BaseStreamOutput *me = (BaseStreamOutput *) fakees->p_sys;
594
595
    if (i_query == ES_OUT_SET_PCR )
    {
596
        vlc_mutex_lock(&me->lock);
597
        me->pcr = (int64_t)va_arg( args, int64_t );
598
        vlc_mutex_unlock(&me->lock);
599
600
601
602
        return VLC_SUCCESS;
    }
    else if( i_query == ES_OUT_SET_GROUP_PCR )
    {
603
        vlc_mutex_lock(&me->lock);
604
605
        me->group = (int) va_arg( args, int );
        me->pcr = (int64_t)va_arg( args, int64_t );
606
        vlc_mutex_unlock(&me->lock);
607
608
        return VLC_SUCCESS;
    }
609
610
611
612
613
614
615
    else if( i_query == ES_OUT_GET_ES_STATE )
    {
        va_arg( args, es_out_id_t * );
        bool *pb = va_arg( args, bool * );
        *pb = true;
        return VLC_SUCCESS;
    }
616
617
618
619
620
621

    vlc_mutex_lock(&me->lock);
    bool b_restarting = me->restarting;
    vlc_mutex_unlock(&me->lock);

    if( b_restarting )
622
623
624
    {
        return VLC_EGENERIC;
    }
625
626
627
628

    return me->realdemux->out->pf_control(me->realdemux->out, i_query, args);
}

629
void BaseStreamOutput::esOutDestroy(es_out_t *fakees)
630
{
631
    BaseStreamOutput *me = (BaseStreamOutput *) fakees->p_sys;
632
633
634
    me->realdemux->out->pf_destroy(me->realdemux->out);
}
/* !Static callbacks */