Streams.cpp 16.5 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/*
 * Streams.cpp
 *****************************************************************************
 * Copyright (C) 2014 - VideoLAN authors
 *
 * This program is free software; you can redistribute it and/or modify it
 * under the terms of the GNU Lesser General Public License as published
 * by the Free Software Foundation; either version 2.1 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
 *****************************************************************************/
#include "Streams.hpp"
21
#include "StreamsType.hpp"
22
#include "http/HTTPConnection.hpp"
23
24
#include "http/HTTPConnectionManager.h"
#include "logic/AbstractAdaptationLogic.h"
25
#include "playlist/SegmentChunk.hpp"
26
#include "SegmentTracker.hpp"
27
28
#include <vlc_stream.h>
#include <vlc_demux.h>
29

30
using namespace adaptative;
31
32
33
using namespace adaptative::http;
using namespace adaptative::logic;

34
/**
 * Builds an idle stream bound to a demuxer.
 *
 * Only the demuxer handle, the stream type and the initial format are
 * recorded; every collaborator pointer starts out NULL.
 */
Stream::Stream(demux_t * demux_,const StreamType type_, const StreamFormat &format_)
{
    /* Identity supplied by the caller */
    p_demux = demux_;
    type    = type_;
    format  = format_;

    /* Nothing has been fetched yet */
    eof          = false;
    currentChunk = NULL;

    /* Collaborators, all unset for now */
    streamOutputFactory = NULL;
    segmentTracker      = NULL;
    adaptationLogic     = NULL;
    output              = NULL;
}

/**
 * Destroys the pending chunk, the adaptation logic, the output and the
 * segment tracker, in that order.
 */
Stream::~Stream()
{
    /* delete on a NULL pointer is a no-op, hence no guards */
    delete currentChunk;
    delete adaptationLogic;
    delete output;
    delete segmentTracker;
}

55
/**
 * Maps a MIME type onto a stream type by looking at its top-level prefix.
 *
 * \param mime MIME string such as "video/mp4"
 * \return VIDEO, AUDIO or APPLICATION for the matching prefix, UNKNOWN otherwise
 */
StreamType Stream::mimeToType(const std::string &mime)
{
    if (mime.compare(0, 6, "video/") == 0)
        return VIDEO;

    if (mime.compare(0, 6, "audio/") == 0)
        return AUDIO;

    if (mime.compare(0, 12, "application/") == 0)
        return APPLICATION;

    /* unknown or unsupported */
    return UNKNOWN;
}

69
70
/**
 * Attaches the collaborators and sets up the output for the current format.
 *
 * \param logic   adaptation logic, deleted by the destructor
 * \param tracker segment tracker, deleted by the destructor
 * \param factory factory used to instantiate stream outputs
 *
 * updateFormat() throws VLC_EGENERIC when the factory yields no output.
 */
void Stream::create(AbstractAdaptationLogic *logic, SegmentTracker *tracker,
                    const AbstractStreamOutputFactory *factory)
{
    streamOutputFactory = factory;
    segmentTracker = tracker;
    adaptationLogic = logic;

    /* Instantiates the output unless one already exists for this format */
    updateFormat(format);
}

78
/**
 * Switches the stream to a new format, replacing the output.
 *
 * Nothing happens when the format is unchanged and an output already exists.
 *
 * \param newformat format the next output must handle
 * \throws int VLC_EGENERIC when the factory returns no output
 */
void Stream::updateFormat(StreamFormat &newformat)
{
    if( format == newformat && output )
        return;

    /* Reset the pointer right after deleting it: should the factory throw or
     * return NULL, the destructor must not delete the old output a second time */
    delete output;
    output = NULL;

    format = newformat;
    output = streamOutputFactory->create(p_demux, format);
    if(!output)
        throw VLC_EGENERIC;
}

90
91
92
93
94
95
96
/**
 * End-of-stream query; currently always answers false.
 *
 * NOTE(review): getChunk() sets the `eof` member when no further chunk is
 * returned, but that flag is not consulted here - confirm this is intended.
 */
bool Stream::isEOF() const
{
    return false;
}

/**
 * \return the PCR reported by the output, or 0 while there is no output
 */
mtime_t Stream::getPCR() const
{
    return output ? output->getPCR() : 0;
}

102
103
/**
 * \return the first DTS reported by the output, or 0 while there is no output
 */
mtime_t Stream::getFirstDTS() const
{
    return output ? output->getFirstDTS() : 0;
}

109
110
int Stream::getGroup() const
{
111
112
    if(!output)
        return 0;
113
114
115
116
117
    return output->getGroup();
}

int Stream::esCount() const
{
118
119
    if(!output)
        return 0;
120
121
122
    return output->esCount();
}

123
124
125
126
/**
 * Two streams compare equal when they carry the same stream type; nothing
 * else is looked at.
 */
bool Stream::operator ==(const Stream &stream) const
{
    return type == stream.type;
}
127

128
/**
 * Returns the chunk currently being read, asking the tracker for the next
 * one when none is pending.
 *
 * \return the pending chunk, or NULL when there is no output or the tracker
 *         has nothing more to give (in which case `eof` is raised)
 */
SegmentChunk * Stream::getChunk()
{
    /* Keep the pending chunk; without an output nothing new is requested */
    if (currentChunk != NULL || !output)
        return currentChunk;

    currentChunk = segmentTracker->getNextChunk(type, output->switchAllowed());
    if (!currentChunk)
        eof = true;

    return currentChunk;
}

139
140
141
142
143
/**
 * \return true only when an output exists and reports itself seekable
 */
bool Stream::seekAble() const
{
    if(!output)
        return false;

    return output->seekAble();
}

144
/**
 * Advances demuxing up to a deadline and optionally releases data to the
 * decoders.
 *
 * \param connManager connection manager used for any download
 * \param nz_deadline deadline, before the VLC_TS_0 offset is applied
 * \param send        when true, queued data up to the deadline is sent out
 * \return status_eof when there is no output or nothing could be read,
 *         status_buffering when the PCR is still short of the deadline after
 *         one read, status_demuxed otherwise
 */
Stream::status Stream::demux(HTTPConnectionManager *connManager, mtime_t nz_deadline, bool send)
{
    if(!output)
        return Stream::status_eof;

    const mtime_t i_target = nz_deadline + VLC_TS_0;

    /* not already demuxed */
    if(i_target > output->getPCR())
    {
        /* need to read, demuxer still buffering, ... */
        if(read(connManager) == 0)
            return Stream::status_eof;

        /* need to read more */
        if(i_target > output->getPCR())
            return Stream::status_buffering;
    }

    if(send)
    {
        output->sendToDecoder(nz_deadline);
    }

    return Stream::status_demuxed;
}

164
165
/**
 * Downloads the next slice of the current chunk and hands it to the output.
 *
 * The chunk is connected and queried on first use, at most 32768 bytes are
 * read per call, the download rate is reported to the adaptation logic and
 * the output is rebuilt when the chunk announces a different stream format.
 * A fully consumed (or failed) chunk is released and deleted.
 *
 * \param connManager connection manager used to connect a fresh chunk
 * \return number of bytes in the block handed over, 0 when nothing was read
 * \throws int whatever updateFormat() throws; the downloaded block is
 *         released before the exception leaves this function
 */
size_t Stream::read(HTTPConnectionManager *connManager)
{
    /* Upper bound on the bytes pulled from the connection per call */
    static const size_t i_max_read = 32768;

    SegmentChunk *chunk = getChunk();
    if(!chunk)
        return 0;

    if(!chunk->getConnection())
    {
        /* On failure the chunk stays pending, without a connection */
        if(!connManager->connectChunk(chunk))
            return 0;
    }

    bool b_segment_head_chunk = false;

    /* New chunk, do query */
    if(chunk->getBytesRead() == 0)
    {
        if(chunk->getConnection()->query(chunk->getPath()) != VLC_SUCCESS)
        {
            chunk->getConnection()->releaseChunk();
            currentChunk = NULL;
            delete chunk;
            return 0;
        }
        b_segment_head_chunk = true;
    }

    /* Because we don't know Chunk size at start, we need to get size
       from content length */
    size_t readsize = chunk->getBytesToRead();
    if (readsize > i_max_read)
        readsize = i_max_read;

    block_t *block = block_Alloc(readsize);
    if(!block)
        return 0;

    /* Time the transfer so the adaptation logic can estimate bandwidth */
    mtime_t time = mdate();
    ssize_t ret = chunk->getConnection()->read(block->p_buffer, readsize);
    time = mdate() - time;

    if(ret < 0)
    {
        block_Release(block);
        chunk->getConnection()->releaseChunk();
        currentChunk = NULL;
        delete chunk;
        return 0;
    }

    block->i_buffer = (size_t)ret;

    adaptationLogic->updateDownloadRate(block->i_buffer, time);
    chunk->onDownload(&block);

    StreamFormat chunkStreamFormat = chunk->getStreamFormat();
    if(output && chunkStreamFormat != output->getStreamFormat())
    {
        msg_Info(p_demux, "Changing stream format");
        try
        {
            updateFormat(chunkStreamFormat);
        }
        catch(...)
        {
            /* updateFormat() throws when no output can be built: do not
             * leak the block that was just downloaded */
            block_Release(block);
            throw;
        }
    }

    if (chunk->getBytesToRead() == 0)
    {
        chunk->getConnection()->releaseChunk();
        currentChunk = NULL;
        delete chunk;
    }

    readsize = block->i_buffer;

    if(output)
        output->pushBlock(block, b_segment_head_chunk);
    else
        block_Release(block);

    return readsize;
}

246
247
/**
 * Seeks the stream, or only checks whether a seek would succeed.
 *
 * \param time    target position
 * \param tryonly when true nothing is modified on the output side; only the
 *                tracker is asked whether the position is reachable
 * \return false without an output, otherwise the tracker's answer
 */
bool Stream::setPosition(mtime_t time, bool tryonly)
{
    if(!output)
        return false;

    bool ret = segmentTracker->setPosition(time, output->reinitsOnSeek(), tryonly);
    if(!tryonly && ret)
    {
        output->setPosition(time);
        if(output->reinitsOnSeek())
        {
            if(currentChunk)
            {
                /* read() leaves the chunk pending without a connection when
                 * connectChunk() fails, so the connection may be NULL here */
                if(currentChunk->getConnection())
                    currentChunk->getConnection()->releaseChunk();
                delete currentChunk;
            }
            currentChunk = NULL;
        }
    }

    return ret;
}

268
269
270
271
272
/**
 * \return the segment start reported by the segment tracker
 */
mtime_t Stream::getPosition() const
{
    const mtime_t i_start = segmentTracker->getSegmentStart();
    return i_start;
}

273
274
275
276
277
/**
 * Asks the segment tracker to prune from its current position.
 */
void Stream::prune()
{
    segmentTracker->pruneFromCurrent();
}

278
/**
 * Records the real demuxer and the format; the PCR starts out invalid and
 * the group at 0.
 */
AbstractStreamOutput::AbstractStreamOutput(demux_t *demux, const StreamFormat &format_)
{
    format    = format_;
    realdemux = demux;
    group     = 0;
    pcr       = VLC_TS_INVALID;
}

/**
 * \return the format this output was created for (reference to the member)
 */
const StreamFormat & AbstractStreamOutput::getStreamFormat() const
{
    return this->format;
}

/**
 * Nothing to release at this level.
 */
AbstractStreamOutput::~AbstractStreamOutput()
{
    /* intentionally empty */
}

/**
 * \return the last recorded PCR (VLC_TS_INVALID until one is set)
 */
mtime_t AbstractStreamOutput::getPCR() const
{
    return this->pcr;
}

int AbstractStreamOutput::getGroup() const
{
    return group;
}

305
306
/**
 * Creates an output that runs a demuxer named `name` through a stream_Demux
 * pipe, intercepting its es_out calls with a fake es_out whose callbacks
 * point back at this object.
 *
 * \param demux  the real demuxer whose es_out ultimately receives the data
 * \param format stream format handled by this output
 * \param name   name passed to stream_DemuxNew()
 * \throws int VLC_ENOMEM when the fake es_out cannot be allocated,
 *             VLC_EGENERIC when the demux stream cannot be created
 */
