Commit 02b938e8 authored by Paweł Wegner's avatar Paweł Wegner
Browse files

MicroHttpdServer: suspend fix.

parent e5eced90
......@@ -218,17 +218,37 @@ class TransferListener : public mega::MegaTransferListener, public Listener {
struct Buffer {
using Pointer = std::shared_ptr<Buffer>;
void resume() {
if (response_ && suspended_) {
suspended_ = false;
response_->resume();
int read(char* buf, uint32_t max) {
std::unique_lock<std::mutex> lock(mutex_);
if (done_) return IHttpServer::IResponse::ICallback::Abort;
if (data_.empty()) return IHttpServer::IResponse::ICallback::Suspend;
size_t cnt = std::min(data_.size(), (size_t)max);
for (size_t i = 0; i < cnt; i++) {
buf[i] = data_.front();
data_.pop();
}
return cnt;
}
void put(const char* data, uint32_t length) {
std::lock_guard<std::mutex> lock(mutex_);
for (uint32_t i = 0; i < length; i++) data_.push(data[i]);
}
void done() {
std::lock_guard<std::mutex> lock(mutex_);
done_ = true;
}
void resume() {
std::lock_guard<std::mutex> lock(response_mutex_);
if (response_) response_->resume();
}
std::mutex mutex_;
std::queue<char> data_;
std::mutex response_mutex_;
IHttpServer::IResponse* response_;
bool suspended_ = false;
bool done_ = false;
};
......@@ -241,18 +261,7 @@ class HttpData : public IHttpServer::IResponse::ICallback {
~HttpData() { mega_->removeStreamRequest(request_); }
int putData(char* buf, size_t max) override {
std::unique_lock<std::mutex> lock(buffer_->mutex_);
if (buffer_->done_) return Abort;
if (buffer_->data_.empty()) {
buffer_->suspended_ = true;
return Suspend;
}
size_t cnt = std::min(buffer_->data_.size(), max);
for (size_t i = 0; i < cnt; i++) {
buf[i] = buffer_->data_.front();
buffer_->data_.pop();
}
return cnt;
return buffer_->read(buf, max);
}
Buffer::Pointer buffer_;
......@@ -265,14 +274,12 @@ class HttpDataCallback : public IDownloadFileCallback {
HttpDataCallback(Buffer::Pointer d) : buffer_(d) {}
void receivedData(const char* data, uint32_t length) override {
std::lock_guard<std::mutex> lock(buffer_->mutex_);
for (uint32_t i = 0; i < length; i++) buffer_->data_.push(data[i]);
buffer_->put(data, length);
buffer_->resume();
}
void done(EitherError<void>) override {
std::lock_guard<std::mutex> lock(buffer_->mutex_);
buffer_->done_ = true;
buffer_->done();
buffer_->resume();
}
......@@ -337,7 +344,7 @@ IHttpServer::IResponse::Pointer MegaNz::HttpServerCallback::handle(
auto response = request.response(code, headers, range.size, std::move(data));
buffer->response_ = response.get();
response->completed([buffer]() {
std::unique_lock<std::mutex> lock(buffer->mutex_);
std::unique_lock<std::mutex> lock(buffer->response_mutex_);
buffer->response_ = nullptr;
});
download_request->run();
......
......@@ -58,15 +58,25 @@ MicroHttpdServer::Response::Response(MHD_Connection* connection, int code,
const IResponse::Headers& headers,
int size,
IResponse::ICallback::Pointer callback)
: connection_(connection), code_(code) {
using DataType = std::pair<MHD_Connection*, IResponse::ICallback::Pointer>;
: data_(std::make_shared<SharedData>()),
connection_(connection),
code_(code) {
struct DataType {
std::shared_ptr<SharedData> data_;
MHD_Connection* connection_;
IResponse::ICallback::Pointer callback_;
};
auto data_provider = [](void* cls, uint64_t, char* buf,
size_t max) -> ssize_t {
auto data = static_cast<DataType*>(cls);
auto r = data->second->putData(buf, max);
std::unique_lock<std::mutex> lock(data->data_->mutex_);
auto r = data->callback_->putData(buf, max);
if (r == IResponse::ICallback::Suspend) {
MHD_suspend_connection(data->first);
if (!data->data_->suspended_) {
data->data_->suspended_ = true;
MHD_suspend_connection(data->connection_);
}
return 0;
} else if (r == IResponse::ICallback::Abort)
return MHD_CONTENT_READER_END_WITH_ERROR;
......@@ -77,7 +87,8 @@ MicroHttpdServer::Response::Response(MHD_Connection* connection, int code,
auto data = static_cast<DataType*>(cls);
delete data;
};
auto data = util::make_unique<DataType>(connection, std::move(callback));
auto data = util::make_unique<DataType>(
DataType{data_, connection, std::move(callback)});
response_ = MHD_create_response_from_callback(size, CHUNK_SIZE, data_provider,
data.release(), release_data);
for (auto it : headers)
......@@ -89,7 +100,11 @@ MicroHttpdServer::Response::~Response() {
}
void MicroHttpdServer::Response::resume() {
MHD_resume_connection(connection_);
std::unique_lock<std::mutex> lock(data_->mutex_);
if (data_->suspended_) {
data_->suspended_ = false;
MHD_resume_connection(connection_);
}
}
MicroHttpdServer::Request::Request(MHD_Connection* c, const char* url)
......
......@@ -28,6 +28,8 @@
#include "IHttpServer.h"
#include <mutex>
namespace cloudstorage {
class MicroHttpdServer : public IHttpServer {
......@@ -49,6 +51,11 @@ class MicroHttpdServer : public IHttpServer {
void completed(CompletedCallback f) override { callback_ = f; }
protected:
struct SharedData {
std::mutex mutex_;
bool suspended_ = false;
};
std::shared_ptr<SharedData> data_;
MHD_Connection* connection_;
MHD_Response* response_;
int code_;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment