Skip to content

Commit

Permalink
add cluster support
Browse files Browse the repository at this point in the history
  • Loading branch information
zklgame committed Mar 15, 2024
1 parent c5da3b2 commit 01581eb
Show file tree
Hide file tree
Showing 15 changed files with 544 additions and 79 deletions.
55 changes: 55 additions & 0 deletions cluster/event_delegate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright 2023 xCherryIO organization

// Copyright (c) 2023 xCherryIO Organization
// SPDX-License-Identifier: Apache-2.0

package cluster

import (
"fmt"
"github.com/hashicorp/memberlist"
"github.com/serialx/hashring"
"log"
"strconv"
)

type ClusterEventDelegate struct {
consistent *hashring.HashRing
ServerAddress string
}

func (d *ClusterEventDelegate) NotifyJoin(node *memberlist.Node) {
hostPort := BuildHostAddress(node)
log.Printf("ClusterEvent JOIN %s with server address %s", hostPort, d.ServerAddress)

if d.consistent == nil {
d.consistent = hashring.New([]string{hostPort})
} else {
d.consistent = d.consistent.AddNode(hostPort)
}

Check warning on line 29 in cluster/event_delegate.go

View check run for this annotation

Codecov / codecov/patch

cluster/event_delegate.go#L21-L29

Added lines #L21 - L29 were not covered by tests
}

func (d *ClusterEventDelegate) NotifyLeave(node *memberlist.Node) {
hostPort := BuildHostAddress(node)
log.Printf("ClusterEvent LEAVE %s with server address %s", hostPort, d.ServerAddress)

if d.consistent != nil {
d.consistent = d.consistent.RemoveNode(hostPort)
}

Check warning on line 38 in cluster/event_delegate.go

View check run for this annotation

Codecov / codecov/patch

cluster/event_delegate.go#L32-L38

Added lines #L32 - L38 were not covered by tests
}

func (d *ClusterEventDelegate) NotifyUpdate(node *memberlist.Node) {
// skip

Check warning on line 42 in cluster/event_delegate.go

View check run for this annotation

Codecov / codecov/patch

cluster/event_delegate.go#L41-L42

Added lines #L41 - L42 were not covered by tests
}

func (d *ClusterEventDelegate) GetNodeFor(shardId int32) string {
node, ok := d.consistent.GetNode(strconv.Itoa(int(shardId)))
if !ok {
log.Fatalf("Failed to search shardId %d", shardId)
}
return node

Check warning on line 50 in cluster/event_delegate.go

View check run for this annotation

Codecov / codecov/patch

cluster/event_delegate.go#L45-L50

Added lines #L45 - L50 were not covered by tests
}

func BuildHostAddress(node *memberlist.Node) string {
return fmt.Sprintf("%s:%d", node.Addr.To4().String(), node.Port)

Check warning on line 54 in cluster/event_delegate.go

View check run for this annotation

Codecov / codecov/patch

cluster/event_delegate.go#L53-L54

Added lines #L53 - L54 were not covered by tests
}
16 changes: 10 additions & 6 deletions cmd/server/bootstrap/xcherry.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,16 @@ func StartXCherryServer(rootCtx context.Context, cfg *config.Config, services ma
}
}

