Skip to content

Commit

Permalink
add cluster support
Browse files Browse the repository at this point in the history
  • Loading branch information
zklgame committed Mar 15, 2024
1 parent f93c258 commit 7dd625b
Show file tree
Hide file tree
Showing 15 changed files with 494 additions and 78 deletions.
50 changes: 50 additions & 0 deletions cluster/event_delegate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package cluster

import (
"fmt"
"github.com/hashicorp/memberlist"
"github.com/serialx/hashring"
"log"
"strconv"
)

type ClusterEventDelegate struct {
consistent *hashring.HashRing
ServerAddress string
}

func (d *ClusterEventDelegate) NotifyJoin(node *memberlist.Node) {
hostPort := BuildHostAddress(node)
log.Printf("ClusterEvent JOIN %s with server address %s", hostPort, d.ServerAddress)

if d.consistent == nil {
d.consistent = hashring.New([]string{d.ServerAddress})
} else {
d.consistent = d.consistent.AddNode(d.ServerAddress)
}
}

func (d *ClusterEventDelegate) NotifyLeave(node *memberlist.Node) {
hostPort := BuildHostAddress(node)
log.Printf("ClusterEvent LEAVE %s with server address %s", hostPort, d.ServerAddress)

if d.consistent != nil {
d.consistent = d.consistent.RemoveNode(d.ServerAddress)
}
}

func (d *ClusterEventDelegate) NotifyUpdate(node *memberlist.Node) {
// skip
}

func (d *ClusterEventDelegate) GetServerAddressFor(shardId int32) string {
node, ok := d.consistent.GetNode(strconv.Itoa(int(shardId)))
if !ok {
log.Fatal(fmt.Sprintf("Failed to search shardId %d", shardId))

Check failure on line 43 in cluster/event_delegate.go

View workflow job for this annotation

GitHub Actions / golangci-lint

[golangci] reported by reviewdog 🐶 S1038: should use log.Fatalf(...) instead of log.Fatal(fmt.Sprintf(...)) (gosimple) Raw Output: cluster/event_delegate.go:43:3: S1038: should use log.Fatalf(...) instead of log.Fatal(fmt.Sprintf(...)) (gosimple) log.Fatal(fmt.Sprintf("Failed to search shardId %d", shardId)) ^
}
return node
}

func BuildHostAddress(node *memberlist.Node) string {
return fmt.Sprintf("%s:%d", node.Addr.To4().String(), node.Port)
}
23 changes: 14 additions & 9 deletions cmd/server/bootstrap/xcherry.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,16 @@ func StartXCherryServer(rootCtx context.Context, cfg *config.Config, services ma
}
}

