Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add setting chunk callback to http request #2252

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions examples/client_example/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ int nth_resp = 0;

int main()
{
#if 1
trantor::Logger::setLogLevel(trantor::Logger::kTrace);
{
auto client = HttpClient::newHttpClient("http://www.baidu.com");
Expand Down Expand Up @@ -75,6 +76,28 @@ int main()
std::cout << "requestsBufferSize:" << client->requestsBufferSize()
<< std::endl;
}
#else
{
auto client = HttpClient::newHttpClient("http://127.0.0.1:8848/stream");
auto req = HttpRequest::newHttpRequest();
req->setMethod(drogon::Get);
req->setPath("/stream");
req->setChunkCallback([](ResponseChunk ch) {
std::cout << "recv chunk:" << ch.data() << std::endl;
});
client->sendRequest(
req, [](ReqResult result, const HttpResponsePtr &response) {
if (result != ReqResult::Ok)
{
std::cout
<< "error while sending request to server! result: "
<< result << std::endl;
return;
}
std::cout << "recv chunk done" << std::endl;
});
}
#endif
app().run();
}
34 changes: 34 additions & 0 deletions lib/inc/drogon/HttpRequest.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
#include <trantor/net/InetAddress.h>
#include <trantor/net/Certificate.h>
#include <trantor/utils/Date.h>
#include <cstddef>
#include <functional>
#include <memory>
#include <string>
#include <unordered_map>
Expand All @@ -37,6 +39,32 @@ namespace drogon
class HttpRequest;
using HttpRequestPtr = std::shared_ptr<HttpRequest>;

class DROGON_EXPORT ResponseChunk
{
public:
ResponseChunk() = default;

ResponseChunk(const char *s, std::size_t count)
{
data_.append(s, count);
}

~ResponseChunk() = default;

ResponseChunk(const ResponseChunk &other) = default;
ResponseChunk(ResponseChunk &&other) noexcept = default;
ResponseChunk &operator=(const ResponseChunk &other) noexcept = default;
ResponseChunk &operator=(ResponseChunk &&other) noexcept = default;

auto &data()
{
return data_;
}

private:
std::string data_;
};

/**
* @brief This template is used to convert a request object to a custom
* type object. Users must specialize the template for a particular type.
Expand Down Expand Up @@ -510,6 +538,12 @@ class DROGON_EXPORT HttpRequest
virtual const std::weak_ptr<trantor::TcpConnection> &getConnectionPtr()
const noexcept = 0;

virtual void setChunkCallback(
std::function<void(ResponseChunk)>) noexcept = 0;

virtual const std::function<void(ResponseChunk)> &getChunkCallback()
const noexcept = 0;

virtual ~HttpRequest()
{
}
Expand Down
3 changes: 2 additions & 1 deletion lib/src/HttpClientImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,8 @@ void HttpClientImpl::onRecvMessage(const trantor::TcpConnectionPtr &connPtr,
{
responseParser->setForHeadMethod();
}
if (!responseParser->parseResponse(msg))
if (!responseParser->parseResponse(msg,
firstReq.first->getChunkCallback()))
{
onError(ReqResult::BadResponse);
bytesReceived_ += (msgSize - msg->readableBytes());
Expand Down
13 changes: 13 additions & 0 deletions lib/src/HttpRequestImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,18 @@ class HttpRequestImpl : public HttpRequest
return connPtr_;
}

void setChunkCallback(
std::function<void(ResponseChunk)> cb) noexcept override
{
chunkCb_ = std::move(cb);
}

const std::function<void(ResponseChunk)> &getChunkCallback()
const noexcept override
{
return chunkCb_;
}

bool isOnSecureConnection() const noexcept override
{
return isOnSecureConnection_;
Expand Down Expand Up @@ -731,6 +743,7 @@ class HttpRequestImpl : public HttpRequest
std::exception_ptr streamExceptionPtr_;
bool startProcessing_{false};
std::weak_ptr<trantor::TcpConnection> connPtr_;
std::function<void(ResponseChunk)> chunkCb_;

protected:
std::string content_;
Expand Down
10 changes: 9 additions & 1 deletion lib/src/HttpResponseParser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <trantor/utils/Logger.h>
#include <trantor/utils/MsgBuffer.h>
#include <algorithm>
#include <string>

using namespace trantor;
using namespace drogon;
Expand Down Expand Up @@ -84,7 +85,9 @@ bool HttpResponseParser::parseResponseOnClose()
}

// return false if any error
bool HttpResponseParser::parseResponse(MsgBuffer *buf)
bool HttpResponseParser::parseResponse(
MsgBuffer *buf,
const std::function<void(ResponseChunk)> &chunkCallback)
{
bool ok = true;
bool hasMore = true;
Expand Down Expand Up @@ -277,6 +280,11 @@ bool HttpResponseParser::parseResponse(MsgBuffer *buf)
responsePtr_->bodyPtr_ =
std::make_shared<HttpMessageStringBody>();
}
if (chunkCallback)
{
chunkCallback(
ResponseChunk(buf->peek(), currentChunkLength_));
}
responsePtr_->bodyPtr_->append(buf->peek(),
currentChunkLength_);
buf->retrieve(currentChunkLength_ + 2);
Expand Down
7 changes: 6 additions & 1 deletion lib/src/HttpResponseParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
#pragma once

#include "impl_forwards.h"
#include <drogon/HttpRequest.h>
#include <trantor/utils/NonCopyable.h>
#include <trantor/net/TcpConnection.h>
#include <trantor/utils/MsgBuffer.h>
#include <functional>
#include <list>
#include <mutex>

Expand All @@ -43,7 +45,10 @@ class HttpResponseParser : public trantor::NonCopyable
// default copy-ctor, dtor and assignment are fine

// return false if any error
bool parseResponse(trantor::MsgBuffer *buf);
bool parseResponse(
trantor::MsgBuffer *buf,
const std::function<void(ResponseChunk)> &chunkCallback = nullptr);

bool parseResponseOnClose();

bool gotAll() const
Expand Down
Loading