var asyncServer async.Server
var asyncServers []async.Server
if services[AsyncServiceName] {
asyncServer := async.NewDefaultAPIServerWithGin(
asyncServers = async.NewDefaultAsyncServersWithGin(
rootCtx, *cfg, processStore, visibilityStore, logger.WithTags(tag.Service(AsyncServiceName)))
err = asyncServer.Start()
if err != nil {
logger.Fatal("Failed to start async server", tag.Error(err))

for _, asyncServer := range asyncServers {
err = asyncServer.Start()
if err != nil {
logger.Fatal(fmt.Sprintf("Failed to start the async server %s", asyncServer.GetServerAddress()), tag.Error(err))
}

Check warning on line 103 in cmd/server/bootstrap/xcherry.go

View check run for this annotation

Codecov / codecov/patch

cmd/server/bootstrap/xcherry.go#L102-L103

Added lines #L102 - L103 were not covered by tests
}
}

Expand All @@ -111,12 +114,13 @@ func StartXCherryServer(rootCtx context.Context, cfg *config.Config, services ma
errs = multierr.Append(errs, err)
}
}
if asyncServer != nil {
for _, asyncServer := range asyncServers {

Check warning on line 117 in cmd/server/bootstrap/xcherry.go

View check run for this annotation

Codecov / codecov/patch

cmd/server/bootstrap/xcherry.go#L117

Added line #L117 was not covered by tests
err := asyncServer.Stop(ctx)
if err != nil {
errs = multierr.Append(errs, err)
}
}

// stop processStore and visibilityStore
err := processStore.Close()
if err != nil {
Expand Down
76 changes: 63 additions & 13 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"log"
"os"
"strings"
"time"

"gopkg.in/yaml.v3"
Expand Down Expand Up @@ -46,8 +47,12 @@ type (
}

AsyncServiceConfig struct {
// Mode is the mode of async service. Currently only standalone mode is supported
// Mode is the mode of async service.
Mode AsyncServiceMode `yaml:"mode"`
// Shard specifies how many shards to use in the database.
// Shards are managed by consistent hash across all the async services.
// In the AsyncServiceModeStandalone mode, this number is always 1.
Shard int `yaml:"shard"`
// ImmediateTaskQueue is the config for immediate task queue
ImmediateTaskQueue ImmediateTaskQueueConfig `yaml:"immediateTaskQueue"`
// TimerTaskQueue is the config for timer task queue
Expand All @@ -57,6 +62,9 @@ type (
InternalHttpServer HttpServerConfig `yaml:"internalHttpServer"`
// ClientAddress is the address for API service to call AsyncService's internal API
ClientAddress string `yaml:"clientAddress"`
// Only available in the AsyncServiceModeConsistentHashingCluster mode.
// A map maintains the relationship between cluster advertise addresses and cluster addresses
AdvertiseToClientAddressMap map[string]string
}

// HttpServerConfig is the config that will be mapped into http.Server
Expand All @@ -67,6 +75,12 @@ type (
// See net.Dial for details of the address format.
// For more details, see https://blog.cloudflare.com/the-complete-guide-to-golang-net-http-timeouts/
Address string `yaml:"address"`
// Used in AsyncServiceConfig with AsyncServiceModeConsistentHashingCluster mode only.
// Multiple Address seperated by comma.
ClusterAddresses string `yaml:"clusterAddresses"`
// Used in AsyncServiceConfig with AsyncServiceModeConsistentHashingCluster mode only.
// These are addresses used by memberlist.
ClusterAdvertiseAddresses string `yaml:"clusterAdvertiseAddresses"`
// ReadTimeout is the maximum duration for reading the entire
// request, including the body. Because ReadTimeout does not
// let Handlers make per-request decisions on each request body's acceptable
Expand Down Expand Up @@ -184,13 +198,9 @@ type (

const (
// AsyncServiceModeStandalone means there is only one node for async service
// This is the only supported mode now
AsyncServiceModeStandalone = "standalone"
// AsyncServiceModeConsistentHashingCluster means all the nodes of async service
// will form a consistent hashing ring, which is used for shard ownership management
// TODO
// 1. add ringpop config
// 2. add async client address config for APIService to call async service with LBS
AsyncServiceModeConsistentHashingCluster = "consistent-hashing-cluster"
)

Expand Down Expand Up @@ -232,10 +242,7 @@ func (c *Config) ValidateAndSetDefaults() error {
rpcConfig.DefaultRpcAPITimeout = 10 * time.Second
}
if c.AsyncService.Mode == "" {
return fmt.Errorf("must set async service mode")
}
if c.AsyncService.Mode != AsyncServiceModeStandalone {
return fmt.Errorf("currently only standalone mode is supported")
return fmt.Errorf("must set a valid async service mode")

Check warning on line 245 in config/config.go

View check run for this annotation

Codecov / codecov/patch

config/config.go#L245

Added line #L245 was not covered by tests
}
immediateTaskQConfig := &c.AsyncService.ImmediateTaskQueue
if immediateTaskQConfig.MaxPollInterval == 0 {
Expand Down Expand Up @@ -284,12 +291,55 @@ func (c *Config) ValidateAndSetDefaults() error {
if timerTaskQConfig.TriggerNotificationBufferSize == 0 {
timerTaskQConfig.TriggerNotificationBufferSize = 1000
}
if c.AsyncService.ClientAddress == "" {
if c.AsyncService.InternalHttpServer.Address == "" {
return fmt.Errorf("AsyncService.InternalHttpServer.Address cannot be empty")

if c.AsyncService.Mode == AsyncServiceModeStandalone {
c.AsyncService.Shard = 1

if c.AsyncService.ClientAddress == "" {
if c.AsyncService.InternalHttpServer.Address == "" {
return fmt.Errorf("AsyncService.InternalHttpServer.Address cannot be empty")
}

Check warning on line 301 in config/config.go

View check run for this annotation

Codecov / codecov/patch

config/config.go#L300-L301

Added lines #L300 - L301 were not covered by tests

c.AsyncService.ClientAddress = "http://" + c.AsyncService.InternalHttpServer.Address
}
}

if c.AsyncService.Mode == AsyncServiceModeConsistentHashingCluster {
clusterAdvertiseAddresses := strings.Split(c.AsyncService.InternalHttpServer.ClusterAdvertiseAddresses, ",")

if c.AsyncService.ClientAddress == "" {
clusterAddresses := strings.Split(c.AsyncService.InternalHttpServer.ClusterAddresses, ",")

if len(clusterAddresses) != len(clusterAdvertiseAddresses) {
return fmt.Errorf("AsyncService.InternalHttpServer.ClusterAddresses must have the same size " +
"as AsyncService.InternalHttpServer.ClusterAdvertiseAddresses")
}

Check warning on line 316 in config/config.go

View check run for this annotation

Codecov / codecov/patch

config/config.go#L308-L316

Added lines #L308 - L316 were not covered by tests

if len(clusterAddresses) == 0 {
return fmt.Errorf("AsyncService.InternalHttpServer.ClusterAddresses cannot be empty")
}

Check warning on line 320 in config/config.go

View check run for this annotation

Codecov / codecov/patch

config/config.go#L318-L320

Added lines #L318 - L320 were not covered by tests

for _, clusterAddress := range clusterAddresses {
if len(c.AsyncService.ClientAddress) > 0 {
c.AsyncService.ClientAddress += ","
}
c.AsyncService.ClientAddress += "http://" + clusterAddress

Check warning on line 326 in config/config.go

View check run for this annotation

Codecov / codecov/patch

config/config.go#L322-L326

Added lines #L322 - L326 were not covered by tests
}
}

clientAddresses := strings.Split(c.AsyncService.ClientAddress, ",")

if c.AsyncService.Shard < len(clientAddresses) {
return fmt.Errorf("AsyncService.Shard should be an equal or bigger number than the AsyncService.ClientAddress count")
}

Check warning on line 334 in config/config.go

View check run for this annotation

Codecov / codecov/patch

config/config.go#L330-L334

Added lines #L330 - L334 were not covered by tests

c.AsyncService.AdvertiseToClientAddressMap = map[string]string{}

for i, advertiseAddress := range clusterAdvertiseAddresses {
c.AsyncService.AdvertiseToClientAddressMap[advertiseAddress] = clientAddresses[i]

Check warning on line 339 in config/config.go

View check run for this annotation

Codecov / codecov/patch

config/config.go#L336-L339

Added lines #L336 - L339 were not covered by tests
}
c.AsyncService.ClientAddress = "http://" + c.AsyncService.InternalHttpServer.Address
}

return nil
}

Expand Down
29 changes: 29 additions & 0 deletions config/development-postgres-cluster.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
log:
stdout: true
level: debug
levelKey: "level"
apiService:
httpServer: # see more config fields in config.go
address: 0.0.0.0:8801
readTimeout: 10s
writeTimeout: 60s
database:
processStore:
dbExtensionName: postgres
user: xcherry
password: xcherryio
databaseName: xcherry
connectAddr: 127.0.0.1:5432
visibilityStore:
dbExtensionName: postgres
user: xcherry
password: xcherryio
databaseName: xcherry
connectAddr: 127.0.0.1:5432
asyncService:
mode: consistent-hashing-cluster
shard: 30
internalHttpServer:
clusterAddresses: 0.0.0.0:8701,0.0.0.0:8702,0.0.0.0:8703
# need to use real IP here
clusterAdvertiseAddresses: 192.168.2.100:8901,192.168.2.100:8902,192.168.2.100:8903
12 changes: 6 additions & 6 deletions engine/immediate_task_concurrent_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func (w *immediateTaskConcurrentProcessor) processWaitUntilTask(
}

if compResp.HasNewImmediateTask {
w.notifyNewImmediateTask(prep, task)
w.notifyNewImmediateTask(task.ShardId, prep, task)
}

if len(compResp.FireTimestamps) > 0 {
Expand Down Expand Up @@ -335,7 +335,7 @@ func (w *immediateTaskConcurrentProcessor) applyStateFailureRecoveryPolicy(
} else {
nextImmediateTask.TaskType = data_models.ImmediateTaskTypeExecute
}
w.notifyNewImmediateTask(prep, nextImmediateTask)
w.notifyNewImmediateTask(task.ShardId, prep, nextImmediateTask)
default:
return fmt.Errorf("unknown state failure recovery policy %v", stateRecoveryPolicy.Policy)
}
Expand Down Expand Up @@ -489,7 +489,7 @@ func (w *immediateTaskConcurrentProcessor) processExecuteTask(
return fmt.Errorf("failed to write app database")
}
if compResp.HasNewImmediateTask {
w.notifyNewImmediateTask(prep, task)
w.notifyNewImmediateTask(task.ShardId, prep, task)
}
return nil
}
Expand Down Expand Up @@ -519,10 +519,10 @@ func (w *immediateTaskConcurrentProcessor) createContextWithTimeout(
}

func (w *immediateTaskConcurrentProcessor) notifyNewImmediateTask(
prep data_models.PrepareStateExecutionResponse, task data_models.ImmediateTask,
shardId int32, prep data_models.PrepareStateExecutionResponse, task data_models.ImmediateTask,
) {
w.taskNotifier.NotifyNewImmediateTasks(xcapi.NotifyImmediateTasksRequest{
ShardId: persistence.DefaultShardId,
ShardId: shardId,
Namespace: &prep.Info.Namespace,
ProcessId: &prep.Info.ProcessId,
ProcessExecutionId: ptr.Any(task.ProcessExecutionId.String()),
Expand Down Expand Up @@ -564,7 +564,7 @@ func (w *immediateTaskConcurrentProcessor) retryTask(
return err
}
w.taskNotifier.NotifyNewTimerTasks(xcapi.NotifyTimerTasksRequest{
ShardId: persistence.DefaultShardId,
ShardId: task.ShardId,
Namespace: &prep.Info.Namespace,
ProcessId: &prep.Info.ProcessId,
ProcessExecutionId: ptr.Any(task.ProcessExecutionId.String()),
Expand Down
3 changes: 2 additions & 1 deletion engine/immediate_task_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package engine

import (
"context"
"fmt"
"github.com/xcherryio/apis/goapi/xcapi"
"github.com/xcherryio/xcherry/persistence/data_models"
"math/rand"
Expand Down Expand Up @@ -92,7 +93,7 @@ func (w *immediateTaskQueueImpl) Start() error {
w.receiveCompletedTask(task)
}
case <-w.rootCtx.Done():
w.logger.Info("processor is being closed")
w.logger.Info(fmt.Sprintf("immediateTaskQueue %d is being closed", w.shardId))

Check warning on line 96 in engine/immediate_task_queue.go

View check run for this annotation

Codecov / codecov/patch

engine/immediate_task_queue.go#L96

Added line #L96 was not covered by tests
return
}
}
Expand Down
2 changes: 1 addition & 1 deletion engine/timer_task_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (w *timerTaskQueueImpl) Start() error {
case <-w.triggerPollTimer.FireChan():
w.triggeredPolling()
case <-w.rootCtx.Done():
w.logger.Info("processor is being closed")
w.logger.Info(fmt.Sprintf("timerTaskQueue %d is being closed", w.shardId))

Check warning on line 133 in engine/timer_task_queue.go

View check run for this annotation

Codecov / codecov/patch

engine/timer_task_queue.go#L133

Added line #L133 was not covered by tests
return
}
}
Expand Down
12 changes: 12 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ go 1.19
require (
github.com/gin-gonic/gin v1.9.1
github.com/gofrs/uuid v4.4.0+incompatible
github.com/hashicorp/memberlist v0.5.0
github.com/iancoleman/strcase v0.3.0
github.com/jmoiron/sqlx v1.2.1-0.20200615141059-0794cb1f47ee
github.com/jonboulle/clockwork v0.4.0
github.com/lib/pq v1.2.0
github.com/serialx/hashring v0.0.0-20200727003509-22c0c7ab6b1b
github.com/stretchr/testify v1.8.4
github.com/urfave/cli/v2 v2.25.7
github.com/xcherryio/apis v0.0.3-0.20240313171434-ae652fc3c70f
Expand All @@ -19,6 +21,7 @@ require (
)

require (
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da // indirect
github.com/bytedance/sonic v1.9.1 // indirect
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
Expand All @@ -29,17 +32,26 @@ require (
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-playground/validator/v10 v10.14.0 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c // indirect
github.com/google/uuid v1.4.0 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-immutable-radix v1.0.0 // indirect
github.com/hashicorp/go-msgpack v0.5.3 // indirect
github.com/hashicorp/go-multierror v1.0.0 // indirect
github.com/hashicorp/go-sockaddr v1.0.0 // indirect
github.com/hashicorp/golang-lru v0.5.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/cpuid/v2 v2.2.4 // indirect
github.com/leodido/go-urn v1.2.4 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/miekg/dns v1.1.26 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.2.11 // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
Expand Down
Loading

0 comments on commit 01581eb

Please sign in to comment.