Skip to content

Commit

Permalink
fix channel interface (#356)
Browse files Browse the repository at this point in the history
  • Loading branch information
poor-circle authored Jul 7, 2023
1 parent b9fbd31 commit 659b08a
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 19 deletions.
6 changes: 3 additions & 3 deletions include/ylt/coro_io/channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class channel {
return send_request(op, config_.pool_config.client_config);
}

static channel create(const ::std::vector<::std::string>& hosts,
static channel create(const std::vector<std::string_view>& hosts,
const channel_config& config = {},
client_pools_t& client_pools =
g_clients_pool<client_t, io_context_pool_t>()) {
Expand All @@ -107,8 +107,8 @@ class channel {
}

private:
void init(const std::vector<std::string> hosts, const channel_config& config,
client_pools_t& client_pools) {
void init(const std::vector<std::string_view>& hosts,
const channel_config& config, client_pools_t& client_pools) {
config_ = config;
client_pools_.reserve(hosts.size());
for (auto& host : hosts) {
Expand Down
41 changes: 30 additions & 11 deletions include/ylt/coro_io/client_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,14 @@
#include <mutex>
#include <random>
#include <shared_mutex>
#include <string_view>
#include <type_traits>
#include <unordered_map>
#include <utility>
#include <ylt/util/expected.hpp>

#include "detail/client_queue.hpp"
#include "coro_io.hpp"
#include "detail/client_queue.hpp"
#include "io_context_pool.hpp"
namespace coro_io {

Expand Down Expand Up @@ -191,13 +192,13 @@ class client_pool : public std::enable_shared_from_this<

public:
static std::shared_ptr<client_pool> create(
const std::string& host_name, const pool_config& pool_config = {},
std::string_view host_name, const pool_config& pool_config = {},
io_context_pool_t& io_context_pool = coro_io::g_io_context_pool()) {
return std::make_shared<client_pool>(private_construct_token{}, host_name,
pool_config, io_context_pool);
}

client_pool(private_construct_token t, const std::string& host_name,
client_pool(private_construct_token t, std::string_view host_name,
const pool_config& pool_config,
io_context_pool_t& io_context_pool)
: host_name_(host_name),
Expand All @@ -210,7 +211,7 @@ class client_pool : public std::enable_shared_from_this<
};

client_pool(private_construct_token t, client_pools_t* pools_manager_,
const std::string& host_name, const pool_config& pool_config,
std::string_view host_name, const pool_config& pool_config,
io_context_pool_t& io_context_pool)
: pools_manager_(pools_manager_),
host_name_(host_name),
Expand Down Expand Up @@ -307,39 +308,45 @@ class client_pools {
const typename client_pool_t::pool_config& pool_config = {},
io_context_pool_t& io_context_pool = coro_io::g_io_context_pool())
: io_context_pool_(io_context_pool), default_pool_config_(pool_config) {}
auto send_request(const std::string& host_name, auto&& op)
auto send_request(std::string_view host_name, auto&& op)
-> decltype(std::declval<client_pool_t>().send_request(op)) {
auto pool = get_client_pool(host_name, default_pool_config_);
auto ret = co_await pool->send_request(op);
co_return ret;
}
auto send_request(const std::string& host_name,
auto send_request(std::string_view host_name,
const typename client_pool_t::pool_config& pool_config,
auto&& op)
-> decltype(std::declval<client_pool_t>().send_request(op)) {
auto pool = get_client_pool(host_name, pool_config);
auto ret = co_await pool.send_request(op);
co_return ret;
}
auto at(const std::string& host_name) {
auto at(std::string_view host_name) {
return get_client_pool(host_name, default_pool_config_);
}
auto at(const std::string& host_name,
auto at(std::string_view host_name,
const typename client_pool_t::pool_config& pool_config) {
return get_client_pool(host_name, pool_config);
}
auto operator[](const std::string& host_name) { return at(host_name); }
auto operator[](std::string_view host_name) { return at(host_name); }
auto get_io_context_pool() { return io_context_pool_; }

private:
std::shared_ptr<client_pool_t> get_client_pool(
const std::string& host_name,
std::string_view host_name,
const typename client_pool_t::pool_config& pool_config) {
decltype(client_pool_manager_.end()) iter;
bool has_inserted;
{
#ifdef __cpp_lib_generic_unordered_lookup
std::shared_lock shared_lock{mutex_};
iter = client_pool_manager_.find(host_name);
#else
std::string host_name_copy = std::string{host_name};
std::shared_lock shared_lock{mutex_};
iter = client_pool_manager_.find(host_name_copy);
#endif
if (iter == client_pool_manager_.end()) {
shared_lock.unlock();
std::lock_guard lock{mutex_};
Expand All @@ -354,8 +361,20 @@ class client_pools {
return iter->second;
}
}
struct string_hash {
using hash_type = std::hash<std::string_view>;
using is_transparent = void;

std::size_t operator()(std::string_view str) const {
return hash_type{}(str);
}
std::size_t operator()(std::string const& str) const {
return hash_type{}(str);
}
};
typename client_pool_t::pool_config default_pool_config_{};
std::unordered_map<std::string, std::shared_ptr<client_pool_t>>
std::unordered_map<std::string, std::shared_ptr<client_pool_t>, string_hash,
std::equal_to<>>
client_pool_manager_;
io_context_pool_t& io_context_pool_;
std::shared_mutex mutex_;
Expand Down
4 changes: 2 additions & 2 deletions src/coro_http/examples/channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ Lazy<void> qps_watcher() {
}

int main() {
std::vector<std::string> hosts;
hosts.emplace_back("http://www.baidu.com", "http://baidu.com");
std::vector<std::string_view> hosts{"http://baidu.com",
"http://www.baidu.com"};
auto chan = coro_io::channel<coro_http_client>::create(
hosts, coro_io::channel<coro_http_client>::channel_config{
.pool_config{.max_connection_ = 1000}});
Expand Down
2 changes: 1 addition & 1 deletion src/coro_rpc/examples/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/output/tests/coro_rpc)
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/output/examples/coro_rpc)
add_subdirectory(base_examples)
add_subdirectory(file_transfer)

Expand Down
4 changes: 2 additions & 2 deletions src/coro_rpc/examples/base_examples/channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ Lazy<void> qps_watcher() {
}

int main() {
auto hosts = std::vector<std::string>{
{std::string{"127.0.0.1:8801"}, std::string{"localhost:8801"}}};
auto hosts =
std::vector<std::string_view>{{"127.0.0.1:8801", "localhost:8801"}};
auto worker_cnt = std::thread::hardware_concurrency() * 20;
auto chan = coro_io::channel<coro_rpc_client>::create(
hosts, coro_io::channel<coro_rpc_client>::channel_config{
Expand Down

0 comments on commit 659b08a

Please sign in to comment.