Commit 88fc7cce authored by Christopher Mueller's avatar Christopher Mueller Committed by Hugo Beauzée-Luyssen
Browse files

dash: enabled persistent connections and pipelining


Signed-off-by: default avatarHugo Beauzée-Luyssen <beauze.h@gmail.com>
parent 4810256b
......@@ -32,11 +32,11 @@ using namespace dash::http;
using namespace dash::logic;
using namespace dash::buffer;
DASHDownloader::DASHDownloader (HTTPConnectionManager *conManager, IAdaptationLogic *adaptationLogic, BlockBuffer *buffer)
DASHDownloader::DASHDownloader (HTTPConnectionManager *conManager, BlockBuffer *buffer)
{
this->t_sys = (thread_sys_t *) malloc(sizeof(thread_sys_t));
this->t_sys->conManager = conManager;
this->t_sys->adaptationLogic = adaptationLogic;
this->t_sys->buffer = buffer;
}
DASHDownloader::~DASHDownloader ()
......@@ -57,42 +57,24 @@ void* DASHDownloader::download (void *thread_sys)
{
thread_sys_t *t_sys = (thread_sys_t *) thread_sys;
HTTPConnectionManager *conManager = t_sys->conManager;
IAdaptationLogic *adaptationLogic = t_sys->adaptationLogic;
BlockBuffer *buffer = t_sys->buffer;
Chunk *currentChunk = NULL;
block_t *block = block_Alloc(BLOCKSIZE);
int ret = 0;
do
{
if(currentChunk == NULL)
{
currentChunk = adaptationLogic->getNextChunk();
if(currentChunk == NULL)
{
buffer->setEOF(true);
}
}
else
ret = conManager->read(block);
if(ret > 0)
{
int ret = conManager->read(currentChunk, block->p_buffer, block->i_buffer);
if(ret <= 0)
{
currentChunk = NULL;
}
else
{
block_t *bufBlock = block_Alloc(ret);
memcpy(bufBlock->p_buffer, block->p_buffer, ret);
if(currentChunk->getBitrate() <= 0)
currentChunk->setBitrate(CHUNKDEFAULTBITRATE);
block_t *bufBlock = block_Alloc(ret);
memcpy(bufBlock->p_buffer, block->p_buffer, ret);
bufBlock->i_length = (mtime_t)((ret * 8) / ((float)currentChunk->getBitrate() / 1000000));
buffer->put(bufBlock);
}
bufBlock->i_length = block->i_length;
buffer->put(bufBlock);
}
}while(!buffer->getEOF());
}while(ret && !buffer->getEOF());
buffer->setEOF(true);
block_Release(block);
return NULL;
......
......@@ -39,14 +39,13 @@ namespace dash
struct thread_sys_t
{
dash::http::HTTPConnectionManager *conManager;
logic::IAdaptationLogic *adaptationLogic;
buffer::BlockBuffer *buffer;
};
class DASHDownloader
{
public:
DASHDownloader (http::HTTPConnectionManager *conManager, logic::IAdaptationLogic *adaptationLogic, buffer::BlockBuffer *buffer);
DASHDownloader (http::HTTPConnectionManager *conManager, buffer::BlockBuffer *buffer);
virtual ~DASHDownloader ();
bool start ();
......
......@@ -34,9 +34,8 @@ using namespace dash::logic;
using namespace dash::mpd;
using namespace dash::buffer;
DASHManager::DASHManager ( HTTPConnectionManager *conManager, MPD *mpd,
DASHManager::DASHManager ( MPD *mpd,
IAdaptationLogic::LogicType type, stream_t *stream) :
conManager ( conManager ),
currentChunk ( NULL ),
adaptationLogic( NULL ),
logicType ( type ),
......@@ -51,8 +50,10 @@ DASHManager::DASHManager ( HTTPConnectionManager *conManager, MPD *mpd,
if ( this->adaptationLogic == NULL )
return ;
this->conManager = new dash::http::HTTPConnectionManager(this->adaptationLogic, this->stream);
this->buffer = new BlockBuffer(this->stream);
this->downloader = new DASHDownloader(this->conManager, this->adaptationLogic, this->buffer);
this->downloader = new DASHDownloader(this->conManager, this->buffer);
this->conManager->attach(this->adaptationLogic);
this->buffer->attach(this->adaptationLogic);
......@@ -61,6 +62,7 @@ DASHManager::~DASHManager ()
{
delete this->downloader;
delete this->buffer;
delete this->conManager;
delete this->adaptationLogic;
delete this->mpdManager;
}
......
......@@ -40,7 +40,7 @@ namespace dash
class DASHManager
{
public:
DASHManager( http::HTTPConnectionManager *conManager, mpd::MPD *mpd,
DASHManager( mpd::MPD *mpd,
logic::IAdaptationLogic::LogicType type, stream_t *stream);
virtual ~DASHManager ();
......
......@@ -74,9 +74,8 @@ vlc_module_end ()
*****************************************************************************/
struct stream_sys_t
{
dash::DASHManager *p_dashManager;
dash::http::HTTPConnectionManager *p_conManager;
dash::mpd::MPD *p_mpd;
dash::DASHManager *p_dashManager;
dash::mpd::MPD *p_mpd;
uint64_t position;
bool isLive;
};
......@@ -114,24 +113,20 @@ static int Open(vlc_object_t *p_obj)
return VLC_ENOMEM;
p_sys->p_mpd = mpd;
dash::http::HTTPConnectionManager *p_conManager =
new dash::http::HTTPConnectionManager( p_stream );
dash::DASHManager*p_dashManager =
new dash::DASHManager( p_conManager, p_sys->p_mpd,
dash::logic::IAdaptationLogic::RateBased, p_stream);
dash::DASHManager*p_dashManager = new dash::DASHManager(p_sys->p_mpd,
dash::logic::IAdaptationLogic::RateBased,
p_stream);
if ( p_dashManager->getMpdManager() == NULL ||
p_dashManager->getMpdManager()->getMPD() == NULL ||
p_dashManager->getAdaptionLogic() == NULL ||
p_dashManager->start() == false)
{
delete p_conManager;
delete p_dashManager;
free( p_sys );
return VLC_EGENERIC;
}
p_sys->p_dashManager = p_dashManager;
p_sys->p_conManager = p_conManager;
p_sys->position = 0;
p_sys->isLive = p_dashManager->getMpdManager()->getMPD()->isLive();
p_stream->p_sys = p_sys;
......@@ -151,9 +146,7 @@ static void Close(vlc_object_t *p_obj)
stream_t *p_stream = (stream_t*) p_obj;
stream_sys_t *p_sys = (stream_sys_t *) p_stream->p_sys;
dash::DASHManager *p_dashManager = p_sys->p_dashManager;
dash::http::HTTPConnectionManager *p_conManager = p_sys->p_conManager;
delete(p_conManager);
delete(p_dashManager);
free(p_sys);
}
......
......@@ -31,118 +31,70 @@
using namespace dash::http;
using namespace dash::logic;
HTTPConnectionManager::HTTPConnectionManager (stream_t *stream)
const size_t HTTPConnectionManager::PIPELINE = 80;
const size_t HTTPConnectionManager::PIPELINELENGTH = 2;
const uint64_t HTTPConnectionManager::CHUNKDEFAULTBITRATE = 1;
HTTPConnectionManager::HTTPConnectionManager (logic::IAdaptationLogic *adaptationLogic, stream_t *stream) :
adaptationLogic (adaptationLogic),
stream (stream),
chunkCount (0),
bpsAvg (0),
bpsLastChunk (0),
bpsCurrentChunk (0),
bytesReadSession (0),
bytesReadChunk (0),
timeSession (0),
timeChunk (0)
{
this->timeSecSession = 0;
this->bytesReadSession = 0;
this->timeSecChunk = 0;
this->bytesReadChunk = 0;
this->bpsAvg = 0;
this->bpsLastChunk = 0;
this->chunkCount = 0;
this->stream = stream;
}
HTTPConnectionManager::~HTTPConnectionManager ()
{
this->closeAllConnections();
}
bool HTTPConnectionManager::closeConnection( IHTTPConnection *con )
void HTTPConnectionManager::closeAllConnections ()
{
for(std::map<Chunk*, HTTPConnection *>::iterator it = this->chunkMap.begin();
it != this->chunkMap.end(); ++it)
{
if( it->second == con )
{
delete con;
this->chunkMap.erase( it );
return true;
}
}
return false;
vlc_delete_all(this->connectionPool);
vlc_delete_all(this->downloadQueue);
}
bool HTTPConnectionManager::closeConnection( Chunk *chunk )
{
HTTPConnection *con = this->chunkMap[chunk];
bool ret = this->closeConnection(con);
this->chunkMap.erase(chunk);
delete(chunk);
return ret;
}
void HTTPConnectionManager::closeAllConnections ()
int HTTPConnectionManager::read (block_t *block)
{
std::map<Chunk *, HTTPConnection *>::iterator it;
if(this->downloadQueue.size() == 0)
if(!this->addChunk(this->adaptationLogic->getNextChunk()))
return 0;
for(it = this->chunkMap.begin(); it != this->chunkMap.end(); ++it)
delete(it->second);
if(this->downloadQueue.front()->getPercentDownloaded() > HTTPConnectionManager::PIPELINE &&
this->downloadQueue.size() < HTTPConnectionManager::PIPELINELENGTH)
this->addChunk(this->adaptationLogic->getNextChunk());
this->chunkMap.clear();
}
int HTTPConnectionManager::read( Chunk *chunk, void *p_buffer, size_t len )
{
if(this->chunkMap.find(chunk) == this->chunkMap.end())
{
this->bytesReadChunk = 0;
this->timeSecChunk = 0;
if ( this->initConnection( chunk ) == NULL )
return -1;
}
int ret = 0;
mtime_t start = mdate();
int ret = this->chunkMap[chunk]->read(p_buffer, len);
ret = this->downloadQueue.front()->getConnection()->read(block->p_buffer, block->i_buffer);
mtime_t end = mdate();
if( ret <= 0 )
this->closeConnection( chunk );
else
{
double time = ((double)(end - start)) / 1000000;
this->bytesReadSession += ret;
this->bytesReadChunk += ret;
this->timeSecSession += time;
this->timeSecChunk += time;
block->i_length = (mtime_t)((ret * 8) / ((float)this->downloadQueue.front()->getBitrate() / 1000000));
if(this->timeSecSession > 0)
this->bpsAvg = (this->bytesReadSession / this->timeSecSession) * 8;
double time = ((double)(end - start)) / 1000000;
if(this->timeSecChunk > 0)
this->bpsLastChunk = (this->bytesReadChunk / this->timeSecChunk) * 8;
if(this->chunkCount < 2)
this->bpsAvg = 0;
if(ret <= 0)
{
this->bpsLastChunk = this->bpsCurrentChunk;
this->bytesReadChunk = 0;
this->timeChunk = 0;
if(this->chunkCount < 2)
this->bpsLastChunk = 0;
delete(this->downloadQueue.front());
this->downloadQueue.pop_front();
this->notify();
return this->read(block);
}
return ret;
}
int HTTPConnectionManager::peek (Chunk *chunk, const uint8_t **pp_peek, size_t i_peek)
{
if(this->chunkMap.find(chunk) == this->chunkMap.end())
else
{
if ( this->initConnection(chunk) == NULL )
return -1;
this->updateStatistics(ret, time);
}
return this->chunkMap[chunk]->peek(pp_peek, i_peek);
}
IHTTPConnection* HTTPConnectionManager::initConnection(Chunk *chunk)
{
HTTPConnection *con = new HTTPConnection(this->stream);
if ( con->init(chunk) == false )
return NULL;
this->chunkMap[chunk] = con;
this->chunkCount++;
return con;
return ret;
}
void HTTPConnectionManager::attach (IDownloadRateObserver *observer)
{
......@@ -155,3 +107,60 @@ void HTTPConnectionManager::notify ()
for(size_t i = 0; i < this->rateObservers.size(); i++)
this->rateObservers.at(i)->downloadRateChanged(this->bpsAvg, this->bpsLastChunk);
}
std::vector<PersistentConnection *> HTTPConnectionManager::getConnectionsForHost (const std::string &hostname)
{
std::vector<PersistentConnection *> cons;
for(size_t i = 0; i < this->connectionPool.size(); i++)
if(!this->connectionPool.at(i)->getHostname().compare(hostname) || !this->connectionPool.at(i)->isConnected())
cons.push_back(this->connectionPool.at(i));
return cons;
}
void HTTPConnectionManager::updateStatistics (int bytes, double time)
{
this->bytesReadSession += bytes;
this->bytesReadChunk += bytes;
this->timeSession += time;
this->timeChunk += time;
this->bpsAvg = (int64_t) ((this->bytesReadSession * 8) / this->timeSession);
this->bpsCurrentChunk = (int64_t) ((this->bytesReadChunk * 8) / this->timeChunk);
if(this->bpsAvg < 0)
this->bpsAvg = 0;
if(this->bpsCurrentChunk < 0)
this->bpsCurrentChunk = 0;
this->notify();
}
bool HTTPConnectionManager::addChunk (Chunk *chunk)
{
if(chunk == NULL)
return false;
this->downloadQueue.push_back(chunk);
std::vector<PersistentConnection *> cons = this->getConnectionsForHost(chunk->getHostname());
if(cons.size() == 0)
{
PersistentConnection *con = new PersistentConnection(this->stream);
this->connectionPool.push_back(con);
cons.push_back(con);
}
size_t pos = this->chunkCount % cons.size();
cons.at(pos)->addChunk(chunk);
chunk->setConnection(cons.at(pos));
this->chunkCount++;
if(chunk->getBitrate() <= 0)
chunk->setBitrate(HTTPConnectionManager::CHUNKDEFAULTBITRATE);
return true;
}
......@@ -29,14 +29,13 @@
#include <string>
#include <vector>
#include <deque>
#include <iostream>
#include <ctime>
#include <map>
#include <limits.h>
#include "http/HTTPConnection.h"
#include "http/Chunk.h"
#include "adaptationlogic/IDownloadRateObserver.h"
#include "http/PersistentConnection.h"
#include "adaptationlogic/IAdaptationLogic.h"
namespace dash
{
......@@ -45,31 +44,36 @@ namespace dash
class HTTPConnectionManager
{
public:
HTTPConnectionManager (stream_t *stream);
HTTPConnectionManager (logic::IAdaptationLogic *adaptationLogic, stream_t *stream);
virtual ~HTTPConnectionManager ();
void closeAllConnections ();
bool closeConnection (IHTTPConnection *con);
int read (Chunk *chunk, void *p_buffer, size_t len);
int peek (Chunk *chunk, const uint8_t **pp_peek, size_t i_peek);
void attach (dash::logic::IDownloadRateObserver *observer);
void notify ();
void closeAllConnections ();
bool addChunk (Chunk *chunk);
int read (block_t *block);
void attach (dash::logic::IDownloadRateObserver *observer);
void notify ();
private:
std::map<Chunk *, HTTPConnection *> chunkMap;
std::map<std::string, HTTPConnection *> urlMap;
std::vector<dash::logic::IDownloadRateObserver *> rateObservers;
uint64_t bpsAvg;
uint64_t bpsLastChunk;
long bytesReadSession;
double timeSecSession;
long bytesReadChunk;
double timeSecChunk;
std::deque<Chunk *> downloadQueue;
std::vector<PersistentConnection *> connectionPool;
logic::IAdaptationLogic *adaptationLogic;
stream_t *stream;
int chunkCount;
int64_t bpsAvg;
int64_t bpsLastChunk;
int64_t bpsCurrentChunk;
int64_t bytesReadSession;
int64_t bytesReadChunk;
double timeSession;
double timeChunk;
bool closeConnection( Chunk *chunk );
IHTTPConnection* initConnection( Chunk *chunk );
static const size_t PIPELINE;
static const size_t PIPELINELENGTH;
static const uint64_t CHUNKDEFAULTBITRATE;
std::vector<PersistentConnection *> getConnectionsForHost (const std::string &hostname);
void updateStatistics (int bytes, double time);
};
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment