Commit 9227c871 authored by François Cartegnie's avatar François Cartegnie 🤞

stream_filter: dash: rewrite the http connection and remove double queuing

Ahead chunks now equals network-caching
parent 1afd4f37
......@@ -63,10 +63,10 @@ void* DASHDownloader::download (void *thread_sys)
do
{
block_t *block = NULL;
ret = conManager->read(&block, BLOCKSIZE);
ret = conManager->read(&block);
if(ret > 0)
buffer->put(block);
}while(ret && !buffer->getEOF());
}while(ret > 0 && !buffer->getEOF());
buffer->setEOF(true);
......
......@@ -29,7 +29,6 @@
#include "adaptationlogic/IAdaptationLogic.h"
#include "buffer/BlockBuffer.h"
#define BLOCKSIZE 32768
#define CHUNKDEFAULTBITRATE 1
#include <iostream>
......
......@@ -32,6 +32,7 @@ using namespace dash::http;
Chunk::Chunk () :
startByte (0),
endByte (0),
bitrate (1),
port (0),
isHostname (false),
length (0),
......@@ -149,11 +150,11 @@ size_t Chunk::getPercentDownloaded () const
{
return (size_t)(((float)this->bytesRead / this->length) * 100);
}
IHTTPConnection* Chunk::getConnection () const
HTTPConnection* Chunk::getConnection () const
{
return this->connection;
}
void Chunk::setConnection (IHTTPConnection *connection)
void Chunk::setConnection (HTTPConnection *connection)
{
this->connection = connection;
}
......@@ -32,7 +32,7 @@
#include <vlc_common.h>
#include <vlc_url.h>
#include "IHTTPConnection.h"
#include "HTTPConnection.h"
#include <vector>
#include <string>
......@@ -59,9 +59,9 @@ namespace dash
uint64_t getBytesRead () const;
uint64_t getBytesToRead () const;
size_t getPercentDownloaded () const;
IHTTPConnection* getConnection () const;
HTTPConnection* getConnection () const;
void setConnection (IHTTPConnection *connection);
void setConnection (HTTPConnection *connection);
void setBytesRead (uint64_t bytes);
void setBytesToRead (uint64_t bytes);
void setLength (uint64_t length);
......@@ -88,7 +88,7 @@ namespace dash
uint64_t length;
uint64_t bytesRead;
uint64_t bytesToRead;
IHTTPConnection *connection;
HTTPConnection *connection;
};
}
}
......
......@@ -26,128 +26,88 @@
#endif
#include "HTTPConnection.h"
#include <vlc_network.h>
#include "Chunk.h"
#include <sstream>
#include <vlc_stream.h>
using namespace dash::http;
HTTPConnection::HTTPConnection (stream_t *stream) :
IHTTPConnection (stream),
peekBufferLen (0)
HTTPConnection::HTTPConnection (stream_t *stream, Chunk *chunk_) :
IHTTPConnection (stream)
{
this->peekBuffer = new uint8_t[PEEKBUFFER];
toRead = 0;
chunk = NULL;
bindChunk(chunk_);
}
HTTPConnection::~HTTPConnection ()
std::string HTTPConnection::buildRequestHeader(const std::string &path) const
{
delete[] this->peekBuffer;
this->closeSocket();
std::string req = IHTTPConnection::buildRequestHeader(path);
return req.append("Connection: close\r\n");
}
int HTTPConnection::read (void *p_buffer, size_t len)
void HTTPConnection::bindChunk(Chunk *chunk_)
{
if(this->peekBufferLen == 0)
if(chunk_ == chunk)
return;
if (chunk_)
{
ssize_t size = net_Read(stream, httpSocket, NULL, p_buffer, len, false);
if(size <= 0)
return 0;
return size;
chunk_->setConnection(this);
if(!chunk->hasHostname())
chunk->setUrl(getUrlRelative(chunk));
}
memcpy(p_buffer, this->peekBuffer, this->peekBufferLen);
int ret = this->peekBufferLen;
this->peekBufferLen = 0;
return ret;
}
int HTTPConnection::peek (const uint8_t **pp_peek, size_t i_peek)
{
if(this->peekBufferLen == 0)
this->peekBufferLen = this->read(this->peekBuffer, PEEKBUFFER);
int size = i_peek > this->peekBufferLen ? this->peekBufferLen : i_peek;
uint8_t *peek = new uint8_t [size];
memcpy(peek, this->peekBuffer, size);
*pp_peek = peek;
return size;
chunk = chunk_;
}
std::string HTTPConnection::getRequestHeader (const Chunk *chunk) const
void HTTPConnection::releaseChunk()
{
return IHTTPConnection::getRequestHeader(chunk)
.append("Connection: close\r\n");
}
bool HTTPConnection::init (Chunk *chunk)
{
if (IHTTPConnection::init(chunk))
if(chunk)
{
HeaderReply reply;
return parseHeader(&reply);
chunk->setConnection(NULL);
chunk = NULL;
}
else
return false;
}
bool HTTPConnection::parseHeader (HeaderReply *reply)
void HTTPConnection::onHeader(const std::string &key,
const std::string &value)
{
std::string line = this->readLine();
if(line.size() == 0)
return false;
while(line.compare("\r\n"))
if(key == "Content-Length")
{
if(!strncasecmp(line.c_str(), "Content-Length", 14))
reply->contentLength = atoi(line.substr(15,line.size()).c_str());
line = this->readLine();
if(line.size() == 0)
return false;
std::istringstream ss(value);
size_t length;
ss >> length;
chunk->setLength(length);
toRead = length;
}
return true;
}
std::string HTTPConnection::readLine ()
std::string HTTPConnection::extraRequestHeaders() const
{
std::stringstream ss;
char c[1];
ssize_t size = net_Read(stream, httpSocket, NULL, c, 1, false);
while(size >= 0)
if(chunk->usesByteRange())
{
ss << c[0];
if(c[0] == '\n')
break;
size = net_Read(stream, httpSocket, NULL, c, 1, false);
ss << "Range: bytes=" << chunk->getStartByte() << "-";
if(chunk->getEndByte())
ss << chunk->getEndByte();
ss << "\r\n";
}
if(size > 0)
return ss.str();
return "";
return ss.str();
}
bool HTTPConnection::send (const std::string& data)
{
ssize_t size = net_Write(this->stream, this->httpSocket, NULL, data.c_str(), data.size());
if (size == -1)
{
return false;
}
if ((size_t)size != data.length())
{
this->send(data.substr(size, data.size()));
}
return true;
std::string HTTPConnection::getUrlRelative(const Chunk *chunk) const
{
std::stringstream ss;
ss << stream->psz_access << "://" << Helper::combinePaths(Helper::getDirectoryPath(stream->psz_path), chunk->getUrl());
return ss.str();
}
void HTTPConnection::closeSocket ()
bool HTTPConnection::isAvailable() const
{
if (httpSocket >= 0)
net_Close(httpSocket);
return chunk == NULL;
}
void HTTPConnection::disconnect()
{
toRead = 0;
}
......@@ -28,41 +28,31 @@
#include <string>
#include "http/IHTTPConnection.h"
#include "http/Chunk.h"
#include "Helper.h"
#define PEEKBUFFER 4096
namespace dash
{
namespace http
{
class Chunk;
class HTTPConnection : public IHTTPConnection
{
public:
HTTPConnection (stream_t *stream);
virtual ~HTTPConnection ();
virtual bool init (Chunk *chunk);
void closeSocket ();
virtual int read (void *p_buffer, size_t len);
virtual int peek (const uint8_t **pp_peek, size_t i_peek);
HTTPConnection (stream_t *stream, Chunk *chunk = NULL);
virtual void bindChunk (Chunk *chunk);
virtual void onHeader (const std::string &line,
const std::string &value);
virtual bool isAvailable () const;
virtual void disconnect ();
virtual void releaseChunk();
protected:
size_t toRead;
Chunk *chunk;
virtual std::string extraRequestHeaders() const;
virtual std::string buildRequestHeader(const std::string &path) const;
class HeaderReply
{
public:
int contentLength;
};
uint8_t *peekBuffer;
size_t peekBufferLen;
virtual bool send (const std::string& data);
bool parseHeader (HeaderReply *);
std::string readLine ();
virtual std::string getRequestHeader(const Chunk *chunk) const; /* reimpl */
std::string getUrlRelative(const Chunk *chunk) const;
};
}
}
......
......@@ -28,11 +28,13 @@
#include "HTTPConnectionManager.h"
#include "mpd/Segment.h"
#include <vlc_block.h>
#include <vlc_stream.h>
using namespace dash::http;
using namespace dash::logic;
const size_t HTTPConnectionManager::PIPELINE = 80;
const size_t HTTPConnectionManager::PIPELINELENGTH = 2;
const uint64_t HTTPConnectionManager::CHUNKDEFAULTBITRATE = 1;
HTTPConnectionManager::HTTPConnectionManager (IAdaptationLogic *adaptationLogic, stream_t *stream) :
......@@ -58,52 +60,73 @@ void HTTPConnectionManager::closeAllConnections
vlc_delete_all(this->connectionPool);
vlc_delete_all(this->downloadQueue);
}
int HTTPConnectionManager::read(block_t **pp_block, size_t len)
ssize_t HTTPConnectionManager::read(block_t **pp_block)
{
if(this->downloadQueue.size() == 0)
if(!this->addChunk(this->adaptationLogic->getNextChunk()))
return 0;
Chunk *chunk;
if(this->downloadQueue.front()->getPercentDownloaded() > HTTPConnectionManager::PIPELINE &&
this->downloadQueue.size() < HTTPConnectionManager::PIPELINELENGTH)
this->addChunk(this->adaptationLogic->getNextChunk());
if(downloadQueue.empty())
{
chunk = adaptationLogic->getNextChunk();
if(!connectChunk(chunk))
return -1;
else
downloadQueue.push_back(chunk);
}
int ret = 0;
chunk = downloadQueue.front();
block_t *block = block_Alloc(len);
if(chunk->getBytesRead() == 0)
{
if (!chunk->getConnection()->query(chunk->getPath()))
return -1;
}
/* chunk length should be set at connect/query reply time */
size_t readsize = chunk->getBytesToRead();
if (readsize > 128000)
readsize = 32768;
block_t *block = block_Alloc(readsize);
if(!block)
return -1;
mtime_t start = mdate();
ret = this->downloadQueue.front()->getConnection()->read(block->p_buffer, block->i_buffer);
mtime_t end = mdate();
mtime_t time = mdate();
ssize_t ret = chunk->getConnection()->read(block->p_buffer, readsize);
time = mdate() - time;
block->i_length = (mtime_t)((ret * 8) / ((float)this->downloadQueue.front()->getBitrate() / 1000000));
double time = ((double)(end - start)) / 1000000;
block->i_length = (mtime_t)((ret * 8) / ((float)chunk->getBitrate() / CLOCK_FREQ));
if(ret <= 0)
{
block_Release(block);
*pp_block = NULL;
this->bpsLastChunk = this->bpsCurrentChunk;
this->bytesReadChunk = 0;
this->timeChunk = 0;
delete(this->downloadQueue.front());
this->downloadQueue.pop_front();
delete(chunk);
downloadQueue.pop_front();
return this->read(pp_block, len);
return read(pp_block);
}
else
{
this->updateStatistics(ret, time);
updateStatistics((size_t)ret, ((double)time) / CLOCK_FREQ);
block->i_buffer = ret;
if (chunk->getBytesToRead() == 0)
{
chunk->onDownload(block->p_buffer, block->i_buffer);
delete chunk;
downloadQueue.pop_front();
}
}
*pp_block = block;
return ret;
}
void HTTPConnectionManager::attach (IDownloadRateObserver *observer)
{
this->rateObservers.push_back(observer);
......@@ -115,17 +138,19 @@ void HTTPConnectionManager::notify
for(size_t i = 0; i < this->rateObservers.size(); i++)
this->rateObservers.at(i)->downloadRateChanged(this->bpsAvg, this->bpsLastChunk);
}
std::vector<PersistentConnection *> HTTPConnectionManager::getConnectionsForHost (const std::string &hostname)
{
std::vector<PersistentConnection *> cons;
for(size_t i = 0; i < this->connectionPool.size(); i++)
if(!this->connectionPool.at(i)->getHostname().compare(hostname) || !this->connectionPool.at(i)->isConnected())
cons.push_back(this->connectionPool.at(i));
return cons;
PersistentConnection * HTTPConnectionManager::getConnectionForHost(const std::string &hostname)
{
std::vector<PersistentConnection *>::const_iterator it;
for(it = connectionPool.begin(); it != connectionPool.end(); it++)
{
if(!(*it)->getHostname().compare(hostname))
return *it;
}
return NULL;
}
void HTTPConnectionManager::updateStatistics (int bytes, double time)
void HTTPConnectionManager::updateStatistics(size_t bytes, double time)
{
this->bytesReadSession += bytes;
this->bytesReadChunk += bytes;
......@@ -143,29 +168,28 @@ void HTTPConnectionManager::updateStatistics
this->notify();
}
bool HTTPConnectionManager::addChunk (Chunk *chunk)
bool HTTPConnectionManager::connectChunk(Chunk *chunk)
{
if(chunk == NULL)
return false;
this->downloadQueue.push_back(chunk);
std::vector<PersistentConnection *> cons = this->getConnectionsForHost(chunk->getHostname());
msg_Dbg(stream, "Retrieving %s", chunk->getUrl().c_str());
if(cons.size() == 0)
PersistentConnection *conn = getConnectionForHost(chunk->getHostname());
if(!conn)
{
PersistentConnection *con = new PersistentConnection(this->stream);
this->connectionPool.push_back(con);
cons.push_back(con);
conn = new PersistentConnection(stream, chunk);
if(!conn)
return false;
if (!chunk->getConnection()->connect(chunk->getHostname(), chunk->getPort()))
return false;
connectionPool.push_back(conn);
}
size_t pos = this->chunkCount % cons.size();
cons.at(pos)->addChunk(chunk);
chunk->setConnection(cons.at(pos));
conn->bindChunk(chunk);
this->chunkCount++;
chunkCount++;
if(chunk->getBitrate() <= 0)
chunk->setBitrate(HTTPConnectionManager::CHUNKDEFAULTBITRATE);
......
......@@ -48,8 +48,7 @@ namespace dash
virtual ~HTTPConnectionManager ();
void closeAllConnections ();
bool addChunk (Chunk *chunk);
int read (block_t **, size_t);
ssize_t read (block_t **);
void attach (dash::logic::IDownloadRateObserver *observer);
void notify ();
......@@ -68,12 +67,11 @@ namespace dash
double timeSession;
double timeChunk;
static const size_t PIPELINE;
static const size_t PIPELINELENGTH;
static const uint64_t CHUNKDEFAULTBITRATE;
std::vector<PersistentConnection *> getConnectionsForHost (const std::string &hostname);
void updateStatistics (int bytes, double time);
bool connectChunk (Chunk *chunk);
PersistentConnection * getConnectionForHost (const std::string &hostname);
void updateStatistics (size_t bytes, double time);
};
}
......
......@@ -24,6 +24,7 @@
#include "dash.hpp"
#include <vlc_network.h>
#include <vlc_stream.h>
#include <sstream>
......@@ -37,45 +38,135 @@ IHTTPConnection::IHTTPConnection(stream_t *stream_)
IHTTPConnection::~IHTTPConnection()
{
disconnect();
}
bool IHTTPConnection::init(Chunk *chunk)
bool IHTTPConnection::connect(const std::string &hostname, int port)
{
if(chunk == NULL)
httpSocket = net_ConnectTCP(stream, hostname.c_str(), port);
this->hostname = hostname;
if(httpSocket == -1)
return false;
if(!chunk->hasHostname())
return true;
}
bool IHTTPConnection::connected() const
{
return (httpSocket != -1);
}
void IHTTPConnection::disconnect()
{
if (httpSocket >= 0)
{
chunk->setUrl(getUrlRelative(chunk));
if(!chunk->hasHostname())
return false;
net_Close(httpSocket);
httpSocket = -1;
}
}
bool IHTTPConnection::query(const std::string &path)
{
std::string header = buildRequestHeader(path);
header.append("\r\n");
if (!send( header ) || !parseReply())
return false;
return true;
}
httpSocket = net_ConnectTCP(stream, chunk->getHostname().c_str(), chunk->getPort());
ssize_t IHTTPConnection::read(void *p_buffer, size_t len)
{
ssize_t size = net_Read(stream, httpSocket, NULL, p_buffer, len, true);
if(size <= 0)
return -1;
else
return size;
}
if(httpSocket == -1)
bool IHTTPConnection::send(const std::string &data)
{
return send(data.c_str(), data.length());
}
bool IHTTPConnection::send(const void *buf, size_t size)
{
if (size == 0)
return true;
if (httpSocket == -1)
return false;
ssize_t ret = net_Write(stream, httpSocket, NULL, buf, size);
if (ret <= 0)
return false;
return send(getRequestHeader(chunk).append("\r\n"));
if ( (size_t)ret < size )
send( ((uint8_t*)buf) + ret, size - ret );
return true;
}
std::string IHTTPConnection::getRequestHeader(const Chunk *chunk) const
bool IHTTPConnection::parseReply()
{
std::stringstream req;
req << "GET " << chunk->getPath() << " HTTP/1.1\r\n" <<
"Host: " << chunk->getHostname() << "\r\n" <<
"User-Agent: " << std::string(stream->p_sys->psz_useragent) << "\r\n";
std::string line = readLine();
if(chunk->usesByteRange())
req << "Range: bytes=" << chunk->getStartByte() << "-" << chunk->getEndByte() << "\r\n";
if(line.empty())
return false;
return req.str();
if (line.compare(0, 9, "HTTP/1.1 ")!=0)
return false;
std::istringstream ss(line.substr(9));
int replycode;
ss >> replycode;
if (replycode != 200 && replycode != 206)
return false;
readLine();
while(!line.empty() && line.compare("\r\n"))
{
size_t split = line.find_first_of(':');
size_t value = split + 1;
while(line.at(value) == ' ')
value++;
onHeader(line.substr(0, split), line.substr(value));
line = readLine();
}
return true;
}
std::string IHTTPConnection::getUrlRelative(const Chunk *chunk) const
std::string IHTTPConnection::readLine()
{
std::stringstream ss;
ss << stream->psz_access << "://" << Helper::combinePaths(Helper::getDirectoryPath(stream->psz_path), chunk->getUrl());
return ss.str();
char c[1];
ssize_t size = net_Read(stream, httpSocket, NULL, c, 1, false);
while(size >= 0)
{
ss << c[0];
if(c[0] == '\n')
break;
size = net_Read(stream, httpSocket, NULL, c, 1, false);
}
if(size > 0)
return ss.str();
return "";
}
std::string IHTTPConnection::buildRequestHeader(const std::string &path) const
{
std::stringstream req;
req << "GET " << path << " HTTP/1.1\r\n" <<
"Host: " << hostname << "\r\n" <<
"User-Agent: " << std::string(stream->p_sys->psz_useragent) << "\r\n";
req << extraRequestHeaders();
return req