BaseStreamOutput::BaseStreamOutput(demux_t *demux, const StreamFormat &format, const std::string &name) :
    AbstractStreamOutput(demux, format)
{
    this->name = name;
    seekable = true;
    restarting = false;
    demuxstream = NULL;
    b_drop = false;
    timestamps_offset = VLC_TS_INVALID;

    /* nothrow form, otherwise the NULL check below could never trigger */
    fakeesout = new (std::nothrow) es_out_t;
    if (!fakeesout)
        throw VLC_ENOMEM;

    vlc_mutex_init(&lock);

    fakeesout->pf_add = esOutAdd_Callback;
    fakeesout->pf_control = esOutControl_Callback;
    fakeesout->pf_del = esOutDel_Callback;
    fakeesout->pf_destroy = esOutDestroy_Callback;
    fakeesout->pf_send = esOutSend_Callback;
    /* The static callbacks recover `this` from p_sys */
    fakeesout->p_sys = (es_out_sys_t*) this;

    demuxstream = stream_DemuxNew(realdemux, name.c_str(), fakeesout);
    if(!demuxstream)
    {
        /* The destructor does not run when a constructor throws, so undo
         * by hand what has been set up so far */
        vlc_mutex_destroy(&lock);
        delete fakeesout;
        fakeesout = NULL;
        throw VLC_EGENERIC;
    }
}

333
/**
 * Tears down the demux stream first, then any leftover queues, the fake
 * es_out and finally the mutex.
 */
BaseStreamOutput::~BaseStreamOutput()
{
    if (demuxstream != NULL)
        stream_Delete(demuxstream);

    /* shouldn't be any */
    std::list<Demuxed *>::const_iterator cur = queues.begin();
    while(cur != queues.end())
    {
        delete *cur;
        ++cur;
    }

    delete fakeesout;
    vlc_mutex_destroy(&lock);
}

347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
/**
 * Looks through the queues in order and reports the first valid DTS found.
 *
 * \return DTS of the first block carrying a valid one in the first queue
 *         that has such a block, VLC_TS_INVALID when there is none
 */
mtime_t BaseStreamOutput::getFirstDTS() const
{
    /* The lock member is not mutable, hence the cast in this const method */
    vlc_mutex_t *p_lock = const_cast<vlc_mutex_t *>(&lock);
    mtime_t i_first = VLC_TS_INVALID;

    vlc_mutex_lock(p_lock);
    for(std::list<Demuxed *>::const_iterator it = queues.begin();
        it != queues.end(); ++it)
    {
        /* Skip leading blocks without a DTS */
        const block_t *p_cur = (*it)->p_queue;
        for( ; p_cur != NULL; p_cur = p_cur->p_next)
        {
            if(p_cur->i_dts != VLC_TS_INVALID)
                break;
        }

        if(p_cur != NULL)
        {
            i_first = p_cur->i_dts;
            break;
        }
    }
    vlc_mutex_unlock(p_lock);

    return i_first;
}

371
int BaseStreamOutput::esCount() const
372
{
373
    return queues.size();
374
375
}

376
/**
 * Feeds a downloaded block to the demux stream.
 *
 * \param block data handed to stream_DemuxSend()
 *
 * The second (unnamed) flag is ignored by this implementation.
 */
void BaseStreamOutput::pushBlock(block_t *block, bool)
{
    stream_DemuxSend(demuxstream, block);
}

381
bool BaseStreamOutput::seekAble() const
382
{
383
384
    bool b_canswitch = switchAllowed();
    return (demuxstream && seekable && b_canswitch);
385
386
}

387
/**
 * Repositions the output.
 *
 * Queues whose head block lies after the target are emptied, incoming data
 * is dropped while the demux stream is restarted, the PCR is invalidated and
 * the real es_out is told the next display time.
 *
 * \param nztime target time, before the VLC_TS_0 offset is applied
 */
void BaseStreamOutput::setPosition(mtime_t nztime)
{
    const mtime_t i_target = VLC_TS_0 + nztime;

    vlc_mutex_lock(&lock);
    for(std::list<Demuxed *>::const_iterator it = queues.begin();
        it != queues.end(); ++it)
    {
        Demuxed *p_dmx = *it;
        if(p_dmx->p_queue == NULL)
            continue;

        if(p_dmx->p_queue->i_dts > i_target)
            p_dmx->drop();
    }
    /* disable appending until restarted */
    b_drop = true;
    vlc_mutex_unlock(&lock);

    /* Done with the lock released */
    if(reinitsOnSeek())
        restart();

    vlc_mutex_lock(&lock);
    pcr = VLC_TS_INVALID;
    b_drop = false;
    vlc_mutex_unlock(&lock);

    es_out_Control(realdemux->out, ES_OUT_SET_NEXT_DISPLAY_TIME,
                   i_target);
}

