Skip to content

Commit

Permalink
load balance for complement queue
Browse files Browse the repository at this point in the history
Signed-off-by: bufferflies <[email protected]>
  • Loading branch information
bufferflies committed Dec 11, 2023
1 parent 815f12f commit 0444b25
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 6 deletions.
16 changes: 12 additions & 4 deletions src/core/lib/iomgr/tcp_server_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -445,14 +445,22 @@ static void on_read(void* arg, grpc_error_handle err) {
addr_uri->c_str());
}

// addr_str format: ipv4/ipv6:ipv6:port
std::size_t start = addr_str.find_first_of(":") + 1;
std::size_t end = addr_str.find(":", start);
std::string ip = addr_str.substr(start, end - start);

std::string name = absl::StrCat("tcp-server-connection:", addr_uri.value());
grpc_fd* fdobj = grpc_fd_create(fd, name.c_str(), true);

read_notifier_pollset = (*(sp->server->pollsets))
[static_cast<size_t>(gpr_atm_no_barrier_fetch_add(
&sp->server->next_pollset_to_assign, 1)) %
sp->server->pollsets->size()];
std::size_t start_idx = static_cast<size_t>(rand()) % sp->server->pollsets->size();
if (!gpr_atm_no_barrier_cas(&sp->server->next_pollset_to_assign_ids[ip],0,start_idx)){
start_idx=static_cast<size_t>(gpr_atm_no_barrier_fetch_add(
&sp->server->next_pollset_to_assign_ids[ip], 1)) %
sp->server->pollsets->size();
}

read_notifier_pollset = (*(sp->server->pollsets))[start_idx];
grpc_pollset_add_fd(read_notifier_pollset, fdobj);

// Create acceptor.
Expand Down
5 changes: 3 additions & 2 deletions src/core/lib/iomgr/tcp_server_utils_posix.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "src/core/lib/iomgr/tcp_server.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/resource_quota/memory_quota.h"
#include <map>

// one listening port
typedef struct grpc_tcp_listener {
Expand Down Expand Up @@ -98,8 +99,8 @@ struct grpc_tcp_server {
// owned by this struct
const std::vector<grpc_pollset*>* pollsets = nullptr;

// next pollset to assign a channel to
gpr_atm next_pollset_to_assign = 0;
// next pollset to assign a channel to, it is a map from pollset name to ip address.
std::map<std::string, gpr_atm> next_pollset_to_assign_ids;

// Contains config extracted from channel args for this server
grpc_core::PosixTcpOptions options;
Expand Down

0 comments on commit 0444b25

Please sign in to comment.