Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add cluster support #122

Closed
wants to merge 3 commits into from
Closed

add cluster support #122

wants to merge 3 commits into from

Conversation

zklgame
Copy link
Collaborator

@zklgame zklgame commented Mar 15, 2024

Why make this pull request?

Add cluster option for the Async service.

How to test this pull request?

go run cmd/server/main.go --config ./config/development-postgres-cluster.yaml

TODO in next MRs

  • Recovery for failed Async server.
    • when there is any member change, use delegates to call a new API in each async server to add/remove queues
  • remove all the defaultShard and store tasks of the same execution in the same shard.

@zklgame zklgame force-pushed the add_memberlist branch 4 times, most recently from a418028 to d637ea9 Compare March 15, 2024 16:52
Copy link

codecov bot commented Mar 15, 2024

Codecov Report

Attention: Patch coverage is 34.32836% with 220 lines in your changes are missing coverage. Please review.

Project coverage is 60.11%. Comparing base (c5da3b2) to head (59eb787).

Files Patch % Lines
service/async/service_impl.go 30.12% 109 Missing and 7 partials ⚠️
cluster/event_delegate.go 0.00% 30 Missing ⚠️
config/config.go 15.62% 25 Missing and 2 partials ⚠️
cluster/delegate.go 0.00% 26 Missing ⚠️
service/async/default_server.go 69.38% 11 Missing and 4 partials ⚠️
cmd/server/bootstrap/xcherry.go 55.55% 3 Missing and 1 partial ⚠️
engine/immediate_task_queue.go 0.00% 1 Missing ⚠️
engine/timer_task_queue.go 0.00% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #122      +/-   ##
==========================================
- Coverage   61.38%   60.11%   -1.28%     
==========================================
  Files          88       91       +3     
  Lines        6936     7200     +264     
==========================================
+ Hits         4258     4328      +70     
- Misses       2404     2591     +187     
- Partials      274      281       +7     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@zklgame zklgame marked this pull request as ready for review March 15, 2024 16:58
@zklgame zklgame requested a review from longquanzheng March 15, 2024 16:59
@zklgame zklgame force-pushed the add_memberlist branch 7 times, most recently from 46f0bbb to 931d428 Compare March 17, 2024 11:53
Copy link
Contributor

@longquanzheng longquanzheng left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a hierarchy of the cluster mode, it will be nice to:

  • Each async service process is a standard deployment in K8s(Marvin) so the number of port needs to be static.
  • Each async service process is composed by several shards, each shard is composed by immediateQueue + delayQueue.

run: sleep 5 && make install-schema-postgres

- name: Start Postgres server # then run in the background
run: ./xcherry-server --config ./config/development-postgres-cluster.yaml&
Copy link
Contributor

@longquanzheng longquanzheng Mar 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can split into multiple PRs. The end goal is to run multiple processes as different nodes of the cluster:

./xcherry-server --config ./config/development-postgres-cluster-api-node-a.yaml& ./xcherry-server --config ./config/development-postgres-cluster-async-node-a.yaml& ./xcherry-server --config ./config/development-postgres-cluster-async-node-b.yaml& ./xcherry-server --config ./config/development-postgres-cluster-async-node-c.yaml&

It includes:

  • an API node
  • three async nodes:
    • node A
    • node B
    • node C

"log"
)

type ClusterDelegate struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is this for? seems not being used yet. Maybe put a comment if you want to keep for later PRs

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's used to save the meta data. We use meta data to get the target client address of a node.


type ClusterEventDelegate struct {
consistent *hashring.HashRing
ServerAddress string
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is ServerAddress the address of the current node? or others?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe better to have a constructor method:

func NewClusterEventDelegate

meta := ParseClusterDelegateMetaData(node.Meta)

hostPort := BuildHostAddress(node)
log.Printf("ClusterEvent JOIN %s: advertise address %s, server address %s", d.ServerAddress, hostPort, meta.ServerAddress)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should use our logger(unless impossible like during startup before the logger is available). This log will only print to stderr/sdtout. Our logger allows configure to different implementation, and using better format

@@ -6,66 +6,117 @@ package async
import (
"context"
"fmt"
"github.com/hashicorp/memberlist"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like a lot of the membership implementation code is added into this file.

It will be great if we can abstract the code into a ClusterMembership interface & impl in a spearate place. And this server_impl will just use it

serverAddresses := strings.Split(cfg.AsyncService.ClientAddress, ",")
advertiseAddresses := []string{""}

if cfg.AsyncService.Mode == config.AsyncServiceModeConsistentHashingCluster {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

btw, we can rename AsyncServiceModeConsistentHashingCluster to AsyncServiceModeCluster :D

Comment on lines +75 to +80
// Used in AsyncServiceConfig with AsyncServiceModeConsistentHashingCluster mode only.
// Multiple Address seperated by comma.
ClusterAddresses string `yaml:"clusterAddresses"`
// Used in AsyncServiceConfig with AsyncServiceModeConsistentHashingCluster mode only.
// These are addresses used by memberlist.
ClusterAdvertiseAddresses string `yaml:"clusterAdvertiseAddresses"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest to put them under AsyncServiceConfig.ClusterConfig

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also let's add more details on what are those configs for and how to use them

var servers []Server
addressToServerMap := map[string]Server{}

serverAddresses := strings.Split(cfg.AsyncService.ClientAddress, ",")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cfg.AsyncService.ClientAddress is the address for api service to call async service. API service needs to notify the async serivce's specific node/shard. But api service doesn't need to use memberlist to lookup. Instead, api service will send to a random node, and then the receiving async service node may forward the request to the right node of the shard.

Because api service doesn't need to know which node own the shard, it can send request behind a LBS.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current way is:

  1. api server send the internal request to a random async server
  2. the random async server forward the request to the target async server

This is the same as what you described above. See the codes near the comment // randomly send the request to an async service.

@zklgame zklgame closed this Mar 19, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants