From 9a1e00a25ef5c78eaa4bfd0d889009a59c260ce5 Mon Sep 17 00:00:00 2001 From: Xie Han <63350856@qq.com> Date: Wed, 17 Sep 2025 21:46:00 +0800 Subject: [PATCH 1/2] Add WFHttpChunkedClient. --- CMakeLists_Headers.txt | 3 + src/client/CMakeLists.txt | 1 + src/client/WFHttpChunkedClient.cc | 74 ++++++++++ src/client/WFHttpChunkedClient.h | 179 +++++++++++++++++++++++ src/factory/HttpTaskImpl.inl | 41 ++++++ src/factory/WFTask.h | 8 ++ src/protocol/CMakeLists.txt | 1 + src/protocol/HttpMessage.cc | 230 ++++++++++++++++++++++++++++++ src/protocol/HttpMessage.h | 38 +++++ src/protocol/PackageWrapper.cc | 72 ++++++++++ src/protocol/PackageWrapper.h | 57 ++++++++ 11 files changed, 704 insertions(+) create mode 100644 src/client/WFHttpChunkedClient.cc create mode 100644 src/client/WFHttpChunkedClient.h create mode 100644 src/factory/HttpTaskImpl.inl create mode 100644 src/protocol/PackageWrapper.cc create mode 100644 src/protocol/PackageWrapper.h diff --git a/CMakeLists_Headers.txt b/CMakeLists_Headers.txt index dc60d8912b0..b4365ebff51 100644 --- a/CMakeLists_Headers.txt +++ b/CMakeLists_Headers.txt @@ -71,6 +71,7 @@ set(INCLUDE_HEADERS src/protocol/DnsUtil.h src/protocol/TLVMessage.h src/protocol/SSLWrapper.h + src/protocol/PackageWrapper.h src/server/WFServer.h src/server/WFHttpServer.h src/server/WFRedisServer.h @@ -78,6 +79,7 @@ set(INCLUDE_HEADERS src/server/WFDnsServer.h src/client/WFMySQLConnection.h src/client/WFDnsClient.h + src/client/WFHttpChunkedClient.h src/manager/DnsCache.h src/manager/WFGlobal.h src/manager/UpstreamManager.h @@ -106,6 +108,7 @@ set(INCLUDE_HEADERS src/factory/WFResourcePool.h src/factory/WFMessageQueue.h src/factory/WFHttpServerTask.h + src/factory/HttpTaskImpl.inl src/nameservice/WFNameService.h src/nameservice/WFDnsResolver.h src/nameservice/WFServiceGovernance.h diff --git a/src/client/CMakeLists.txt b/src/client/CMakeLists.txt index 0e2ff9a2155..b7348b370d9 100644 --- a/src/client/CMakeLists.txt +++ b/src/client/CMakeLists.txt @@ -4,6 +4,7 @@ project(client) set(SRC WFMySQLConnection.cc WFDnsClient.cc + WFHttpChunkedClient.cc ) add_library(${PROJECT_NAME} OBJECT ${SRC}) diff --git a/src/client/WFHttpChunkedClient.cc b/src/client/WFHttpChunkedClient.cc new file mode 100644 index 00000000000..d1fb6864979 --- /dev/null +++ b/src/client/WFHttpChunkedClient.cc @@ -0,0 +1,74 @@ +/* + Copyright (c) 2025 Sogou, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + Author: Xie Han (xiehan@sogou-inc.com) +*/ + +#include "HttpTaskImpl.inl" +#include "WFHttpChunkedClient.h" + +void WFHttpChunkedTask::task_extract(protocol::HttpMessageChunk *chunk, + WFHttpTask *task) +{ + auto *t = (WFHttpChunkedTask *)task->user_data; + + t->chunk = chunk; + if (t->extract) + { + if (chunk || t->extract_flag) + t->extract(t); + } +} + +void WFHttpChunkedTask::task_callback(WFHttpTask *task) +{ + auto *t = (WFHttpChunkedTask *)task->user_data; + + t->state = task->get_state(); + t->error = task->get_error(); + t->chunk = NULL; + if (t->callback) + t->callback(t); + + t->task = NULL; + delete t; +} + +WFHttpChunkedTask * +WFHttpChunkedClient::create_chunked_task(const std::string& url, + int redirect_max, + extract_t extract, + callback_t callback) +{ + WFHttpTask *task = __WFHttpTaskFactory::create_chunked_task(url, + redirect_max, + WFHttpChunkedTask::task_extract, + WFHttpChunkedTask::task_callback); + return new WFHttpChunkedTask(task, std::move(extract), std::move(callback)); +} + +WFHttpChunkedTask * +WFHttpChunkedClient::create_chunked_task(const ParsedURI& uri, + int redirect_max, + extract_t extract, + callback_t callback) +{ + WFHttpTask *task = __WFHttpTaskFactory::create_chunked_task(uri, + redirect_max, + WFHttpChunkedTask::task_extract, + WFHttpChunkedTask::task_callback); + return new WFHttpChunkedTask(task, std::move(extract), std::move(callback)); +} + diff --git a/src/client/WFHttpChunkedClient.h b/src/client/WFHttpChunkedClient.h new file mode 100644 index 00000000000..b4b60ac08ae --- /dev/null +++ b/src/client/WFHttpChunkedClient.h @@ -0,0 +1,179 @@ +/* + Copyright (c) 2025 Sogou, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + Author: Xie Han (xiehan@sogou-inc.com) +*/ + +#ifndef _WFHTTPCHUNKEDCLIENT_H_ +#define _WFHTTPCHUNKEDCLIENT_H_ + +#include +#include +#include +#include "HttpMessage.h" +#include "WFTask.h" +#include "WFTaskFactory.h" + +class WFHttpChunkedTask : public WFGenericTask +{ +public: + protocol::HttpMessageChunk *get_chunk() + { + return this->chunk; + } + + const protocol::HttpMessageChunk *get_chunk() const + { + return this->chunk; + } + +public: + protocol::HttpRequest *get_req() + { + return this->task->get_req(); + } + + protocol::HttpResponse *get_resp() + { + return this->task->get_resp(); + } + + const protocol::HttpRequest *get_req() const + { + return this->task->get_req(); + } + + const protocol::HttpResponse *get_resp() const + { + return this->task->get_resp(); + } + +public: + void set_watch_timeout(int timeout) + { + this->task->set_watch_timeout(timeout); + } + + void set_recv_timeout(int timeout) + { + this->task->set_receive_timeout(timeout); + } + + void set_send_timeout(int timeout) + { + this->task->set_send_timeout(timeout); + } + + void set_keep_alive(int timeout) + { + this->task->set_keep_alive(timeout); + } + +public: + void set_ssl_ctx(SSL_CTX *ctx) + { + using HttpRequest = protocol::HttpRequest; + using HttpResponse = protocol::HttpResponse; + auto *t = (WFComplexClientTask *)this->task; + t->set_ssl_ctx(ctx); + } + + void extract_on_header(bool on) + { + this->extract_flag = on; + } + +public: + void set_extract(std::function ex) + { + this->extract = std::move(ex); + } + + void set_callback(std::function cb) + { + this->callback = std::move(cb); + } + +public: + const WFHttpTask *get_http_task() const + { + return this->task; + } + +protected: + virtual void dispatch() + { + series_of(this)->push_front(this->task); + this->subtask_done(); + } + + virtual SubTask *done() + { + return series_of(this)->pop(); + } + +protected: + static void task_extract(protocol::HttpMessageChunk *chunk, + WFHttpTask *task); + static void task_callback(WFHttpTask *task); + +protected: + WFHttpTask *task; + protocol::HttpMessageChunk *chunk; + bool extract_flag; + std::function extract; + std::function callback; + +protected: + WFHttpChunkedTask(WFHttpTask *task, + std::function&& ex, + std::function&& cb) : + extract(std::move(ex)), + callback(std::move(cb)) + { + task->user_data = this; + this->task = task; + this->extract_flag = false; + } + + virtual ~WFHttpChunkedTask() + { + if (this->task) + this->task->dismiss(); + } + + friend class WFHttpChunkedClient; +}; + +class WFHttpChunkedClient +{ +public: + using extract_t = std::function; + using callback_t = std::function; + +public: + static WFHttpChunkedTask *create_chunked_task(const std::string& url, + int redirect_max, + extract_t extract, + callback_t callback); + + static WFHttpChunkedTask *create_chunked_task(const ParsedURI& uri, + int redirect_max, + extract_t extract, + callback_t callback); +}; + +#endif + diff --git a/src/factory/HttpTaskImpl.inl b/src/factory/HttpTaskImpl.inl new file mode 100644 index 00000000000..2f899161d36 --- /dev/null +++ b/src/factory/HttpTaskImpl.inl @@ -0,0 +1,41 @@ +/* + Copyright (c) 2025 Sogou, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + Authors: Xie Han (xiehan@sogou-inc.com) +*/ + +#include "HttpMessage.h" +#include "WFTaskFactory.h" + +// Internal, for WFHttpChunkedTask only. + +class __WFHttpTaskFactory +{ +private: + using extract_t = std::function; + +public: + static WFHttpTask *create_chunked_task(const std::string& url, + int redirect_max, + extract_t extract, + http_callback_t callback); + + static WFHttpTask *create_chunked_task(const ParsedURI& uri, + int redirect_max, + extract_t extract, + http_callback_t callback); +}; + diff --git a/src/factory/WFTask.h b/src/factory/WFTask.h index 94bdf12d864..d3645460738 100644 --- a/src/factory/WFTask.h +++ b/src/factory/WFTask.h @@ -68,6 +68,9 @@ class WFThreadTask : public ExecRequest INPUT *get_input() { return &this->input; } OUTPUT *get_output() { return &this->output; } + const INPUT *get_input() const { return &this->input; } + const OUTPUT *get_output() const { return &this->output; } + public: void *user_data; @@ -134,6 +137,9 @@ class WFNetworkTask : public CommRequest REQ *get_req() { return &this->req; } RESP *get_resp() { return &this->resp; } + const REQ *get_req() const { return &this->req; } + const RESP *get_resp() const { return &this->resp; } + public: void *user_data; @@ -325,6 +331,8 @@ class WFFileTask : public IORequest public: ARGS *get_args() { return &this->args; } + const ARGS *get_args() const { return &this->args; } + long get_retval() const { if (this->state == WFT_STATE_SUCCESS) diff --git a/src/protocol/CMakeLists.txt b/src/protocol/CMakeLists.txt index bdbfb03400e..b7044a1dcc7 100644 --- a/src/protocol/CMakeLists.txt +++ b/src/protocol/CMakeLists.txt @@ -18,6 +18,7 @@ set(SRC TLVMessage.cc HttpUtil.cc SSLWrapper.cc + PackageWrapper.cc ) add_library(${PROJECT_NAME} OBJECT ${SRC}) diff --git a/src/protocol/HttpMessage.cc b/src/protocol/HttpMessage.cc index 1030380f93b..5cbc16f7de0 100644 --- a/src/protocol/HttpMessage.cc +++ b/src/protocol/HttpMessage.cc @@ -18,6 +18,7 @@ #include #include +#include #include #include #include "HttpMessage.h" @@ -398,5 +399,234 @@ int HttpResponse::append(const void *buf, size_t *size) return ret; } +bool HttpMessageChunk::get_chunk_data(const void **data, size_t *size) const +{ + if (this->chunk_data && this->nreceived == this->chunk_size + 2) + { + *data = this->chunk_data; + *size = this->chunk_size; + return true; + } + else + return false; +} + +bool HttpMessageChunk::move_chunk_data(void **data, size_t *size) +{ + if (this->chunk_data && this->nreceived == this->chunk_size + 2) + { + *data = this->chunk_data; + *size = this->chunk_size; + this->chunk_data = NULL; + this->nreceived = 0; + return true; + } + else + return false; +} + +bool HttpMessageChunk::set_chunk_data(const void *data, size_t size) +{ + char *p = (char *)malloc(size + 3); + + if (p) + { + memcpy(p, data, size); + p[size] = '\r'; + p[size + 1] = '\n'; + p[size + 2] = '\0'; + + free(this->chunk_data); + this->chunk_data = p; + this->chunk_size = size; + this->nreceived = size + 2; + return true; + } + else + return false; +} + +int HttpMessageChunk::encode(struct iovec vectors[], int max) +{ + int len = sprintf(this->chunk_line, "%zx\r\n", this->chunk_size); + + vectors[0].iov_base = this->chunk_line; + vectors[0].iov_len = len; + vectors[1].iov_base = this->chunk_data; + vectors[1].iov_len = this->chunk_size + 2; + + return 2; +} + +#define MIN(x, y) ((x) <= (y) ? (x) : (y)) + +int HttpMessageChunk::append_chunk_line(const void *buf, size_t size) +{ + char *end; + size_t i; + + size = MIN(size, sizeof this->chunk_line - this->nreceived); + memcpy(this->chunk_line + this->nreceived, buf, size); + for (i = 0; i + 1 < this->nreceived + size; i++) + { + if (this->chunk_line[i] == '\r') + { + if (this->chunk_line[i + 1] != '\n') + { + errno = EBADMSG; + return -1; + } + + this->chunk_line[i] = '\0'; + this->chunk_size = strtoul(this->chunk_line, &end, 16); + if (end == this->chunk_line) + { + errno = EBADMSG; + return -1; + } + + if (this->chunk_size > 64 * 1024 * 1024 || + this->chunk_size > this->size_limit) + { + errno = EMSGSIZE; + return -1; + } + + this->chunk_data = malloc(this->chunk_size + 3); + if (!this->chunk_data) + return -1; + + this->nreceived = i + 2; + return 1; + } + } + + if (i == sizeof this->chunk_line - 1) + { + errno = EBADMSG; + return -1; + } + + this->nreceived += size; + return 0; +} + +int HttpMessageChunk::append(const void *buf, size_t *size) +{ + size_t nleft; + size_t n; + int ret; + + if (!this->chunk_data) + { + n = this->nreceived; + ret = this->append_chunk_line(buf, *size); + if (ret <= 0) + return ret; + + n = this->nreceived - n; + this->nreceived = 0; + } + else + n = 0; + + if (this->chunk_size != 0) + { + nleft = this->chunk_size + 2 - this->nreceived; + if (*size - n > nleft) + *size = n + nleft; + + buf = (const char *)buf + n; + n = *size - n; + memcpy((char *)this->chunk_data + this->nreceived, buf, n); + this->nreceived += n; + if (this->nreceived == this->chunk_size + 2) + { + ((char *)this->chunk_data)[this->nreceived] = '\0'; + return 1; + } + } + else + { + while (n < *size) + { + char c = ((const char *)buf)[n]; + + if (this->nreceived == 0) + { + if (c == '\r') + this->nreceived = 1; + else + this->nreceived = (size_t)-2; + } + else if (this->nreceived == 1) + { + if (c == '\n') + { + *size = n + 1; + this->nreceived = 2; + ((char *)this->chunk_data)[0] = '\r'; + ((char *)this->chunk_data)[1] = '\n'; + ((char *)this->chunk_data)[2] = '\0'; + return 1; + } + else + break; + } + else if (this->nreceived == (size_t)-2) + { + if (c == '\r') + this->nreceived = (size_t)-1; + } + else /* if (this->nreceived == (size_t)-1) */ + { + if (c == '\n') + this->nreceived = 0; + else + break; + } + + n++; + } + + if (n < *size) + { + errno = EBADMSG; + return -1; + } + } + + return 0; +} + +HttpMessageChunk::HttpMessageChunk(HttpMessageChunk&& msg) : + ProtocolMessage(std::move(msg)) +{ + memcpy(this->chunk_line, msg.chunk_line, sizeof this->chunk_line); + this->chunk_data = msg.chunk_data; + msg.chunk_data = NULL; + this->chunk_size = msg.chunk_size; + this->nreceived = msg.nreceived; + msg.nreceived = 0; +} + +HttpMessageChunk& HttpMessageChunk::operator = (HttpMessageChunk&& msg) +{ + if (&msg != this) + { + *(ProtocolMessage *)this = std::move(msg); + + memcpy(this->chunk_line, msg.chunk_line, sizeof this->chunk_line); + free(this->chunk_data); + this->chunk_data = msg.chunk_data; + msg.chunk_data = NULL; + this->chunk_size = msg.chunk_size; + this->nreceived = msg.nreceived; + msg.nreceived = 0; + } + + return *this; +} + } diff --git a/src/protocol/HttpMessage.h b/src/protocol/HttpMessage.h index a2589f83935..52465d831ab 100644 --- a/src/protocol/HttpMessage.h +++ b/src/protocol/HttpMessage.h @@ -19,6 +19,7 @@ #ifndef _HTTPMESSAGE_H_ #define _HTTPMESSAGE_H_ +#include #include #include #include @@ -404,6 +405,43 @@ class HttpResponse : public HttpMessage HttpResponse& operator = (HttpResponse&& resp) = default; }; +class HttpMessageChunk : public ProtocolMessage +{ +public: + bool get_chunk_data(const void **chunk_data, size_t *size) const; + bool move_chunk_data(void **chunk_data, size_t *size); + bool set_chunk_data(const void *chunk_data, size_t size); + +protected: + virtual int encode(struct iovec vectors[], int max); + virtual int append(const void *buf, size_t *size); + +private: + int append_chunk_line(const void *buf, size_t size); + +private: + char chunk_line[32]; + void *chunk_data; + size_t chunk_size; + size_t nreceived; + +public: + HttpMessageChunk() + { + this->chunk_data = NULL; + this->nreceived = 0; + } + + virtual ~HttpMessageChunk() + { + free(this->chunk_data); + } + +public: + HttpMessageChunk(HttpMessageChunk&& msg); + HttpMessageChunk& operator = (HttpMessageChunk&& msg); +}; + } #endif diff --git a/src/protocol/PackageWrapper.cc b/src/protocol/PackageWrapper.cc new file mode 100644 index 00000000000..a8fae118136 --- /dev/null +++ b/src/protocol/PackageWrapper.cc @@ -0,0 +1,72 @@ +/* + Copyright (c) 2022 Sogou, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + Author: Xie Han (xiehan@sogou-inc.com) +*/ + +#include +#include "PackageWrapper.h" + +namespace protocol +{ + +int PackageWrapper::encode(struct iovec vectors[], int max) +{ + int cnt = 0; + int ret; + + while (max >= 8) + { + ret = this->ProtocolWrapper::encode(vectors, max); + if ((unsigned int)ret > (unsigned int)max) + { + if (ret < 0) + return ret; + + break; + } + + cnt += ret; + this->set_message(this->next_out(this->message)); + if (!this->message) + return cnt; + + vectors += ret; + max -= ret; + } + + errno = EOVERFLOW; + return -1; +} + +int PackageWrapper::append(const void *buf, size_t *size) +{ + int ret = this->ProtocolWrapper::append(buf, size); + + if (ret > 0) + { + this->set_message(this->next_in(this->message)); + if (this->message) + { + this->renew(); + ret = 0; + } + } + + return ret; +} + +} + diff --git a/src/protocol/PackageWrapper.h b/src/protocol/PackageWrapper.h new file mode 100644 index 00000000000..add78768529 --- /dev/null +++ b/src/protocol/PackageWrapper.h @@ -0,0 +1,57 @@ +/* + Copyright (c) 2022 Sogou, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + Author: Xie Han (xiehan@sogou-inc.com) +*/ + +#ifndef _PACKAGEWRAPPER_H_ +#define _PACKAGEWRAPPER_H_ + +#include "ProtocolMessage.h" + +namespace protocol +{ + +class PackageWrapper : public ProtocolWrapper +{ +private: + virtual ProtocolMessage *next_out(ProtocolMessage *message) + { + return NULL; + } + + virtual ProtocolMessage *next_in(ProtocolMessage *message) + { + return NULL; + } + +protected: + virtual int encode(struct iovec vectors[], int max); + virtual int append(const void *buf, size_t *size); + +public: + PackageWrapper(ProtocolMessage *message) : ProtocolWrapper(message) + { + } + +public: + PackageWrapper(PackageWrapper&& wrapper) = default; + PackageWrapper& operator = (PackageWrapper&& wrapper) = default; +}; + +} + +#endif + From 2cfc09ead41b6c8ced7d896809d3f53f0c90a015 Mon Sep 17 00:00:00 2001 From: Xie Han <63350856@qq.com> Date: Wed, 17 Sep 2025 22:19:31 +0800 Subject: [PATCH 2/2] Add CommMessageIn::renew(). --- src/kernel_win/Communicator.cc | 7 +++++++ src/kernel_win/Communicator.h | 4 ++++ 2 files changed, 11 insertions(+) diff --git a/src/kernel_win/Communicator.cc b/src/kernel_win/Communicator.cc index 6fa895e500d..bfb5c9753dd 100644 --- a/src/kernel_win/Communicator.cc +++ b/src/kernel_win/Communicator.cc @@ -180,6 +180,13 @@ static int __ssl_connect(SSL_CTX *ssl_ctx, CommConnEntry *entry) return -1; } +void CommMessageIn::renew() +{ + CommSession *session = this->entry->session; + session->timeout = -1; + session->begin_time = -1; +} + int CommTarget::init(const struct sockaddr *addr, socklen_t addrlen, int connect_timeout, int response_timeout) { diff --git a/src/kernel_win/Communicator.h b/src/kernel_win/Communicator.h index e3cc1a88e85..b1287cc6639 100644 --- a/src/kernel_win/Communicator.h +++ b/src/kernel_win/Communicator.h @@ -118,6 +118,9 @@ class CommMessageIn /* Send small packet while receiving. Call only in append(). */ virtual int feedback(const void *buf, size_t size); + /* In append(), reset the begin time of receiving to current time. */ + virtual void renew(); + private: struct CommConnEntry *entry; @@ -164,6 +167,7 @@ class CommSession public: CommSession() { this->passive = 0; } virtual ~CommSession(); + friend class CommMessageIn; friend class Communicator; };