Skip to content

Commit

Permalink
[coro_io] rename channel to load_blancer, rename coro_channel to chan…
Browse files Browse the repository at this point in the history
…nel (alibaba#732)

* rename channel to load_blancer

* rename coro_channel to channel

* fix bazel
  • Loading branch information
poor-circle authored Aug 5, 2024
1 parent b42761e commit 999bb72
Show file tree
Hide file tree
Showing 13 changed files with 123 additions and 112 deletions.
4 changes: 2 additions & 2 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ cc_binary(
)

cc_binary(
name = "coro_http_channel",
srcs = ["src/coro_http/examples/channel.cpp"],
name = "coro_http_load_blancer",
srcs = ["src/coro_http/examples/load_blancer.cpp"],
copts = YA_BIN_COPT,
includes = [
"include",
Expand Down
4 changes: 2 additions & 2 deletions include/ylt/coro_io/client_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ template <typename client_t, typename io_context_pool_t>
class client_pools;

template <typename, typename>
class channel;
class load_blancer;

template <typename client_t,
typename io_context_pool_t = coro_io::io_context_pool>
Expand Down Expand Up @@ -405,7 +405,7 @@ class client_pool : public std::enable_shared_from_this<
friend class client_pools;

template <typename, typename>
friend class channel;
friend class load_blancer;

template <typename T>
async_simple::coro::Lazy<return_type_with_host<T>> send_request(
Expand Down
11 changes: 5 additions & 6 deletions include/ylt/coro_io/coro_io.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -364,12 +364,11 @@ post(Func func,
}

template <typename R>
struct coro_channel
: public asio::experimental::channel<void(std::error_code, R)> {
struct channel : public asio::experimental::channel<void(std::error_code, R)> {
using return_type = R;
using ValueType = std::pair<std::error_code, R>;
using asio::experimental::channel<void(std::error_code, R)>::channel;
coro_channel(coro_io::ExecutorWrapper<> *executor, size_t capacity)
channel(coro_io::ExecutorWrapper<> *executor, size_t capacity)
: executor_(executor),
asio::experimental::channel<void(std::error_code, R)>(
executor->get_asio_executor(), capacity) {}
Expand All @@ -380,17 +379,17 @@ struct coro_channel
};

template <typename R>
inline coro_channel<R> create_channel(
inline channel<R> create_load_blancer(
size_t capacity,
coro_io::ExecutorWrapper<> *executor = coro_io::get_global_executor()) {
return coro_channel<R>(executor, capacity);
return channel<R>(executor, capacity);
}

template <typename R>
inline auto create_shared_channel(
size_t capacity,
coro_io::ExecutorWrapper<> *executor = coro_io::get_global_executor()) {
return std::make_shared<coro_channel<R>>(executor, capacity);
return std::make_shared<channel<R>>(executor, capacity);
}

template <typename T>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,32 +25,33 @@
#include "io_context_pool.hpp"
namespace coro_io {

enum class load_blance_algorithm {
enum class load_blancer_algorithm {
RR = 0, // round-robin
WRR, // weight round-robin
random,
};

template <typename client_t, typename io_context_pool_t = io_context_pool>
class channel {
class load_blancer {
using client_pool_t = client_pool<client_t, io_context_pool_t>;
using client_pools_t = client_pools<client_t, io_context_pool_t>;

public:
struct channel_config {
struct load_blancer_config {
typename client_pool_t::pool_config pool_config;
load_blance_algorithm lba = load_blance_algorithm::RR;
~channel_config(){};
load_blancer_algorithm lba = load_blancer_algorithm::RR;
~load_blancer_config(){};
};

private:
struct RRLoadBlancer {
std::unique_ptr<std::atomic<uint32_t>> index =
std::make_unique<std::atomic<uint32_t>>();
async_simple::coro::Lazy<std::shared_ptr<client_pool_t>> operator()(
const channel& channel) {
const load_blancer& load_blancer) {
auto i = index->fetch_add(1, std::memory_order_relaxed);
co_return channel.client_pools_[i % channel.client_pools_.size()];
co_return load_blancer
.client_pools_[i % load_blancer.client_pools_.size()];
}
};

Expand Down Expand Up @@ -84,14 +85,15 @@ class channel {
}

async_simple::coro::Lazy<std::shared_ptr<client_pool_t>> operator()(
const channel& channel) {
const load_blancer& load_blancer) {
int selected = select_host_with_weight_round_robin();
if (selected == -1) {
selected = 0;
}

wrr_current_ = selected;
co_return channel.client_pools_[selected % channel.client_pools_.size()];
co_return load_blancer
.client_pools_[selected % load_blancer.client_pools_.size()];
}

private:
Expand Down Expand Up @@ -138,27 +140,27 @@ class channel {

struct RandomLoadBlancer {
async_simple::coro::Lazy<std::shared_ptr<client_pool_t>> operator()(
const channel& channel) {
const load_blancer& load_blancer) {
static thread_local std::default_random_engine e(std::time(nullptr));
std::uniform_int_distribution rnd{std::size_t{0},
channel.client_pools_.size() - 1};
co_return channel.client_pools_[rnd(e)];
load_blancer.client_pools_.size() - 1};
co_return load_blancer.client_pools_[rnd(e)];
}
};
channel() = default;
load_blancer() = default;

public:
channel(channel&& o)
load_blancer(load_blancer&& o)
: config_(std::move(o.config_)),
lb_worker(std::move(o.lb_worker)),
client_pools_(std::move(o.client_pools_)){};
channel& operator=(channel&& o) {
load_blancer& operator=(load_blancer&& o) {
this->config_ = std::move(o.config_);
this->lb_worker = std::move(o.lb_worker);
this->client_pools_ = std::move(o.client_pools_);
}
channel(const channel& o) = delete;
channel& operator=(const channel& o) = delete;
load_blancer(const load_blancer& o) = delete;
load_blancer& operator=(const load_blancer& o) = delete;

auto send_request(auto op, typename client_t::config& config)
-> decltype(std::declval<client_pool_t>().send_request(std::move(op),
Expand All @@ -185,37 +187,38 @@ class channel {
return send_request(std::move(op), config_.pool_config.client_config);
}

static channel create(const std::vector<std::string_view>& hosts,
const channel_config& config = {},
const std::vector<int>& weights = {},
client_pools_t& client_pools =
g_clients_pool<client_t, io_context_pool_t>()) {
channel ch;
static load_blancer create(
const std::vector<std::string_view>& hosts,
const load_blancer_config& config = {},
const std::vector<int>& weights = {},
client_pools_t& client_pools =
g_clients_pool<client_t, io_context_pool_t>()) {
load_blancer ch;
ch.init(hosts, config, weights, client_pools);
return ch;
}

/**
* @brief return the channel's hosts size.
* @brief return the load_blancer's hosts size.
*
* @return std::size_t
*/
std::size_t size() const noexcept { return client_pools_.size(); }

private:
void init(const std::vector<std::string_view>& hosts,
const channel_config& config, const std::vector<int>& weights,
const load_blancer_config& config, const std::vector<int>& weights,
client_pools_t& client_pools) {
config_ = config;
client_pools_.reserve(hosts.size());
for (auto& host : hosts) {
client_pools_.emplace_back(client_pools.at(host, config.pool_config));
}
switch (config_.lba) {
case load_blance_algorithm::RR:
case load_blancer_algorithm::RR:
lb_worker = RRLoadBlancer{};
break;
case load_blance_algorithm::WRR: {
case load_blancer_algorithm::WRR: {
if (hosts.empty() || weights.empty()) {
throw std::invalid_argument("host/weight list is empty!");
}
Expand All @@ -224,13 +227,13 @@ class channel {
}
lb_worker = WRRLoadBlancer(weights);
} break;
case load_blance_algorithm::random:
case load_blancer_algorithm::random:
default:
lb_worker = RandomLoadBlancer{};
}
return;
}
channel_config config_;
load_blancer_config config_;
std::variant<RRLoadBlancer, WRRLoadBlancer, RandomLoadBlancer> lb_worker;
std::vector<std::shared_ptr<client_pool_t>> client_pools_;
};
Expand Down
34 changes: 18 additions & 16 deletions include/ylt/standalone/cinatra/coro_http_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
#include "cinatra/mime_types.hpp"
#include "cinatra_log_wrapper.hpp"
#include "coro_http_connection.hpp"
#include "ylt/coro_io/channel.hpp"
#include "ylt/coro_io/coro_file.hpp"
#include "ylt/coro_io/coro_io.hpp"
#include "ylt/coro_io/io_context_pool.hpp"
#include "ylt/coro_io/load_blancer.hpp"
#include "ylt/metric/system_metric.hpp"

namespace cinatra {
Expand Down Expand Up @@ -208,22 +208,23 @@ class coro_http_server {
template <http_method... method, typename... Aspects>
void set_http_proxy_handler(std::string url_path,
std::vector<std::string_view> hosts,
coro_io::load_blance_algorithm type =
coro_io::load_blance_algorithm::random,
coro_io::load_blancer_algorithm type =
coro_io::load_blancer_algorithm::random,
std::vector<int> weights = {},
Aspects &&...aspects) {
if (hosts.empty()) {
throw std::invalid_argument("not config hosts yet!");
}

auto channel = std::make_shared<coro_io::channel<coro_http_client>>(
coro_io::channel<coro_http_client>::create(hosts, {.lba = type},
weights));
auto load_blancer =
std::make_shared<coro_io::load_blancer<coro_http_client>>(
coro_io::load_blancer<coro_http_client>::create(
hosts, {.lba = type}, weights));
auto handler =
[this, channel, type](
[this, load_blancer, type](
coro_http_request &req,
coro_http_response &response) -> async_simple::coro::Lazy<void> {
co_await channel->send_request(
co_await load_blancer->send_request(
[this, &req, &response](
coro_http_client &client,
std::string_view host) -> async_simple::coro::Lazy<void> {
Expand All @@ -247,22 +248,23 @@ class coro_http_server {
template <http_method... method, typename... Aspects>
void set_websocket_proxy_handler(std::string url_path,
std::vector<std::string_view> hosts,
coro_io::load_blance_algorithm type =
coro_io::load_blance_algorithm::random,
coro_io::load_blancer_algorithm type =
coro_io::load_blancer_algorithm::random,
std::vector<int> weights = {},
Aspects &&...aspects) {
if (hosts.empty()) {
throw std::invalid_argument("not config hosts yet!");
}

auto channel = std::make_shared<coro_io::channel<coro_http_client>>(
coro_io::channel<coro_http_client>::create(hosts, {.lba = type},
weights));
auto load_blancer =
std::make_shared<coro_io::load_blancer<coro_http_client>>(
coro_io::load_blancer<coro_http_client>::create(
hosts, {.lba = type}, weights));

set_http_handler<cinatra::GET>(
url_path,
[channel](coro_http_request &req,
coro_http_response &resp) -> async_simple::coro::Lazy<void> {
[load_blancer](coro_http_request &req, coro_http_response &resp)
-> async_simple::coro::Lazy<void> {
websocket_result result{};
while (true) {
result = co_await req.get_conn()->read_websocket();
Expand All @@ -275,7 +277,7 @@ class coro_http_server {
break;
}

co_await channel->send_request(
co_await load_blancer->send_request(
[&req, result](
coro_http_client &client,
std::string_view host) -> async_simple::coro::Lazy<void> {
Expand Down
4 changes: 2 additions & 2 deletions src/coro_http/examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ else()
endif()

add_executable(coro_http_example example.cpp)
add_executable(coro_http_channel channel.cpp)
add_executable(coro_http_load_blancer load_blancer.cpp)
add_executable(coro_chat_room chat_room.cpp)

if (CMAKE_CXX_COMPILER_ID STREQUAL "GNU" AND CMAKE_SYSTEM_NAME MATCHES "Windows") # mingw-w64
target_link_libraries(coro_http_example wsock32 ws2_32)
target_link_libraries(coro_http_channel wsock32 ws2_32)
target_link_libraries(coro_http_load_blancer wsock32 ws2_32)
target_link_libraries(coro_chat_room wsock32 ws2_32)
endif()
11 changes: 6 additions & 5 deletions src/coro_http/examples/example.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "ylt/coro_http/coro_http_client.hpp"
#include "ylt/coro_http/coro_http_server.hpp"
#include "ylt/coro_io/coro_io.hpp"

using namespace std::chrono_literals;
using namespace coro_http;
Expand Down Expand Up @@ -569,12 +570,12 @@ void http_proxy() {
coro_http_server proxy_wrr(2, 8090);
proxy_wrr.set_http_proxy_handler<GET, POST>(
"/", {"127.0.0.1:9001", "127.0.0.1:9002", "127.0.0.1:9003"},
coro_io::load_blance_algorithm::WRR, {10, 5, 5});
coro_io::load_blancer_algorithm::WRR, {10, 5, 5});

coro_http_server proxy_rr(2, 8091);
proxy_rr.set_http_proxy_handler<GET, POST>(
"/", {"127.0.0.1:9001", "127.0.0.1:9002", "127.0.0.1:9003"},
coro_io::load_blance_algorithm::RR);
coro_io::load_blancer_algorithm::RR);

coro_http_server proxy_random(2, 8092);
proxy_random.set_http_proxy_handler<GET, POST>(
Expand Down Expand Up @@ -628,8 +629,8 @@ void http_proxy() {
assert(!resp_random.resp_body.empty());
}

void coro_channel() {
auto ch = coro_io::create_channel<int>(10000);
void coro_load_blancer() {
auto ch = coro_io::create_load_blancer<int>(10000);
auto ec = async_simple::coro::syncAwait(coro_io::async_send(ch, 41));
assert(!ec);
ec = async_simple::coro::syncAwait(coro_io::async_send(ch, 42));
Expand Down Expand Up @@ -660,6 +661,6 @@ int main() {
test_gzip();
#endif
http_proxy();
coro_channel();
coro_load_blancer();
return 0;
}
Loading

0 comments on commit 999bb72

Please sign in to comment.