Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[coro_io] rename channel to load_blancer, rename coro_channel to channel #732

Merged
merged 3 commits into from
Aug 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ cc_binary(
)

cc_binary(
name = "coro_http_channel",
srcs = ["src/coro_http/examples/channel.cpp"],
name = "coro_http_load_blancer",
srcs = ["src/coro_http/examples/load_blancer.cpp"],
copts = YA_BIN_COPT,
includes = [
"include",
Expand Down
4 changes: 2 additions & 2 deletions include/ylt/coro_io/client_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ template <typename client_t, typename io_context_pool_t>
class client_pools;

template <typename, typename>
class channel;
class load_blancer;

template <typename client_t,
typename io_context_pool_t = coro_io::io_context_pool>
Expand Down Expand Up @@ -405,7 +405,7 @@ class client_pool : public std::enable_shared_from_this<
friend class client_pools;

template <typename, typename>
friend class channel;
friend class load_blancer;

template <typename T>
async_simple::coro::Lazy<return_type_with_host<T>> send_request(
Expand Down
11 changes: 5 additions & 6 deletions include/ylt/coro_io/coro_io.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -364,12 +364,11 @@ post(Func func,
}

template <typename R>
struct coro_channel
: public asio::experimental::channel<void(std::error_code, R)> {
struct channel : public asio::experimental::channel<void(std::error_code, R)> {
using return_type = R;
using ValueType = std::pair<std::error_code, R>;
using asio::experimental::channel<void(std::error_code, R)>::channel;
coro_channel(coro_io::ExecutorWrapper<> *executor, size_t capacity)
channel(coro_io::ExecutorWrapper<> *executor, size_t capacity)
: executor_(executor),
asio::experimental::channel<void(std::error_code, R)>(
executor->get_asio_executor(), capacity) {}
Expand All @@ -380,17 +379,17 @@ struct coro_channel
};

template <typename R>
inline coro_channel<R> create_channel(
inline channel<R> create_load_blancer(
size_t capacity,
coro_io::ExecutorWrapper<> *executor = coro_io::get_global_executor()) {
return coro_channel<R>(executor, capacity);
return channel<R>(executor, capacity);
}

template <typename R>
inline auto create_shared_channel(
size_t capacity,
coro_io::ExecutorWrapper<> *executor = coro_io::get_global_executor()) {
return std::make_shared<coro_channel<R>>(executor, capacity);
return std::make_shared<channel<R>>(executor, capacity);
}

template <typename T>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,32 +25,33 @@
#include "io_context_pool.hpp"
namespace coro_io {

enum class load_blance_algorithm {
enum class load_blancer_algorithm {
RR = 0, // round-robin
WRR, // weight round-robin
random,
};

template <typename client_t, typename io_context_pool_t = io_context_pool>
class channel {
class load_blancer {
using client_pool_t = client_pool<client_t, io_context_pool_t>;
using client_pools_t = client_pools<client_t, io_context_pool_t>;

public:
struct channel_config {
struct load_blancer_config {
typename client_pool_t::pool_config pool_config;
load_blance_algorithm lba = load_blance_algorithm::RR;
~channel_config(){};
load_blancer_algorithm lba = load_blancer_algorithm::RR;
~load_blancer_config(){};
};

private:
struct RRLoadBlancer {
std::unique_ptr<std::atomic<uint32_t>> index =
std::make_unique<std::atomic<uint32_t>>();
async_simple::coro::Lazy<std::shared_ptr<client_pool_t>> operator()(
const channel& channel) {
const load_blancer& load_blancer) {
auto i = index->fetch_add(1, std::memory_order_relaxed);
co_return channel.client_pools_[i % channel.client_pools_.size()];
co_return load_blancer
.client_pools_[i % load_blancer.client_pools_.size()];
}
};

Expand Down Expand Up @@ -84,14 +85,15 @@ class channel {
}

async_simple::coro::Lazy<std::shared_ptr<client_pool_t>> operator()(
const channel& channel) {
const load_blancer& load_blancer) {
int selected = select_host_with_weight_round_robin();
if (selected == -1) {
selected = 0;
}

wrr_current_ = selected;
co_return channel.client_pools_[selected % channel.client_pools_.size()];
co_return load_blancer
.client_pools_[selected % load_blancer.client_pools_.size()];
}

private:
Expand Down Expand Up @@ -138,27 +140,27 @@ class channel {

struct RandomLoadBlancer {
async_simple::coro::Lazy<std::shared_ptr<client_pool_t>> operator()(
const channel& channel) {
const load_blancer& load_blancer) {
static thread_local std::default_random_engine e(std::time(nullptr));
std::uniform_int_distribution rnd{std::size_t{0},
channel.client_pools_.size() - 1};
co_return channel.client_pools_[rnd(e)];
load_blancer.client_pools_.size() - 1};
co_return load_blancer.client_pools_[rnd(e)];
}
};
channel() = default;
load_blancer() = default;

public:
channel(channel&& o)
load_blancer(load_blancer&& o)
: config_(std::move(o.config_)),
lb_worker(std::move(o.lb_worker)),
client_pools_(std::move(o.client_pools_)){};
channel& operator=(channel&& o) {
load_blancer& operator=(load_blancer&& o) {
this->config_ = std::move(o.config_);
this->lb_worker = std::move(o.lb_worker);
this->client_pools_ = std::move(o.client_pools_);
}
channel(const channel& o) = delete;
channel& operator=(const channel& o) = delete;
load_blancer(const load_blancer& o) = delete;
load_blancer& operator=(const load_blancer& o) = delete;

auto send_request(auto op, typename client_t::config& config)
-> decltype(std::declval<client_pool_t>().send_request(std::move(op),
Expand All @@ -185,37 +187,38 @@ class channel {
return send_request(std::move(op), config_.pool_config.client_config);
}

static channel create(const std::vector<std::string_view>& hosts,
const channel_config& config = {},
const std::vector<int>& weights = {},
client_pools_t& client_pools =
g_clients_pool<client_t, io_context_pool_t>()) {
channel ch;
static load_blancer create(
const std::vector<std::string_view>& hosts,
const load_blancer_config& config = {},
const std::vector<int>& weights = {},
client_pools_t& client_pools =
g_clients_pool<client_t, io_context_pool_t>()) {
load_blancer ch;
ch.init(hosts, config, weights, client_pools);
return ch;
}

/**
* @brief return the channel's hosts size.
* @brief return the load_blancer's hosts size.
*
* @return std::size_t
*/
std::size_t size() const noexcept { return client_pools_.size(); }

private:
void init(const std::vector<std::string_view>& hosts,
const channel_config& config, const std::vector<int>& weights,
const load_blancer_config& config, const std::vector<int>& weights,
client_pools_t& client_pools) {
config_ = config;
client_pools_.reserve(hosts.size());
for (auto& host : hosts) {
client_pools_.emplace_back(client_pools.at(host, config.pool_config));
}
switch (config_.lba) {
case load_blance_algorithm::RR:
case load_blancer_algorithm::RR:
lb_worker = RRLoadBlancer{};
break;
case load_blance_algorithm::WRR: {
case load_blancer_algorithm::WRR: {
if (hosts.empty() || weights.empty()) {
throw std::invalid_argument("host/weight list is empty!");
}
Expand All @@ -224,13 +227,13 @@ class channel {
}
lb_worker = WRRLoadBlancer(weights);
} break;
case load_blance_algorithm::random:
case load_blancer_algorithm::random:
default:
lb_worker = RandomLoadBlancer{};
}
return;
}
channel_config config_;
load_blancer_config config_;
std::variant<RRLoadBlancer, WRRLoadBlancer, RandomLoadBlancer> lb_worker;
std::vector<std::shared_ptr<client_pool_t>> client_pools_;
};
Expand Down
34 changes: 18 additions & 16 deletions include/ylt/standalone/cinatra/coro_http_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
#include "cinatra/mime_types.hpp"
#include "cinatra_log_wrapper.hpp"
#include "coro_http_connection.hpp"
#include "ylt/coro_io/channel.hpp"
#include "ylt/coro_io/coro_file.hpp"
#include "ylt/coro_io/coro_io.hpp"
#include "ylt/coro_io/io_context_pool.hpp"
#include "ylt/coro_io/load_blancer.hpp"
#include "ylt/metric/system_metric.hpp"

namespace cinatra {
Expand Down Expand Up @@ -208,22 +208,23 @@ class coro_http_server {
template <http_method... method, typename... Aspects>
void set_http_proxy_handler(std::string url_path,
std::vector<std::string_view> hosts,
coro_io::load_blance_algorithm type =
coro_io::load_blance_algorithm::random,
coro_io::load_blancer_algorithm type =
coro_io::load_blancer_algorithm::random,
std::vector<int> weights = {},
Aspects &&...aspects) {
if (hosts.empty()) {
throw std::invalid_argument("not config hosts yet!");
}

auto channel = std::make_shared<coro_io::channel<coro_http_client>>(
coro_io::channel<coro_http_client>::create(hosts, {.lba = type},
weights));
auto load_blancer =
std::make_shared<coro_io::load_blancer<coro_http_client>>(
coro_io::load_blancer<coro_http_client>::create(
hosts, {.lba = type}, weights));
auto handler =
[this, channel, type](
[this, load_blancer, type](
coro_http_request &req,
coro_http_response &response) -> async_simple::coro::Lazy<void> {
co_await channel->send_request(
co_await load_blancer->send_request(
[this, &req, &response](
coro_http_client &client,
std::string_view host) -> async_simple::coro::Lazy<void> {
Expand All @@ -247,22 +248,23 @@ class coro_http_server {
template <http_method... method, typename... Aspects>
void set_websocket_proxy_handler(std::string url_path,
std::vector<std::string_view> hosts,
coro_io::load_blance_algorithm type =
coro_io::load_blance_algorithm::random,
coro_io::load_blancer_algorithm type =
coro_io::load_blancer_algorithm::random,
std::vector<int> weights = {},
Aspects &&...aspects) {
if (hosts.empty()) {
throw std::invalid_argument("not config hosts yet!");
}

auto channel = std::make_shared<coro_io::channel<coro_http_client>>(
coro_io::channel<coro_http_client>::create(hosts, {.lba = type},
weights));
auto load_blancer =
std::make_shared<coro_io::load_blancer<coro_http_client>>(
coro_io::load_blancer<coro_http_client>::create(
hosts, {.lba = type}, weights));

set_http_handler<cinatra::GET>(
url_path,
[channel](coro_http_request &req,
coro_http_response &resp) -> async_simple::coro::Lazy<void> {
[load_blancer](coro_http_request &req, coro_http_response &resp)
-> async_simple::coro::Lazy<void> {
websocket_result result{};
while (true) {
result = co_await req.get_conn()->read_websocket();
Expand All @@ -275,7 +277,7 @@ class coro_http_server {
break;
}

co_await channel->send_request(
co_await load_blancer->send_request(
[&req, result](
coro_http_client &client,
std::string_view host) -> async_simple::coro::Lazy<void> {
Expand Down
4 changes: 2 additions & 2 deletions src/coro_http/examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ else()
endif()

add_executable(coro_http_example example.cpp)
add_executable(coro_http_channel channel.cpp)
add_executable(coro_http_load_blancer load_blancer.cpp)
add_executable(coro_chat_room chat_room.cpp)

if (CMAKE_CXX_COMPILER_ID STREQUAL "GNU" AND CMAKE_SYSTEM_NAME MATCHES "Windows") # mingw-w64
target_link_libraries(coro_http_example wsock32 ws2_32)
target_link_libraries(coro_http_channel wsock32 ws2_32)
target_link_libraries(coro_http_load_blancer wsock32 ws2_32)
target_link_libraries(coro_chat_room wsock32 ws2_32)
endif()
11 changes: 6 additions & 5 deletions src/coro_http/examples/example.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "ylt/coro_http/coro_http_client.hpp"
#include "ylt/coro_http/coro_http_server.hpp"
#include "ylt/coro_io/coro_io.hpp"

using namespace std::chrono_literals;
using namespace coro_http;
Expand Down Expand Up @@ -569,12 +570,12 @@ void http_proxy() {
coro_http_server proxy_wrr(2, 8090);
proxy_wrr.set_http_proxy_handler<GET, POST>(
"/", {"127.0.0.1:9001", "127.0.0.1:9002", "127.0.0.1:9003"},
coro_io::load_blance_algorithm::WRR, {10, 5, 5});
coro_io::load_blancer_algorithm::WRR, {10, 5, 5});

coro_http_server proxy_rr(2, 8091);
proxy_rr.set_http_proxy_handler<GET, POST>(
"/", {"127.0.0.1:9001", "127.0.0.1:9002", "127.0.0.1:9003"},
coro_io::load_blance_algorithm::RR);
coro_io::load_blancer_algorithm::RR);

coro_http_server proxy_random(2, 8092);
proxy_random.set_http_proxy_handler<GET, POST>(
Expand Down Expand Up @@ -628,8 +629,8 @@ void http_proxy() {
assert(!resp_random.resp_body.empty());
}

void coro_channel() {
auto ch = coro_io::create_channel<int>(10000);
void coro_load_blancer() {
auto ch = coro_io::create_load_blancer<int>(10000);
auto ec = async_simple::coro::syncAwait(coro_io::async_send(ch, 41));
assert(!ec);
ec = async_simple::coro::syncAwait(coro_io::async_send(ch, 42));
Expand Down Expand Up @@ -660,6 +661,6 @@ int main() {
test_gzip();
#endif
http_proxy();
coro_channel();
coro_load_blancer();
return 0;
}
Loading
Loading