413
414
415
416
417
418
419
bool BaseStreamOutput::restart()
{
    stream_t *newdemuxstream = stream_DemuxNew(realdemux, name.c_str(), fakeesout);
    if(!newdemuxstream)
        return false;

    vlc_mutex_lock(&lock);
420
    restarting = true;
421
422
423
424
425
426
427
428
429
430
431
432
    stream_t *olddemuxstream = demuxstream;
    demuxstream = newdemuxstream;
    vlc_mutex_unlock(&lock);

    if(olddemuxstream)
        stream_Delete(olddemuxstream);

    return true;
}

/**
 * \return always true: this output is restarted on every seek
 *         (see setPosition())
 */
bool BaseStreamOutput::reinitsOnSeek() const
{
    return true;
}

bool BaseStreamOutput::switchAllowed() const
{
    bool b_allowed;
    vlc_mutex_lock(const_cast<vlc_mutex_t *>(&lock));
    b_allowed = !restarting;
    vlc_mutex_unlock(const_cast<vlc_mutex_t *>(&lock));
    return b_allowed;
443
444
445
}

/**
 * Locked wrapper around sendToDecoderUnlocked().
 *
 * \param nzdeadline deadline, before the VLC_TS_0 offset is applied
 */
void BaseStreamOutput::sendToDecoder(mtime_t nzdeadline)
{
    vlc_mutex_lock(&lock);
    sendToDecoderUnlocked(nzdeadline);
    vlc_mutex_unlock(&lock);
}

452
/**
 * Pops, from every queue, the leading blocks whose DTS does not exceed the
 * deadline and sends them to the real es_out. Does no locking of its own.
 *
 * \param nzdeadline deadline, before the VLC_TS_0 offset is applied
 */
void BaseStreamOutput::sendToDecoderUnlocked(mtime_t nzdeadline)
{
    const mtime_t i_limit = VLC_TS_0 + nzdeadline;

    for(std::list<Demuxed *>::const_iterator it = queues.begin();
        it != queues.end(); ++it)
    {
        Demuxed *p_dmx = *it;
        for(;;)
        {
            block_t *p_head = p_dmx->p_queue;
            if(p_head == NULL || p_head->i_dts > i_limit)
                break;

            /* Unlink the head block */
            p_dmx->p_queue = p_head->p_next;
            p_head->p_next = NULL;

            /* It was also the tail: the queue is empty again */
            if(p_dmx->pp_queue_last == &p_head->p_next)
                p_dmx->pp_queue_last = &p_dmx->p_queue;

            realdemux->out->pf_send(realdemux->out, p_dmx->es_id, p_head);
        }
    }
}

472
473
474
475
476
477
478
/**
 * Sets the offset later added to incoming DTS/PTS and PCR values.
 *
 * \param offset offset, stored with VLC_TS_0 added
 */
void BaseStreamOutput::setTimestampOffset(mtime_t offset)
{
    const mtime_t i_absolute = VLC_TS_0 + offset;

    vlc_mutex_lock(&lock);
    timestamps_offset = i_absolute;
    vlc_mutex_unlock(&lock);
}

479
/**
 * Creates an empty block queue for one elementary stream.
 *
 * \param id  ES identifier handed out by the real es_out
 * \param fmt ES format; a private copy is kept
 */
BaseStreamOutput::Demuxed::Demuxed(es_out_id_t *id, const es_format_t *fmt)
{
    p_queue = NULL;
    pp_queue_last = &p_queue;
    es_id = id;
    /* esOutAdd() reads this flag on every queue, so it must start defined */
    recycle = false;
    es_format_Init(&fmtcpy, UNKNOWN_ES, 0);
    es_format_Copy(&fmtcpy, fmt);
}

488
/**
 * Cleans the format copy and releases any blocks still queued.
 */
BaseStreamOutput::Demuxed::~Demuxed()
{
    es_format_Clean(&fmtcpy);

    /* Frees the pending block chain */
    drop();
}

494
void BaseStreamOutput::Demuxed::drop()
495
496
497
498
499
500
{
    block_ChainRelease(p_queue);
    p_queue = NULL;
    pp_queue_last = &p_queue;
}

501
/* Static callbacks */
502
/**
 * es_out pf_add trampoline: forwards to esOutAdd() on the instance stored
 * in p_sys by the constructor.
 */
