Commit d1815a42 authored by François Cartegnie's avatar François Cartegnie 🤞

demux: adaptative: add buffered/threaded chunk source

parent 7d7975d0
......@@ -298,6 +298,8 @@ libadaptative_plugin_la_SOURCES = \
demux/adaptative/http/BytesRange.hpp \
demux/adaptative/http/Chunk.cpp \
demux/adaptative/http/Chunk.h \
demux/adaptative/http/Downloader.cpp \
demux/adaptative/http/Downloader.hpp \
demux/adaptative/http/HTTPConnection.cpp \
demux/adaptative/http/HTTPConnection.hpp \
demux/adaptative/http/HTTPConnectionManager.cpp \
......
......@@ -28,6 +28,7 @@
#include "Chunk.h"
#include "HTTPConnection.hpp"
#include "HTTPConnectionManager.h"
#include "Downloader.hpp"
#include <vlc_common.h>
#include <vlc_url.h>
......@@ -172,8 +173,14 @@ bool HTTPChunkSource::init(const std::string &url)
return true;
}
block_t * HTTPChunkSource::consume(size_t readsize)
block_t * HTTPChunkSource::read(size_t readsize)
{
if(!prepare())
return NULL;
if(consumed == contentLength && consumed > 0)
return NULL;
if(contentLength && readsize > contentLength - consumed)
readsize = contentLength - consumed;
......@@ -229,89 +236,190 @@ block_t * HTTPChunkSource::readBlock()
return read(HTTPChunkSource::CHUNK_SIZE);
}
block_t * HTTPChunkSource::read(size_t readsize)
{
if(!prepare())
return NULL;
if(consumed == contentLength && consumed > 0)
return NULL;
return consume(readsize);
}
HTTPChunkBufferedSource::HTTPChunkBufferedSource(const std::string& url, HTTPConnectionManager *manager) :
HTTPChunkSource(url, manager),
p_buffer (NULL),
p_head (NULL),
pp_tail (&p_head),
buffered (0)
{
vlc_mutex_init(&lock);
vlc_cond_init(&avail);
done = false;
downloadstart = 0;
}
HTTPChunkBufferedSource::~HTTPChunkBufferedSource()
{
if(p_buffer)
block_ChainRelease(p_buffer);
vlc_mutex_lock(&lock);
if(p_head)
{
block_ChainRelease(p_head);
p_head = NULL;
pp_tail = &p_head;
}
done = true;
buffered = 0;
vlc_mutex_unlock(&lock);
connManager->downloader->cancel(this);
vlc_cond_destroy(&avail);
vlc_mutex_destroy(&lock);
}
bool HTTPChunkBufferedSource::isDone() const
{
bool b_done;
vlc_mutex_lock(const_cast<vlc_mutex_t *>(&lock));
b_done = done;
vlc_mutex_unlock(const_cast<vlc_mutex_t *>(&lock));
return b_done;
}
void HTTPChunkBufferedSource::bufferize(size_t readsize)
{
vlc_mutex_lock(&lock);
if(!prepare())
{
done = true;
vlc_cond_signal(&avail);
vlc_mutex_unlock(&lock);
return;
}
if(readsize < HTTPChunkSource::CHUNK_SIZE)
readsize = HTTPChunkSource::CHUNK_SIZE;
if(contentLength && readsize > contentLength - consumed)
readsize = contentLength - consumed;
if(contentLength && readsize > contentLength - buffered)
readsize = contentLength - buffered;
vlc_mutex_unlock(&lock);
block_t *p_block = block_Alloc(readsize);
if(!p_block)
return;
mtime_t time = mdate();
struct
{
size_t size;
mtime_t time;
} rate = {0,0};
ssize_t ret = connection->read(p_block->p_buffer, readsize);
time = mdate() - time;
if(ret < 0)
if(ret <= 0)
{
block_Release(p_block);
vlc_mutex_lock(&lock);
done = true;
rate.size = buffered + consumed;
rate.time = mdate() - downloadstart;
downloadstart = 0;
vlc_mutex_unlock(&lock);
}
else
{
p_block->i_buffer = (size_t) ret;
vlc_mutex_lock(&lock);
buffered += p_block->i_buffer;
block_ChainAppend(&p_buffer, p_block);
connManager->updateDownloadRate(p_block->i_buffer, time);
block_ChainLastAppend(&pp_tail, p_block);
if((size_t) ret < readsize)
{
done = true;
rate.size = buffered + consumed;
rate.time = mdate() - downloadstart;
downloadstart = 0;
}
vlc_mutex_unlock(&lock);
}
if(rate.size)
{
connManager->updateDownloadRate(rate.size, rate.time);
}
vlc_cond_signal(&avail);
}
bool HTTPChunkBufferedSource::prepare()
{
if(!prepared)
{
downloadstart = mdate();
return HTTPChunkSource::prepare();
}
return true;
}
block_t * HTTPChunkBufferedSource::readBlock()
{
block_t *p_block = NULL;
vlc_mutex_lock(&lock);
while(!p_head && !done)
vlc_cond_wait(&avail, &lock);
if(!p_head && done)
{
vlc_mutex_unlock(&lock);
return NULL;
}
/* dequeue */
p_block = p_head;
p_head = p_head->p_next;
if(p_head == NULL)
pp_tail = &p_head;
p_block->p_next = NULL;
consumed += p_block->i_buffer;
buffered -= p_block->i_buffer;
vlc_mutex_unlock(&lock);
return p_block;
}
block_t * HTTPChunkBufferedSource::consume(size_t readsize)
block_t * HTTPChunkBufferedSource::read(size_t readsize)
{
if(readsize > buffered)
bufferize(readsize);
vlc_mutex_lock(&lock);
while(readsize > buffered && !done)
vlc_cond_wait(&avail, &lock);
block_t *p_block = NULL;
if(!readsize || !buffered || !(p_block = block_Alloc(readsize)) )
{
vlc_mutex_unlock(&lock);
return NULL;
}
size_t copied = 0;
while(buffered && readsize)
{
const size_t toconsume = std::min(p_buffer->i_buffer, readsize);
memcpy(&p_block->p_buffer[copied], p_buffer->p_buffer, toconsume);
const size_t toconsume = std::min(p_head->i_buffer, readsize);
memcpy(&p_block->p_buffer[copied], p_head->p_buffer, toconsume);
copied += toconsume;
readsize -= toconsume;
buffered -= toconsume;
p_buffer->i_buffer -= toconsume;
p_buffer->p_buffer += toconsume;
if(p_buffer->i_buffer == 0)
p_head->i_buffer -= toconsume;
p_head->p_buffer += toconsume;
if(p_head->i_buffer == 0)
{
block_t *next = p_buffer->p_next;
p_buffer->p_next = NULL;
block_Release(p_buffer);
p_buffer = next;
block_t *next = p_head->p_next;
p_head->p_next = NULL;
block_Release(p_head);
p_head = next;
if(next == NULL)
pp_tail = &p_head;
}
}
consumed += copied;
p_block->i_buffer = copied;
vlc_mutex_unlock(&lock);
return p_block;
}
......
......@@ -92,7 +92,6 @@ namespace adaptative
protected:
virtual bool prepare();
virtual block_t * consume(size_t);
HTTPConnection *connection;
HTTPConnectionManager *connManager;
size_t consumed; /* read pointer */
......@@ -109,17 +108,27 @@ namespace adaptative
class HTTPChunkBufferedSource : public HTTPChunkSource
{
friend class Downloader;
public:
HTTPChunkBufferedSource(const std::string &url, HTTPConnectionManager *);
virtual ~HTTPChunkBufferedSource();
virtual block_t * readBlock (); /* reimpl */
virtual block_t * read (size_t); /* reimpl */
protected:
virtual block_t * consume(size_t);
virtual bool prepare(); /* reimpl */
void bufferize(size_t);
bool isDone() const;
private:
void bufferize(size_t);
block_t *p_buffer; /* read cache buffer */
block_t *p_head; /* read cache buffer */
block_t **pp_tail;
size_t buffered; /* read cache size */
bool done;
mtime_t downloadstart;
vlc_mutex_t lock;
vlc_cond_t avail;
};
class HTTPChunk : public AbstractChunk
......
/*
* Downloader.hpp
*****************************************************************************
* Copyright (C) 2015 - VideoLAN Authors
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation; either version 2.1 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program; if not, write to the Free Software Foundation,
* Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
*****************************************************************************/
#include "Downloader.hpp"
#include <vlc_threads.h>
#include <vlc_atomic.h>
using namespace adaptative::http;
Downloader::Downloader()
{
vlc_mutex_init(&lock);
vlc_cond_init(&waitcond);
killed = false;
}
bool Downloader::start()
{
if(vlc_clone(&thread_handle, downloaderThread,
reinterpret_cast<void *>(this), VLC_THREAD_PRIORITY_INPUT))
{
return false;
}
return true;
}
Downloader::~Downloader()
{
killed = true;
vlc_cond_signal(&waitcond);
vlc_join(thread_handle, NULL);
vlc_mutex_destroy(&lock);
vlc_cond_destroy(&waitcond);
}
void Downloader::schedule(HTTPChunkBufferedSource *source)
{
vlc_mutex_lock(&lock);
chunks.push_back(source);
vlc_mutex_unlock(&lock);
vlc_cond_signal(&waitcond);
}
void Downloader::cancel(HTTPChunkBufferedSource *source)
{
vlc_mutex_lock(&lock);
chunks.remove(source);
vlc_mutex_unlock(&lock);
}
void * Downloader::downloaderThread(void *opaque)
{
Downloader *instance = reinterpret_cast<Downloader *>(opaque);
int canc = vlc_savecancel();
instance->Run();
vlc_restorecancel( canc );
return NULL;
}
void Downloader::DownloadSource(HTTPChunkBufferedSource *source)
{
if(!source->isDone())
source->bufferize(HTTPChunkSource::CHUNK_SIZE);
}
void Downloader::Run()
{
while(1)
{
vlc_mutex_lock(&lock);
if(killed)
break;
while(chunks.empty() && !killed)
{
vlc_cond_wait(&waitcond, &lock);
}
if(killed)
break;
if(!chunks.empty())
{
HTTPChunkBufferedSource *source = chunks.front();
DownloadSource(source);
if(source->isDone())
chunks.pop_front();
}
vlc_mutex_unlock(&lock);
}
vlc_mutex_unlock(&lock);
}
/*
* Downloader.hpp
*****************************************************************************
* Copyright (C) 2015 - VideoLAN Authors
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation; either version 2.1 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program; if not, write to the Free Software Foundation,
* Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
*****************************************************************************/
#ifndef DOWNLOADER_HPP
#define DOWNLOADER_HPP
#include "Chunk.h"
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include <vlc_common.h>
#include <list>
namespace adaptative
{
namespace http
{
class Downloader
{
public:
Downloader();
~Downloader();
bool start();
void schedule(HTTPChunkBufferedSource *);
void cancel(HTTPChunkBufferedSource *);
private:
static void * downloaderThread(void *);
void Run();
void DownloadSource(HTTPChunkBufferedSource *);
vlc_thread_t thread_handle;
vlc_mutex_t lock;
vlc_cond_t waitcond;
vlc_mutex_t processlock;
bool killed;
std::list<HTTPChunkBufferedSource *> chunks;
};
}
}
#endif // DOWNLOADER_HPP
......@@ -28,6 +28,7 @@
#include "HTTPConnectionManager.h"
#include "HTTPConnection.hpp"
#include "Sockets.hpp"
#include "Downloader.hpp"
#include <vlc_url.h>
using namespace adaptative::http;
......@@ -36,16 +37,23 @@ HTTPConnectionManager::HTTPConnectionManager (vlc_object_t *stream) :
stream (stream),
rateObserver (NULL)
{
vlc_mutex_init(&lock);
downloader = new (std::nothrow) Downloader();
downloader->start();
}
HTTPConnectionManager::~HTTPConnectionManager ()
{
delete downloader;
this->closeAllConnections();
vlc_mutex_destroy(&lock);
}
void HTTPConnectionManager::closeAllConnections ()
{
vlc_mutex_lock(&lock);
releaseAllConnections();
vlc_delete_all(this->connectionPool);
vlc_mutex_unlock(&lock);
}
void HTTPConnectionManager::releaseAllConnections()
......@@ -75,18 +83,23 @@ HTTPConnection * HTTPConnectionManager::getConnection(const std::string &scheme,
return NULL;
const int sockettype = (scheme == "https") ? TLSSocket::TLS : Socket::REGULAR;
vlc_mutex_lock(&lock);
HTTPConnection *conn = getConnection(hostname, port, sockettype);
if(!conn)
{
Socket *socket = (sockettype == TLSSocket::TLS) ? new (std::nothrow) TLSSocket()
: new (std::nothrow) Socket();
if(!socket)
{
vlc_mutex_unlock(&lock);
return NULL;
}
/* disable pipelined tls until we have ticket/resume session support */
conn = new (std::nothrow) HTTPConnection(stream, socket, sockettype != TLSSocket::TLS);
if(!conn)
{
delete socket;
vlc_mutex_unlock(&lock);
return NULL;
}
......@@ -94,11 +107,13 @@ HTTPConnection * HTTPConnectionManager::getConnection(const std::string &scheme,
if (!conn->connect(hostname, port))
{
vlc_mutex_unlock(&lock);
return NULL;
}
}
conn->setUsed(true);
vlc_mutex_unlock(&lock);
return conn;
}
......
......@@ -40,6 +40,7 @@ namespace adaptative
namespace http
{
class HTTPConnection;
class Downloader;
class HTTPConnectionManager : public IDownloadRateObserver
{
......@@ -48,19 +49,20 @@ namespace adaptative
virtual ~HTTPConnectionManager ();
void closeAllConnections ();
void releaseAllConnections ();
HTTPConnection * getConnection(const std::string &scheme,
const std::string &hostname,
uint16_t port);
virtual void updateDownloadRate(size_t, mtime_t); /* reimpl */
void setDownloadRateObserver(IDownloadRateObserver *);
Downloader *downloader;
private:
void releaseAllConnections ();
vlc_mutex_t lock;
std::vector<HTTPConnection *> connectionPool;
vlc_object_t *stream;
IDownloadRateObserver *rateObserver;
HTTPConnection * getConnection(const std::string &hostname, uint16_t port, int);
};
}
......
......@@ -49,6 +49,12 @@ RateBasedAdaptationLogic::RateBasedAdaptationLogic (vlc_object_t *p_obj_, int w
window_idx = 0;
prevbps = 0;
dlsize = 0;
vlc_mutex_init(&lock);
}
RateBasedAdaptationLogic::~RateBasedAdaptationLogic()
{
vlc_mutex_destroy(&lock);
}
BaseRepresentation *RateBasedAdaptationLogic::getNextRepresentation(BaseAdaptationSet *adaptSet, BaseRepresentation *currep) const
......@@ -56,7 +62,9 @@ BaseRepresentation *RateBasedAdaptationLogic::getNextRepresentation(BaseAdaptati
if(adaptSet == NULL)
return NULL;
vlc_mutex_lock(const_cast<vlc_mutex_t *>(&lock));
size_t availBps = currentBps + ((currep) ? currep->getBandwidth() : 0);
vlc_mutex_unlock(const_cast<vlc_mutex_t *>(&lock));
if(availBps > usedBps)
availBps -= usedBps;
else
......@@ -119,6 +127,8 @@ void RateBasedAdaptationLogic::updateDownloadRate(size_t size, mtime_t time)
* and then defines how fast we adapt to current bandwidth */
const size_t deltamax = omax - omin;
double alpha = (diffsum) ? 0.33 * ((double)deltamax / diffsum) : 0.5;
vlc_mutex_lock(&lock);
bpsAvg = alpha * bpsAvg + (1.0 - alpha) * bps;
BwDebug(msg_Dbg(p_obj, "alpha1 %lf alpha0 %lf dmax %ld ds %ld", alpha,
......@@ -132,12 +142,14 @@ void RateBasedAdaptationLogic::updateDownloadRate(size_t size, mtime_t time)
BwDebug(msg_Info(p_obj, "Current bandwidth %zu KiB/s using %u%%",
(bpsAvg / 8192), (bpsAvg) ? (unsigned)(usedBps * 100.0 / bpsAvg) : 0));
vlc_mutex_unlock(&lock);
}
void RateBasedAdaptationLogic::trackerEvent(const SegmentTrackerEvent &event)
{
if(event.type == SegmentTrackerEvent::SWITCHING)
{
vlc_mutex_lock(&lock);
if(event.u.switching.prev)
usedBps -= event.u.switching.prev->getBandwidth();
if(event.u.switching.next)
......@@ -145,6 +157,7 @@ void RateBasedAdaptationLogic::trackerEvent(const SegmentTrackerEvent &event)
BwDebug(msg_Info(p_obj, "New bandwidth usage %zu KiB/s %u%%",
(usedBps / 8192), (bpsAvg) ? (unsigned)(usedBps * 100.0 / bpsAvg) : 0 ));
vlc_mutex_unlock(&lock);
}
}
......
......@@ -36,6 +36,7 @@ namespace adaptative
{
public:
RateBasedAdaptationLogic (vlc_object_t *, int, int);
virtual ~RateBasedAdaptationLogic ();
BaseRepresentation *getNextRepresentation(BaseAdaptationSet *, BaseRepresentation *) const;
virtual void updateDownloadRate(size_t, mtime_t); /* reimpl */
......@@ -60,6 +61,8 @@ namespace adaptative
size_t dlsize;
mtime_t dllength;
vlc_mutex_t lock;
};
class FixedRateAdaptationLogic : public AbstractAdaptationLogic
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment