Commit 9e05f70b authored by Paweł Wegner's avatar Paweł Wegner
Browse files

IHttpServer: simplified interface.

parent 3f5a8807
......@@ -218,41 +218,34 @@ class TransferListener : public mega::MegaTransferListener, public Listener {
struct Buffer {
using Pointer = std::shared_ptr<Buffer>;
Buffer(IHttpServer::IConnection::Pointer c) : connection_(c) {}
void resume() {
if (connection_ && suspended_) {
if (response_ && suspended_) {
suspended_ = false;
connection_->resume();
}
}
void suspend() {
if (connection_ && !suspended_) {
suspended_ = true;
connection_->suspend();
response_->resume();
}
}
std::mutex mutex_;
std::queue<char> data_;
IHttpServer::IConnection::Pointer connection_;
IHttpServer::IResponse* response_;
bool suspended_ = false;
bool done_ = false;
};
class HttpData : public IHttpServer::IResponse::ICallback {
public:
HttpData(Buffer::Pointer d, MegaNz* p) : buffer_(d), mega_(p) {}
HttpData(Buffer::Pointer d, MegaNz* p,
std::shared_ptr<Request<EitherError<void>>> r)
: buffer_(d), mega_(p), request_(r) {}
~HttpData() { mega_->removeStreamRequest(request_); }
int putData(char* buf, size_t max) override {
std::unique_lock<std::mutex> lock(buffer_->mutex_);
if (buffer_->done_) return -1;
if (buffer_->done_) return Abort;
if (buffer_->data_.empty()) {
buffer_->suspend();
return 0;
buffer_->suspended_ = true;
return Suspend;
}
size_t cnt = std::min(buffer_->data_.size(), max);
for (size_t i = 0; i < cnt; i++) {
......@@ -292,24 +285,25 @@ class HttpDataCallback : public IDownloadFileCallback {
MegaNz::HttpServerCallback::HttpServerCallback(MegaNz* p) : provider_(p) {}
IHttpServer::IResponse::Pointer MegaNz::HttpServerCallback::receivedConnection(
const IHttpServer& server, IHttpServer::IConnection::Pointer connection) {
IHttpServer::IResponse::Pointer MegaNz::HttpServerCallback::handle(
const IHttpServer::IRequest& request) {
{
std::lock_guard<std::mutex> lock(provider_->mutex_);
if (provider_->deleted_)
return server.createResponse(IHttpRequest::Bad, {}, "");
return util::response_from_string(request, IHttpRequest::Bad, {}, "");
}
const char* state = connection->getParameter("state");
const char* state = request.get("state");
if (!state || state != provider_->auth()->state())
return server.createResponse(IHttpRequest::Forbidden, {},
"state parameter missing / invalid");
return util::response_from_string(request, IHttpRequest::Forbidden, {},
"state parameter missing / invalid");
if (!provider_->authorized_)
return server.createResponse(IHttpRequest::ServiceUnavailable, {},
"not authorized just yet");
const char* file = connection->getParameter("file");
return util::response_from_string(request, IHttpRequest::ServiceUnavailable,
{}, "not authorized just yet");
const char* file = request.get("file");
std::unique_ptr<mega::MegaNode> node(provider_->mega()->getNodeByPath(file));
if (!node)
return server.createResponse(IHttpRequest::NotFound, {}, "file not found");
return util::response_from_string(request, IHttpRequest::NotFound, {},
"file not found");
int code = IHttpRequest::Ok;
auto extension =
static_cast<Item*>(provider_->toItem(node.get()).get())->extension();
......@@ -319,35 +313,36 @@ IHttpServer::IResponse::Pointer MegaNz::HttpServerCallback::receivedConnection(
{"Content-Disposition",
"inline; filename=\"" + std::string(node->getName()) + "\""}};
util::range range = {0, node->getSize()};
if (const char* range_str = connection->header("Range")) {
if (const char* range_str = request.header("Range")) {
range = util::parse_range(range_str);
if (range.size == -1) range.size = node->getSize() - range.start;
if (range.start + range.size > node->getSize() || range.start == -1 ||
range.size < 0)
return server.createResponse(IHttpRequest::RangeInvalid, {},
"invalid range");
return util::response_from_string(request, IHttpRequest::RangeInvalid, {},
"invalid range");
std::stringstream stream;
stream << "bytes " << range.start << "-" << range.start + range.size - 1
<< "/" << node->getSize();
headers["Content-Range"] = stream.str();
code = IHttpRequest::Partial;
}
auto buffer = std::make_shared<Buffer>(connection);
auto data = util::make_unique<HttpData>(buffer, provider_);
auto request = std::make_shared<Request<EitherError<void>>>(
auto buffer = std::make_shared<Buffer>();
auto download_request = std::make_shared<Request<EitherError<void>>>(
std::weak_ptr<CloudProvider>(provider_->shared_from_this()));
data->request_ = request;
provider_->addStreamRequest(request);
request->set(provider_->downloadResolver(
download_request->set(provider_->downloadResolver(
provider_->toItem(node.get()),
util::make_unique<HttpDataCallback>(buffer), range.start, range.size));
request->run();
connection->onCompleted([buffer]() {
provider_->addStreamRequest(download_request);
auto data = util::make_unique<HttpData>(buffer, provider_, download_request);
auto response =
request.response(code, headers, range.size, std::move(data));
buffer->response_ = response.get();
response->completed([buffer]() {
std::unique_lock<std::mutex> lock(buffer->mutex_);
buffer->connection_ = nullptr;
buffer->response_ = nullptr;
});
return server.createResponse(code, headers, range.size, BUFFER_SIZE,
std::move(data));
download_request->run();
return std::move(response);
}
MegaNz::MegaNz()
......
......@@ -46,8 +46,8 @@ class MegaNz : public CloudProvider {
class HttpServerCallback : public IHttpServer::ICallback {
public:
HttpServerCallback(MegaNz*);
IHttpServer::IResponse::Pointer receivedConnection(
const IHttpServer&, IHttpServer::IConnection::Pointer) override;
IHttpServer::IResponse::Pointer handle(
const IHttpServer::IRequest&) override;
private:
MegaNz* provider_;
......
......@@ -36,40 +36,43 @@ class IHttpServer {
enum class Type { Authorization, FileProvider };
class IConnection;
virtual ~IHttpServer() = default;
class IResponse {
public:
using Pointer = std::unique_ptr<IResponse>;
using Headers = std::unordered_map<std::string, std::string>;
using CompletedCallback = std::function<void()>;
class ICallback {
public:
using Pointer = std::unique_ptr<ICallback>;
static constexpr int Suspend = 0;
static constexpr int Abort = -1;
virtual ~ICallback() = default;
virtual int putData(char* buffer, size_t size) = 0;
};
virtual ~IResponse() = default;
virtual void resume() = 0;
virtual void completed(CompletedCallback) = 0;
};
class IConnection {
class IRequest {
public:
using Pointer = IConnection*;
using CompletedCallback = std::function<void()>;
virtual ~IRequest() = default;
virtual ~IConnection() = default;
virtual const char* getParameter(const std::string& name) const = 0;
virtual const char* get(const std::string& name) const = 0;
virtual const char* header(const std::string& name) const = 0;
virtual std::string url() const = 0;
virtual void onCompleted(CompletedCallback) = 0;
virtual void suspend() = 0;
virtual void resume() = 0;
virtual IResponse::Pointer response(
int code, const IResponse::Headers&, int size,
IResponse::ICallback::Pointer) const = 0;
};
class ICallback {
......@@ -78,18 +81,10 @@ class IHttpServer {
virtual ~ICallback() = default;
virtual IResponse::Pointer receivedConnection(const IHttpServer&,
IConnection::Pointer) = 0;
virtual IResponse::Pointer handle(const IRequest&) = 0;
};
virtual ICallback::Pointer callback() const = 0;
virtual IResponse::Pointer createResponse(int code, const IResponse::Headers&,
const std::string& body) const = 0;
virtual IResponse::Pointer createResponse(
int code, const IResponse::Headers&, int size, int chunk_size,
IResponse::ICallback::Pointer) const = 0;
};
class IHttpServerFactory {
......
......@@ -65,31 +65,27 @@ void AuthorizeRequest::sendCancel() {
server_cancelled_ = true;
lock.unlock();
if (auth_server) {
class Connection : public IHttpServer::IConnection {
class Request : public IHttpServer::IRequest {
public:
Connection(const std::string& state) : state_(state) {}
~Connection() {
if (f_) f_();
}
const char* getParameter(const std::string& name) const {
Request(const std::string& state) : state_(state) {}
const char* get(const std::string& name) const override {
if (name == "error") return "cancelled";
if (name == "accepted") return "false";
if (name == "state") return state_.c_str();
return nullptr;
}
const char* header(const std::string&) const { return nullptr; }
std::string url() const { return "/"; }
void onCompleted(CompletedCallback f) { f_ = f; }
void suspend() {}
void resume() {}
const char* header(const std::string&) const override { return nullptr; }
std::string url() const override { return "/"; }
IHttpServer::IResponse::Pointer response(
int, const IHttpServer::IResponse::Headers&, int,
IHttpServer::IResponse::ICallback::Pointer) const override {
return nullptr;
}
private:
std::string state_;
CompletedCallback f_;
};
auto connection = std::make_shared<Connection>(state_);
auth_server->callback()->receivedConnection(*auth_server, connection.get());
auth_server->callback()->handle(Request(state_));
}
}
......
......@@ -93,17 +93,17 @@ const std::string DEFAULT_ERROR_PAGE =
namespace cloudstorage {
IHttpServer::IResponse::Pointer Auth::HttpServerCallback::receivedConnection(
const IHttpServer& server, IHttpServer::IConnection::Pointer connection) {
const char* state = connection->getParameter(data_.state_parameter_name_);
IHttpServer::IResponse::Pointer Auth::HttpServerCallback::handle(
const IHttpServer::IRequest& request) {
const char* state = request.get(data_.state_parameter_name_);
if (!state || state != data_.state_)
return server.createResponse(
IHttpRequest::Unauthorized, {},
return util::response_from_string(
request, IHttpRequest::Unauthorized, {},
data_.error_page_.empty() ? DEFAULT_ERROR_PAGE : data_.error_page_);
const char* accepted = connection->getParameter("accepted");
const char* code = connection->getParameter(data_.code_parameter_name_);
const char* error = connection->getParameter(data_.error_parameter_name_);
const char* accepted = request.get("accepted");
const char* code = request.get(data_.code_parameter_name_);
const char* error = request.get(data_.error_parameter_name_);
if (accepted) {
std::unique_lock<std::mutex> lock(lock_);
auto callback = std::move(data_.callback_);
......@@ -121,23 +121,23 @@ IHttpServer::IResponse::Pointer Auth::HttpServerCallback::receivedConnection(
}
if (code)
return server.createResponse(IHttpRequest::Ok, {},
data_.success_page_.empty()
? DEFAULT_SUCCESS_PAGE
: data_.success_page_);
return util::response_from_string(request, IHttpRequest::Ok, {},
data_.success_page_.empty()
? DEFAULT_SUCCESS_PAGE
: data_.success_page_);
if (error)
return server.createResponse(
IHttpRequest::Unauthorized, {},
return util::response_from_string(
request, IHttpRequest::Unauthorized, {},
data_.error_page_.empty() ? DEFAULT_ERROR_PAGE : data_.error_page_);
if (connection->url() == data_.redirect_uri_path_ + "/login")
return server.createResponse(
IHttpRequest::Ok, {},
if (request.url() == data_.redirect_uri_path_ + "/login")
return util::response_from_string(
request, IHttpRequest::Ok, {},
data_.login_page_.empty() ? DEFAULT_LOGIN_PAGE : data_.login_page_);
return server.createResponse(
IHttpRequest::NotFound, {},
return util::response_from_string(
request, IHttpRequest::NotFound, {},
data_.error_page_.empty() ? DEFAULT_ERROR_PAGE : data_.error_page_);
}
......
......@@ -36,8 +36,8 @@ class Auth : public IAuth {
class HttpServerCallback : public IHttpServer::ICallback {
public:
IHttpServer::IResponse::Pointer receivedConnection(
const IHttpServer&, IHttpServer::IConnection::Pointer) override;
IHttpServer::IResponse::Pointer handle(
const IHttpServer::IRequest&) override;
struct HttpServerData {
std::string code_parameter_name_;
......
......@@ -27,6 +27,7 @@
namespace cloudstorage {
const int CHUNK_SIZE = 1024;
const int AUTHORIZATION_PORT = 12345;
const int FILE_PROVIDER_PORT = 12346;
......@@ -37,86 +38,74 @@ int http_request_callback(void* cls, MHD_Connection* c, const char* url,
const char* /*upload_data*/,
size_t* /*upload_data_size*/, void** con_cls) {
MicroHttpdServer* server = static_cast<MicroHttpdServer*>(cls);
auto connection = util::make_unique<MicroHttpdServer::Connection>(c, url);
auto response =
server->callback()->receivedConnection(*server, connection.get());
auto response = server->callback()->handle(MicroHttpdServer::Request(c, url));
auto p = static_cast<MicroHttpdServer::Response*>(response.get());
int ret = MHD_queue_response(c, p->code(), p->response());
*con_cls = connection.release();
*con_cls = response.release();
return ret;
}
void http_request_completed(void*, MHD_Connection*, void** con_cls,
MHD_RequestTerminationCode) {
auto p = static_cast<MicroHttpdServer::Connection*>(*con_cls);
auto p = static_cast<MicroHttpdServer::Response*>(*con_cls);
if (p->callback()) p->callback()();
delete p;
}
} // namespace
MicroHttpdServer::Response::Response(int code,
MicroHttpdServer::Response::Response(MHD_Connection* connection, int code,
const IResponse::Headers& headers,
const std::string& body)
: response_(MHD_create_response_from_buffer(
body.length(), (void*)body.c_str(), MHD_RESPMEM_MUST_COPY)),
code_(code) {
for (auto it : headers)
MHD_add_response_header(response_, it.first.c_str(), it.second.c_str());
}
MicroHttpdServer::Response::~Response() {
if (response_) MHD_destroy_response(response_);
}
int size,
IResponse::ICallback::Pointer callback)
: connection_(connection), code_(code) {
using DataType = std::pair<MHD_Connection*, IResponse::ICallback::Pointer>;
MicroHttpdServer::CallbackResponse::CallbackResponse(
int code, const IResponse::Headers& headers, int size, int chunk_size,
IResponse::ICallback::Pointer callback) {
code_ = code;
auto data_provider = [](void* cls, uint64_t, char* buf,
size_t max) -> ssize_t {
auto callback = static_cast<IResponse::ICallback*>(cls);
auto r = callback->putData(buf, max);
return r;
auto data = static_cast<DataType*>(cls);
auto r = data->second->putData(buf, max);
if (r == IResponse::ICallback::Suspend) {
MHD_suspend_connection(data->first);
return 0;
} else if (r == IResponse::ICallback::Abort)
return MHD_CONTENT_READER_END_WITH_ERROR;
else
return r;
};
auto release_data = [](void* cls) {
auto callback = static_cast<IResponse::ICallback*>(cls);
delete callback;
auto data = static_cast<DataType*>(cls);
delete data;
};
response_ = MHD_create_response_from_callback(
size, chunk_size, data_provider, callback.release(), release_data);
auto data = util::make_unique<DataType>(connection, std::move(callback));
response_ = MHD_create_response_from_callback(size, CHUNK_SIZE, data_provider,
data.release(), release_data);
for (auto it : headers)
MHD_add_response_header(response_, it.first.c_str(), it.second.c_str());
}
MicroHttpdServer::Connection::Connection(MHD_Connection* c, const char* url)
MicroHttpdServer::Response::~Response() {
if (response_) MHD_destroy_response(response_);
}
void MicroHttpdServer::Response::resume() {
MHD_resume_connection(connection_);
}
MicroHttpdServer::Request::Request(MHD_Connection* c, const char* url)
: connection_(c), url_(url) {}
const char* MicroHttpdServer::Connection::getParameter(
const std::string& name) const {
const char* MicroHttpdServer::Request::get(const std::string& name) const {
return MHD_lookup_connection_value(connection_, MHD_GET_ARGUMENT_KIND,
name.c_str());
}
const char* MicroHttpdServer::Connection::header(
const std::string& name) const {
const char* MicroHttpdServer::Request::header(const std::string& name) const {
return MHD_lookup_connection_value(connection_, MHD_HEADER_KIND,
name.c_str());
}
std::string MicroHttpdServer::Connection::url() const { return url_; }
void MicroHttpdServer::Connection::onCompleted(CompletedCallback f) {
callback_ = f;
}
void MicroHttpdServer::Connection::suspend() {
MHD_suspend_connection(connection_);
}
void MicroHttpdServer::Connection::resume() {
MHD_resume_connection(connection_);
}
std::string MicroHttpdServer::Request::url() const { return url_; }
MicroHttpdServer::MicroHttpdServer(IHttpServer::ICallback::Pointer cb, int port)
: http_server_(MHD_start_daemon(
......@@ -127,17 +116,11 @@ MicroHttpdServer::MicroHttpdServer(IHttpServer::ICallback::Pointer cb, int port)
MicroHttpdServer::~MicroHttpdServer() { MHD_stop_daemon(http_server_); }
MicroHttpdServer::IResponse::Pointer MicroHttpdServer::createResponse(
int code, const IResponse::Headers& headers,
const std::string& body) const {
return util::make_unique<Response>(code, headers, body);
}
MicroHttpdServer::IResponse::Pointer MicroHttpdServer::createResponse(
int code, const IResponse::Headers& headers, int size, int chunk_size,
MicroHttpdServer::IResponse::Pointer MicroHttpdServer::Request::response(
int code, const IResponse::Headers& headers, int size,
IResponse::ICallback::Pointer cb) const {
return util::make_unique<CallbackResponse>(code, headers, size, chunk_size,
std::move(cb));
return util::make_unique<Response>(connection_, code, headers, size,
std::move(cb));
}
IHttpServer::Pointer MicroHttpdServerFactory::create(
......
......@@ -37,50 +37,42 @@ class MicroHttpdServer : public IHttpServer {
class Response : public IResponse {
public:
Response() = default;
Response(int code, const IResponse::Headers&, const std::string& body);
Response(MHD_Connection* connection, int code, const IResponse::Headers&,
int size, IResponse::ICallback::Pointer);
~Response();
MHD_Response* response() const { return response_; }
int code() const { return code_; }
CompletedCallback callback() const { return callback_; }
void resume() override;
void completed(CompletedCallback f) override { callback_ = f; }
protected:
MHD_Connection* connection_;
MHD_Response* response_;
int code_;
CompletedCallback callback_;
};
class CallbackResponse : public Response {
public:
CallbackResponse(int code, const IResponse::Headers&, int size,
int chunk_size, IResponse::ICallback::Pointer);
};
class Connection : public IConnection {
class Request : public IRequest {
public:
Connection(MHD_Connection*, const char* url);
Request(MHD_Connection*, const char* url);
MHD_Connection* connection() const { return connection_; }
CompletedCallback callback() const { return callback_; }
const char* getParameter(const std::string& name) const override;
const char* get(const std::string& name) const override;
const char* header(const std::string&) const override;
std::string url() const override;
void onCompleted(CompletedCallback) override;
void suspend() override;
void resume() override;
IResponse::Pointer response(int code, const IResponse::Headers&, int size,
IResponse::ICallback::Pointer) const override;
private:
MHD_Connection* connection_;
std::string url_;
CompletedCallback callback_;
};
IResponse::Pointer createResponse(int code, const IResponse::Headers&,
const std::string& body) const override;
IResponse::Pointer createResponse(
int code, const IResponse::Headers&, int size, int chunk_size,
IResponse::ICallback::Pointer) const override;
ICallback::Pointer callback() const override { return callback_; }
private:
......
......@@ -31,6 +31,8 @@
#include <sstream>
#include <unordered_map>
const uint32_t CHUNK_SIZE = 1024;
namespace cloudstorage {
const std::unordered_map<std::string, std::string> MIME_TYPE = {
......@@ -155,5 +157,28 @@ std::string Url::escape(const std::string& value) {
std::string Url::escapeHeader(const std::string& header) {
return Json::valueToQuotedString(header.c_str());
}
IHttpServer::IResponse::Pointer response_from_string(
const IHttpServer::IRequest& request, int code,
const IHttpServer::IResponse::Headers& headers, const std::string& data) {
class DataProvider : public IHttpServer::IResponse::ICallback {
public:
DataProvider(const std::string& data) : position_(), data_(data) {}
int putData(char* buffer, size_t max) override {
int cnt = std::min(data_.length() - position_, max);
memcpy(buffer, (data_.begin() + position_).base(), cnt);
position_ += cnt;