es_out_id_t * BaseStreamOutput::esOutAdd_Callback(es_out_t *fakees, const es_format_t *p_fmt)
{
    BaseStreamOutput *p_self = reinterpret_cast<BaseStreamOutput *>(fakees->p_sys);
    return p_self->esOutAdd(p_fmt);
}

/**
 * es_out pf_send trampoline: forwards to esOutSend() on the instance stored
 * in p_sys by the constructor.
 */
int BaseStreamOutput::esOutSend_Callback(es_out_t *fakees, es_out_id_t *p_es, block_t *p_block)
{
    BaseStreamOutput *p_self = reinterpret_cast<BaseStreamOutput *>(fakees->p_sys);
    return p_self->esOutSend(p_es, p_block);
}

/**
 * es_out pf_del trampoline: forwards to esOutDel() on the instance stored
 * in p_sys by the constructor.
 */
void BaseStreamOutput::esOutDel_Callback(es_out_t *fakees, es_out_id_t *p_es)
{
    BaseStreamOutput *p_self = reinterpret_cast<BaseStreamOutput *>(fakees->p_sys);
    p_self->esOutDel(p_es);
}

/**
 * es_out pf_control trampoline: forwards to esOutControl() on the instance
 * stored in p_sys by the constructor.
 */
int BaseStreamOutput::esOutControl_Callback(es_out_t *fakees, int i_query, va_list ap)
{
    BaseStreamOutput *p_self = reinterpret_cast<BaseStreamOutput *>(fakees->p_sys);
    return p_self->esOutControl(i_query, ap);
}

/**
 * es_out pf_destroy trampoline: forwards to esOutDestroy() on the instance
 * stored in p_sys by the constructor.
 */
void BaseStreamOutput::esOutDestroy_Callback(es_out_t *fakees)
{
    BaseStreamOutput *p_self = reinterpret_cast<BaseStreamOutput *>(fakees->p_sys);
    p_self->esOutDestroy();
}
/* !Static callbacks */
532

533
534
/**
 * Handles an ES announced by the inner demuxer.
 *
 * While restarting, an existing queue whose format (including extradata)
 * matches is reused so the real es_out keeps the same ES. Otherwise the ES
 * is added to the real es_out and a new queue is created for it.
 *
 * \param p_fmt format of the announced ES
 * \return the ES id to use, or NULL when the real es_out refused the ES or
 *         no queue could be allocated for it
 */
es_out_id_t * BaseStreamOutput::esOutAdd(const es_format_t *p_fmt)
{
    es_out_id_t *p_es = NULL;

    vlc_mutex_lock(&lock);

    std::list<Demuxed *>::iterator it;
    bool b_hasestorecyle = false;
    for(it=queues.begin(); it!=queues.end();++it)
    {
        Demuxed *pair = *it;
        /* Sampled before a match below clears the flag */
        b_hasestorecyle |= pair->recycle;

        if( p_es )
            continue;

        if( restarting )
        {
            /* If we're recycling from same format.
             * memcmp is skipped for empty extradata, where both pointers
             * may be NULL */
            if( es_format_IsSimilar(p_fmt, &pair->fmtcpy) &&
                    p_fmt->i_extra == pair->fmtcpy.i_extra &&
                    ( p_fmt->i_extra == 0 ||
                      !memcmp(p_fmt->p_extra, pair->fmtcpy.p_extra, p_fmt->i_extra) ) )
            {
                msg_Err(realdemux, "using recycled");
                pair->recycle = false;
                p_es = pair->es_id;
            }
        }
    }

    /* Nothing was waiting to be recycled: the restart is over */
    if(!b_hasestorecyle)
    {
        restarting = false;
    }

    if(!p_es)
    {
        p_es = realdemux->out->pf_add(realdemux->out, p_fmt);
        if(p_es)
        {
            Demuxed *pair = new (std::nothrow) Demuxed(p_es, p_fmt);
            if(pair)
            {
                queues.push_back(pair);
            }
            else
            {
                /* Without a queue esOutSend() could never deliver data for
                 * this ES: hand it back instead of returning an orphan */
                realdemux->out->pf_del(realdemux->out, p_es);
                p_es = NULL;
            }
        }
    }
    vlc_mutex_unlock(&lock);

    return p_es;
}

