Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CMakeLists_Headers.txt
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,15 @@ set(INCLUDE_HEADERS
src/protocol/DnsUtil.h
src/protocol/TLVMessage.h
src/protocol/SSLWrapper.h
src/protocol/PackageWrapper.h
src/server/WFServer.h
src/server/WFHttpServer.h
src/server/WFRedisServer.h
src/server/WFMySQLServer.h
src/server/WFDnsServer.h
src/client/WFMySQLConnection.h
src/client/WFDnsClient.h
src/client/WFHttpChunkedClient.h
src/manager/DnsCache.h
src/manager/WFGlobal.h
src/manager/UpstreamManager.h
Expand Down Expand Up @@ -106,6 +108,7 @@ set(INCLUDE_HEADERS
src/factory/WFResourcePool.h
src/factory/WFMessageQueue.h
src/factory/WFHttpServerTask.h
src/factory/HttpTaskImpl.inl
src/nameservice/WFNameService.h
src/nameservice/WFDnsResolver.h
src/nameservice/WFServiceGovernance.h
Expand Down
1 change: 1 addition & 0 deletions src/client/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ project(client)
set(SRC
WFMySQLConnection.cc
WFDnsClient.cc
WFHttpChunkedClient.cc
)

add_library(${PROJECT_NAME} OBJECT ${SRC})
Expand Down
74 changes: 74 additions & 0 deletions src/client/WFHttpChunkedClient.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
Copyright (c) 2025 Sogou, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

Author: Xie Han ([email protected])
*/

#include "HttpTaskImpl.inl"
#include "WFHttpChunkedClient.h"

void WFHttpChunkedTask::task_extract(protocol::HttpMessageChunk *chunk,
WFHttpTask *task)
{
auto *t = (WFHttpChunkedTask *)task->user_data;

t->chunk = chunk;
if (t->extract)
{
if (chunk || t->extract_flag)
t->extract(t);
}
}

void WFHttpChunkedTask::task_callback(WFHttpTask *task)
{
auto *t = (WFHttpChunkedTask *)task->user_data;

t->state = task->get_state();
t->error = task->get_error();
t->chunk = NULL;
if (t->callback)
t->callback(t);

t->task = NULL;
delete t;
}

WFHttpChunkedTask *
WFHttpChunkedClient::create_chunked_task(const std::string& url,
int redirect_max,
extract_t extract,
callback_t callback)
{
WFHttpTask *task = __WFHttpTaskFactory::create_chunked_task(url,
redirect_max,
WFHttpChunkedTask::task_extract,
WFHttpChunkedTask::task_callback);
return new WFHttpChunkedTask(task, std::move(extract), std::move(callback));
}

WFHttpChunkedTask *
WFHttpChunkedClient::create_chunked_task(const ParsedURI& uri,
int redirect_max,
extract_t extract,
callback_t callback)
{
WFHttpTask *task = __WFHttpTaskFactory::create_chunked_task(uri,
redirect_max,
WFHttpChunkedTask::task_extract,
WFHttpChunkedTask::task_callback);
return new WFHttpChunkedTask(task, std::move(extract), std::move(callback));
}

179 changes: 179 additions & 0 deletions src/client/WFHttpChunkedClient.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
/*
Copyright (c) 2025 Sogou, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

Author: Xie Han ([email protected])
*/

#ifndef _WFHTTPCHUNKEDCLIENT_H_
#define _WFHTTPCHUNKEDCLIENT_H_

#include <utility>
#include <functional>
#include <openssl/ssl.h>
#include "HttpMessage.h"
#include "WFTask.h"
#include "WFTaskFactory.h"

class WFHttpChunkedTask : public WFGenericTask
{
public:
protocol::HttpMessageChunk *get_chunk()
{
return this->chunk;
}

const protocol::HttpMessageChunk *get_chunk() const
{
return this->chunk;
}

public:
protocol::HttpRequest *get_req()
{
return this->task->get_req();
}

protocol::HttpResponse *get_resp()
{
return this->task->get_resp();
}

const protocol::HttpRequest *get_req() const
{
return this->task->get_req();
}

const protocol::HttpResponse *get_resp() const
{
return this->task->get_resp();
}

public:
void set_watch_timeout(int timeout)
{
this->task->set_watch_timeout(timeout);
}

void set_recv_timeout(int timeout)
{
this->task->set_receive_timeout(timeout);
}

void set_send_timeout(int timeout)
{
this->task->set_send_timeout(timeout);
}

void set_keep_alive(int timeout)
{
this->task->set_keep_alive(timeout);
}

public:
void set_ssl_ctx(SSL_CTX *ctx)
{
using HttpRequest = protocol::HttpRequest;
using HttpResponse = protocol::HttpResponse;
auto *t = (WFComplexClientTask<HttpRequest, HttpResponse> *)this->task;
t->set_ssl_ctx(ctx);
}

void extract_on_header(bool on)
{
this->extract_flag = on;
}

public:
void set_extract(std::function<void (WFHttpChunkedTask *)> ex)
{
this->extract = std::move(ex);
}

void set_callback(std::function<void (WFHttpChunkedTask *)> cb)
{
this->callback = std::move(cb);
}

public:
const WFHttpTask *get_http_task() const
{
return this->task;
}

protected:
virtual void dispatch()
{
series_of(this)->push_front(this->task);
this->subtask_done();
}

virtual SubTask *done()
{
return series_of(this)->pop();
}

protected:
static void task_extract(protocol::HttpMessageChunk *chunk,
WFHttpTask *task);
static void task_callback(WFHttpTask *task);

protected:
WFHttpTask *task;
protocol::HttpMessageChunk *chunk;
bool extract_flag;
std::function<void (WFHttpChunkedTask *)> extract;
std::function<void (WFHttpChunkedTask *)> callback;

protected:
WFHttpChunkedTask(WFHttpTask *task,
std::function<void (WFHttpChunkedTask *)>&& ex,
std::function<void (WFHttpChunkedTask *)>&& cb) :
extract(std::move(ex)),
callback(std::move(cb))
{
task->user_data = this;
this->task = task;
this->extract_flag = false;
}

virtual ~WFHttpChunkedTask()
{
if (this->task)
this->task->dismiss();
}

friend class WFHttpChunkedClient;
};

class WFHttpChunkedClient
{
public:
using extract_t = std::function<void (WFHttpChunkedTask *)>;
using callback_t = std::function<void (WFHttpChunkedTask *)>;

public:
static WFHttpChunkedTask *create_chunked_task(const std::string& url,
int redirect_max,
extract_t extract,
callback_t callback);

static WFHttpChunkedTask *create_chunked_task(const ParsedURI& uri,
int redirect_max,
extract_t extract,
callback_t callback);
};

#endif

41 changes: 41 additions & 0 deletions src/factory/HttpTaskImpl.inl
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
Copyright (c) 2025 Sogou, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

Authors: Xie Han ([email protected])
*/

#include "HttpMessage.h"
#include "WFTaskFactory.h"

// Internal, for WFHttpChunkedTask only.

class __WFHttpTaskFactory
{
private:
using extract_t = std::function<void (protocol::HttpMessageChunk *,
WFHttpTask *)>;

public:
static WFHttpTask *create_chunked_task(const std::string& url,
int redirect_max,
extract_t extract,
http_callback_t callback);

static WFHttpTask *create_chunked_task(const ParsedURI& uri,
int redirect_max,
extract_t extract,
http_callback_t callback);
};

8 changes: 8 additions & 0 deletions src/factory/WFTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ class WFThreadTask : public ExecRequest
INPUT *get_input() { return &this->input; }
OUTPUT *get_output() { return &this->output; }

const INPUT *get_input() const { return &this->input; }
const OUTPUT *get_output() const { return &this->output; }

public:
void *user_data;

Expand Down Expand Up @@ -134,6 +137,9 @@ class WFNetworkTask : public CommRequest
REQ *get_req() { return &this->req; }
RESP *get_resp() { return &this->resp; }

const REQ *get_req() const { return &this->req; }
const RESP *get_resp() const { return &this->resp; }

public:
void *user_data;

Expand Down Expand Up @@ -325,6 +331,8 @@ class WFFileTask : public IORequest
public:
ARGS *get_args() { return &this->args; }

const ARGS *get_args() const { return &this->args; }

long get_retval() const
{
if (this->state == WFT_STATE_SUCCESS)
Expand Down
7 changes: 7 additions & 0 deletions src/kernel_win/Communicator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,13 @@ static int __ssl_connect(SSL_CTX *ssl_ctx, CommConnEntry *entry)
return -1;
}

void CommMessageIn::renew()
{
CommSession *session = this->entry->session;
session->timeout = -1;
session->begin_time = -1;
}

int CommTarget::init(const struct sockaddr *addr, socklen_t addrlen,
int connect_timeout, int response_timeout)
{
Expand Down
4 changes: 4 additions & 0 deletions src/kernel_win/Communicator.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ class CommMessageIn
/* Send small packet while receiving. Call only in append(). */
virtual int feedback(const void *buf, size_t size);

/* In append(), reset the begin time of receiving to current time. */
virtual void renew();

private:
struct CommConnEntry *entry;

Expand Down Expand Up @@ -164,6 +167,7 @@ class CommSession
public:
CommSession() { this->passive = 0; }
virtual ~CommSession();
friend class CommMessageIn;
friend class Communicator;
};

Expand Down
1 change: 1 addition & 0 deletions src/protocol/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ set(SRC
TLVMessage.cc
HttpUtil.cc
SSLWrapper.cc
PackageWrapper.cc
)

add_library(${PROJECT_NAME} OBJECT ${SRC})
Expand Down
Loading
Loading