var asyncServer async.Server
var asyncServers []async.Server
if services[AsyncServiceName] {
asyncServer := async.NewDefaultAPIServerWithGin(
asyncServers = async.NewDefaultAsyncServersWithGin(
rootCtx, *cfg, processStore, visibilityStore, logger.WithTags(tag.Service(AsyncServiceName)))
err = asyncServer.Start()
if err != nil {
logger.Fatal("Failed to start async server", tag.Error(err))

for i, asyncServer := range asyncServers {
err = asyncServer.Start()
if err != nil {
logger.Fatal(fmt.Sprintf("Failed to start the %d-th async server", i), tag.Error(err))
}
}
}

Expand All @@ -111,10 +114,12 @@ func StartXCherryServer(rootCtx context.Context, cfg *config.Config, services ma
errs = multierr.Append(errs, err)
}
}
if asyncServer != nil {
err := asyncServer.Stop(ctx)
if err != nil {
errs = multierr.Append(errs, err)
if asyncServers != nil {

Check failure on line 117 in cmd/server/bootstrap/xcherry.go

View workflow job for this annotation

GitHub Actions / golangci-lint

[golangci] reported by reviewdog 🐶 S1031: unnecessary nil check around range (gosimple) Raw Output: cmd/server/bootstrap/xcherry.go:117:3: S1031: unnecessary nil check around range (gosimple) if asyncServers != nil { ^
for _, asyncServer := range asyncServers {
err := asyncServer.Stop(ctx)
if err != nil {
errs = multierr.Append(errs, err)
}
}
}
// stop processStore and visibilityStore
Expand Down
51 changes: 40 additions & 11 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"log"
"os"
"strings"
"time"

"gopkg.in/yaml.v3"
Expand Down Expand Up @@ -46,8 +47,12 @@ type (
}

AsyncServiceConfig struct {
// Mode is the mode of async service. Currently only standalone mode is supported
// Mode is the mode of async service.
Mode AsyncServiceMode `yaml:"mode"`
// Only available in the consistent-hashing-cluster mode.
// Shard specifies how many shards to use in the database.
// Shards are managed by consistent hash across all the async services.
Shard int `yaml:"shard"`
// ImmediateTaskQueue is the config for immediate task queue
ImmediateTaskQueue ImmediateTaskQueueConfig `yaml:"immediateTaskQueue"`
// TimerTaskQueue is the config for timer task queue
Expand All @@ -67,6 +72,12 @@ type (
// See net.Dial for details of the address format.
// For more details, see https://blog.cloudflare.com/the-complete-guide-to-golang-net-http-timeouts/
Address string `yaml:"address"`
// Only available in the consistent-hashing-cluster mode.
// Multiple Address seperated by comma.
Addresses string `yaml:"addresses"`
// Only available in the consistent-hashing-cluster mode.
// These are addressed used by memberlist.
AdvertiseAddresses string `yaml:"advertiseAddresses"`
// ReadTimeout is the maximum duration for reading the entire
// request, including the body. Because ReadTimeout does not
// let Handlers make per-request decisions on each request body's acceptable
Expand Down Expand Up @@ -184,13 +195,9 @@ type (

const (
// AsyncServiceModeStandalone means there is only one node for async service
// This is the only supported mode now
AsyncServiceModeStandalone = "standalone"
// AsyncServiceModeConsistentHashingCluster means all the nodes of async service
// will form a consistent hashing ring, which is used for shard ownership management
// TODO
// 1. add ringpop config
// 2. add async client address config for APIService to call async service with LBS
AsyncServiceModeConsistentHashingCluster = "consistent-hashing-cluster"
)

Expand Down Expand Up @@ -234,9 +241,6 @@ func (c *Config) ValidateAndSetDefaults() error {
if c.AsyncService.Mode == "" {
return fmt.Errorf("must set async service mode")
}
if c.AsyncService.Mode != AsyncServiceModeStandalone {
return fmt.Errorf("currently only standalone mode is supported")
}
immediateTaskQConfig := &c.AsyncService.ImmediateTaskQueue
if immediateTaskQConfig.MaxPollInterval == 0 {
immediateTaskQConfig.MaxPollInterval = time.Minute
Expand Down Expand Up @@ -284,12 +288,37 @@ func (c *Config) ValidateAndSetDefaults() error {
if timerTaskQConfig.TriggerNotificationBufferSize == 0 {
timerTaskQConfig.TriggerNotificationBufferSize = 1000
}
if c.AsyncService.Mode == AsyncServiceModeConsistentHashingCluster &&
c.AsyncService.Shard <= len(strings.Split(c.AsyncService.InternalHttpServer.Addresses, ",")) {
return fmt.Errorf("AsyncService.Shard should be a bigger number than the InternalHttpServer.Addresses count")
}
if c.AsyncService.ClientAddress == "" {
if c.AsyncService.InternalHttpServer.Address == "" {
return fmt.Errorf("AsyncService.InternalHttpServer.Address cannot be empty")
if c.AsyncService.Mode == AsyncServiceModeStandalone {
if c.AsyncService.InternalHttpServer.Address == "" {
return fmt.Errorf("AsyncService.InternalHttpServer.Address cannot be empty")
}

c.AsyncService.ClientAddress = "http://" + c.AsyncService.InternalHttpServer.Address
} else {
addresses := strings.Split(c.AsyncService.InternalHttpServer.Addresses, ",")
if len(addresses) == 0 {
return fmt.Errorf("AsyncService.InternalHttpServer.Addresses cannot be empty")
}

for _, address := range addresses {
if len(c.AsyncService.ClientAddress) > 0 {
c.AsyncService.ClientAddress += ","
}
c.AsyncService.ClientAddress += "http://" + address
}
}
c.AsyncService.ClientAddress = "http://" + c.AsyncService.InternalHttpServer.Address
}
if c.AsyncService.Mode == AsyncServiceModeConsistentHashingCluster {
if len(strings.Split(c.AsyncService.InternalHttpServer.Addresses, ",")) != len(strings.Split(c.AsyncService.InternalHttpServer.AdvertiseAddresses, ",")) {

Check failure on line 317 in config/config.go

View workflow job for this annotation

GitHub Actions / golangci-lint

[golangci] reported by reviewdog 🐶 line is 162 characters (lll) Raw Output: config/config.go:317: line is 162 characters (lll) if len(strings.Split(c.AsyncService.InternalHttpServer.Addresses, ",")) != len(strings.Split(c.AsyncService.InternalHttpServer.AdvertiseAddresses, ",")) {
return fmt.Errorf("AsyncService.InternalHttpServer.Addresses must have the same size as AsyncService.InternalHttpServer.AdvertiseAddresses")
}
}

return nil
}

Expand Down
28 changes: 28 additions & 0 deletions config/development-postgres-cluster.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
log:
stdout: true
level: debug
levelKey: "level"
apiService:
httpServer: # see more config fields in config.go
address: 0.0.0.0:8801
readTimeout: 10s
writeTimeout: 60s
database:
processStore:
dbExtensionName: postgres
user: xcherry
password: xcherryio
databaseName: xcherry
connectAddr: 127.0.0.1:5432
visibilityStore:
dbExtensionName: postgres
user: xcherry
password: xcherryio
databaseName: xcherry
connectAddr: 127.0.0.1:5432
asyncService:
mode: consistent-hashing-cluster
shard: 30
internalHttpServer:
addresses: 0.0.0.0:8701,0.0.0.0:8702,0.0.0.0:8703
advertiseAddresses: 0.0.0.0:8901,0.0.0.0:8902,0.0.0.0:8903
12 changes: 6 additions & 6 deletions engine/immediate_task_concurrent_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func (w *immediateTaskConcurrentProcessor) processWaitUntilTask(
}

if compResp.HasNewImmediateTask {
w.notifyNewImmediateTask(prep, task)
w.notifyNewImmediateTask(task.ShardId, prep, task)
}

if len(compResp.FireTimestamps) > 0 {
Expand Down Expand Up @@ -335,7 +335,7 @@ func (w *immediateTaskConcurrentProcessor) applyStateFailureRecoveryPolicy(
} else {
nextImmediateTask.TaskType = data_models.ImmediateTaskTypeExecute
}
w.notifyNewImmediateTask(prep, nextImmediateTask)
w.notifyNewImmediateTask(task.ShardId, prep, nextImmediateTask)
default:
return fmt.Errorf("unknown state failure recovery policy %v", stateRecoveryPolicy.Policy)
}
Expand Down Expand Up @@ -489,7 +489,7 @@ func (w *immediateTaskConcurrentProcessor) processExecuteTask(
return fmt.Errorf("failed to write app database")
}
if compResp.HasNewImmediateTask {
w.notifyNewImmediateTask(prep, task)
w.notifyNewImmediateTask(task.ShardId, prep, task)
}
return nil
}
Expand Down Expand Up @@ -519,10 +519,10 @@ func (w *immediateTaskConcurrentProcessor) createContextWithTimeout(
}

func (w *immediateTaskConcurrentProcessor) notifyNewImmediateTask(
prep data_models.PrepareStateExecutionResponse, task data_models.ImmediateTask,
shardId int32, prep data_models.PrepareStateExecutionResponse, task data_models.ImmediateTask,
) {
w.taskNotifier.NotifyNewImmediateTasks(xcapi.NotifyImmediateTasksRequest{
ShardId: persistence.DefaultShardId,
ShardId: shardId,
Namespace: &prep.Info.Namespace,
ProcessId: &prep.Info.ProcessId,
ProcessExecutionId: ptr.Any(task.ProcessExecutionId.String()),
Expand Down Expand Up @@ -564,7 +564,7 @@ func (w *immediateTaskConcurrentProcessor) retryTask(
return err
}
w.taskNotifier.NotifyNewTimerTasks(xcapi.NotifyTimerTasksRequest{
ShardId: persistence.DefaultShardId,
ShardId: task.ShardId,
Namespace: &prep.Info.Namespace,
ProcessId: &prep.Info.ProcessId,
ProcessExecutionId: ptr.Any(task.ProcessExecutionId.String()),
Expand Down
3 changes: 2 additions & 1 deletion engine/immediate_task_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package engine

import (
"context"
"fmt"
"github.com/xcherryio/apis/goapi/xcapi"
"github.com/xcherryio/xcherry/persistence/data_models"
"math/rand"
Expand Down Expand Up @@ -92,7 +93,7 @@ func (w *immediateTaskQueueImpl) Start() error {
w.receiveCompletedTask(task)
}
case <-w.rootCtx.Done():
w.logger.Info("processor is being closed")
w.logger.Info(fmt.Sprintf("immediateTaskQueue %d is being closed", w.shardId))
return
}
}
Expand Down
2 changes: 1 addition & 1 deletion engine/timer_task_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (w *timerTaskQueueImpl) Start() error {
case <-w.triggerPollTimer.FireChan():
w.triggeredPolling()
case <-w.rootCtx.Done():
w.logger.Info("processor is being closed")
w.logger.Info(fmt.Sprintf("timerTaskQueue %d is being closed", w.shardId))
return
}
}
Expand Down
12 changes: 12 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ go 1.19
require (
github.com/gin-gonic/gin v1.9.1
github.com/gofrs/uuid v4.4.0+incompatible
github.com/hashicorp/memberlist v0.5.0
github.com/iancoleman/strcase v0.3.0
github.com/jmoiron/sqlx v1.2.1-0.20200615141059-0794cb1f47ee
github.com/jonboulle/clockwork v0.4.0
github.com/lib/pq v1.2.0
github.com/serialx/hashring v0.0.0-20200727003509-22c0c7ab6b1b
github.com/stretchr/testify v1.8.4
github.com/urfave/cli/v2 v2.25.7
github.com/xcherryio/apis v0.0.3-0.20240112190552-ffa9d65f6a5e
Expand All @@ -19,6 +21,7 @@ require (
)

require (
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da // indirect
github.com/bytedance/sonic v1.9.1 // indirect
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
Expand All @@ -29,17 +32,26 @@ require (
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-playground/validator/v10 v10.14.0 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c // indirect
github.com/google/uuid v1.4.0 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-immutable-radix v1.0.0 // indirect
github.com/hashicorp/go-msgpack v0.5.3 // indirect
github.com/hashicorp/go-multierror v1.0.0 // indirect
github.com/hashicorp/go-sockaddr v1.0.0 // indirect
github.com/hashicorp/golang-lru v0.5.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/cpuid/v2 v2.2.4 // indirect
github.com/leodido/go-urn v1.2.4 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/miekg/dns v1.1.26 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.2.11 // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
Expand Down
Loading

0 comments on commit 7dd625b

Please sign in to comment.