583
/**
 * Receives a block from the inner demuxer and appends it to the queue of
 * its ES, after applying the timestamp offset if one is set.
 *
 * The block is released instead when dropping is enabled (during a seek) or
 * when no queue is registered for the ES.
 *
 * \param p_es    ES the block belongs to
 * \param p_block block (chain) whose ownership is taken
 * \return always VLC_SUCCESS
 */
int BaseStreamOutput::esOutSend(es_out_id_t *p_es, block_t *p_block)
{
    vlc_mutex_lock(&lock);
    if(b_drop)
    {
        block_ChainRelease( p_block );
    }
    else
    {
        if( timestamps_offset > VLC_TS_INVALID )
        {
            if(p_block->i_dts > VLC_TS_INVALID)
                p_block->i_dts += (timestamps_offset - VLC_TS_0);

            if(p_block->i_pts > VLC_TS_INVALID)
                p_block->i_pts += (timestamps_offset - VLC_TS_0);
        }

        bool b_queued = false;
        std::list<Demuxed *>::const_iterator it;
        for(it=queues.begin(); it!=queues.end();++it)
        {
            Demuxed *pair = *it;
            if(pair->es_id == p_es)
            {
                block_ChainLastAppend(&pair->pp_queue_last, p_block);
                b_queued = true;
                break;
            }
        }

        /* No queue owns this ES: free the data rather than leak it */
        if(!b_queued)
            block_ChainRelease( p_block );
    }
    vlc_mutex_unlock(&lock);
    return VLC_SUCCESS;
}

616
/**
 * Handles the inner demuxer deleting an ES.
 *
 * Pending data is flushed to the decoders first. During a restart the queue
 * is only marked recyclable and the real ES is kept; otherwise the queue is
 * destroyed and the ES is deleted from the real es_out.
 *
 * \param p_es ES being deleted
 */
void BaseStreamOutput::esOutDel(es_out_id_t *p_es)
{
    vlc_mutex_lock(&lock);

    /* Locate the queue bound to this ES */
    std::list<Demuxed *>::iterator found = queues.begin();
    while(found != queues.end() && (*found)->es_id != p_es)
        ++found;

    if(found != queues.end())
    {
        /* Push out everything still queued */
        sendToDecoderUnlocked(INT64_MAX - VLC_TS_0);

        if(restarting)
        {
            (*found)->recycle = true;
        }
        else
        {
            delete *found;
            queues.erase(found);
        }
    }

    if(!restarting)
        realdemux->out->pf_del(realdemux->out, p_es);

    vlc_mutex_unlock(&lock);
}

648
int BaseStreamOutput::esOutControl(int i_query, va_list args)
649
650
651
{
    if (i_query == ES_OUT_SET_PCR )
    {
652
653
        vlc_mutex_lock(&lock);
        pcr = (int64_t)va_arg( args, int64_t );
654
655
        if(pcr > VLC_TS_INVALID && timestamps_offset > VLC_TS_INVALID)
            pcr += (timestamps_offset - VLC_TS_0);
656
        vlc_mutex_unlock(&lock);
657
658
659
660
        return VLC_SUCCESS;
    }
    else if( i_query == ES_OUT_SET_GROUP_PCR )
    {
661
        vlc_mutex_lock(&lock);
662
663
664
665
        group = (int) va_arg( args, int );
        pcr = (int64_t)va_arg( args, int64_t );
        if(pcr > VLC_TS_INVALID && timestamps_offset > VLC_TS_INVALID)
            pcr += (timestamps_offset - VLC_TS_0);
666
        vlc_mutex_unlock(&lock);
667
668
        return VLC_SUCCESS;
    }
669
670
671
672
673
674
675
    else if( i_query == ES_OUT_GET_ES_STATE )
    {
        va_arg( args, es_out_id_t * );
        bool *pb = va_arg( args, bool * );
        *pb = true;
        return VLC_SUCCESS;
    }
676

677
678
679
    vlc_mutex_lock(&lock);
    bool b_restarting = restarting;
    vlc_mutex_unlock(&lock);
680
681

    if( b_restarting )
682
683
684
    {
        return VLC_EGENERIC;
    }
685

686
    return realdemux->out->pf_control(realdemux->out, i_query, args);
687
688
}

689
void BaseStreamOutput::esOutDestroy()
690
{
691
    realdemux->out->pf_destroy(realdemux->out);
692
}
693