Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add visibility store #116

Merged
merged 7 commits into from
Feb 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 18 additions & 6 deletions cmd/server/bootstrap/xcherry.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package bootstrap
import (
"context"
"fmt"
"github.com/xcherryio/xcherry/persistence/visibility"
rawLog "log"
"os"
"os/signal"
Expand All @@ -16,7 +17,7 @@ import (
"github.com/xcherryio/xcherry/common/log"
"github.com/xcherryio/xcherry/common/log/tag"
"github.com/xcherryio/xcherry/config"
"github.com/xcherryio/xcherry/persistence/sql"
"github.com/xcherryio/xcherry/persistence/process"
"github.com/xcherryio/xcherry/service/api"
"github.com/xcherryio/xcherry/service/async"
"go.uber.org/multierr"
Expand Down Expand Up @@ -70,14 +71,20 @@ func StartXCherryServer(rootCtx context.Context, cfg *config.Config, services ma
logger.Fatal("config is invalid", tag.Error(err))
}

sqlStore, err := sql.NewSQLProcessStore(*cfg.Database.SQL, logger)
processStore, err := process.NewSQLProcessStore(*cfg.Database.ProcessStoreConfig, logger)
if err != nil {
logger.Fatal("error on persistence setup", tag.Error(err))
}

visibilityStore, err := visibility.NewSqlVisibilityStore(*cfg.Database.VisibilityStoreConfig, logger)
if err != nil {
logger.Fatal("error on visibility setup", tag.Error(err))
}

var apiServer api.Server
if services[ApiServiceName] {
apiServer = api.NewDefaultAPIServerWithGin(rootCtx, *cfg, sqlStore, logger.WithTags(tag.Service(ApiServiceName)))
apiServer = api.NewDefaultAPIServerWithGin(
rootCtx, *cfg, processStore, logger.WithTags(tag.Service(ApiServiceName)))
err = apiServer.Start()
if err != nil {
logger.Fatal("Failed to start api server", tag.Error(err))
Expand All @@ -86,7 +93,8 @@ func StartXCherryServer(rootCtx context.Context, cfg *config.Config, services ma

var asyncServer async.Server
if services[AsyncServiceName] {
asyncServer := async.NewDefaultAPIServerWithGin(rootCtx, *cfg, sqlStore, logger.WithTags(tag.Service(AsyncServiceName)))
asyncServer := async.NewDefaultAPIServerWithGin(
rootCtx, *cfg, processStore, visibilityStore, logger.WithTags(tag.Service(AsyncServiceName)))
err = asyncServer.Start()
if err != nil {
logger.Fatal("Failed to start async server", tag.Error(err))
Expand All @@ -109,8 +117,12 @@ func StartXCherryServer(rootCtx context.Context, cfg *config.Config, services ma
errs = multierr.Append(errs, err)
}
}
// stop sqlStore
err := sqlStore.Close()
// stop processStore and visibilityStore
err := processStore.Close()
if err != nil {
errs = multierr.Append(errs, err)
}
err = visibilityStore.Close()
if err != nil {
errs = multierr.Append(errs, err)
}
Expand Down
10 changes: 6 additions & 4 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ type (
// SQL is the SQL database config
// either sql or nosql is needed to run server
// Only SQL is supported for now.
SQL *SQL `yaml:"sql"`
ProcessStoreConfig *SQL `yaml:"processStore"`
VisibilityStoreConfig *SQL `yaml:"visibilityStore"`
}

ApiServiceConfig struct {
Expand Down Expand Up @@ -215,12 +216,13 @@ func NewConfig(configPath string) (*Config, error) {
}

func (c *Config) ValidateAndSetDefaults() error {
if c.Database.SQL == nil {
if c.Database.ProcessStoreConfig == nil {
return fmt.Errorf("sql config is required")
}
sql := c.Database.SQL
sql := c.Database.ProcessStoreConfig
if anyAbsent(sql.DatabaseName, sql.DBExtensionName, sql.ConnectAddr, sql.User) {
return fmt.Errorf("some required configs are missing: sql.DatabaseName, sql.DBExtensionName, sql.ConnectAddr, sql.User")
return fmt.Errorf("some required configs are missing: processStore.DatabaseName, " +
"processStore.DBExtensionName, processStore.ConnectAddr, processStore.User")
}
rpcConfig := &c.ApiService.Rpc
if rpcConfig.MaxRpcAPITimeout == 0 {
Expand Down
8 changes: 7 additions & 1 deletion config/development-postgres.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,13 @@ apiService:
readTimeout: 10s
writeTimeout: 60s
database:
sql:
processStore:
dbExtensionName: postgres
user: xcherry
password: xcherryio
databaseName: xcherry
connectAddr: 127.0.0.1:5432
visibilityStore:
dbExtensionName: postgres
user: xcherry
password: xcherryio
Expand Down
27 changes: 15 additions & 12 deletions engine/immediate_task_concurrent_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,15 @@ type immediateTaskConcurrentProcessor struct {
// shardId to the channel
taskToCommitChans map[int32]chan<- data_models.ImmediateTask
taskNotifier TaskNotifier
store persistence.ProcessStore
processStore persistence.ProcessStore
visibilityStore persistence.VisibilityStore
logger log.Logger
}

func NewImmediateTaskConcurrentProcessor(
ctx context.Context, cfg config.Config, notifier TaskNotifier,
store persistence.ProcessStore, logger log.Logger,
processStore persistence.ProcessStore,
visibilityStore persistence.VisibilityStore, logger log.Logger,
) ImmediateTaskProcessor {
bufferSize := cfg.AsyncService.ImmediateTaskQueue.ProcessorBufferSize
return &immediateTaskConcurrentProcessor{
Expand All @@ -48,7 +50,8 @@ func NewImmediateTaskConcurrentProcessor(
currentShards: map[int32]bool{},
taskToCommitChans: make(map[int32]chan<- data_models.ImmediateTask),
taskNotifier: notifier,
store: store,
processStore: processStore,
visibilityStore: visibilityStore,
logger: logger,
}
}
Expand Down Expand Up @@ -121,7 +124,7 @@ func (w *immediateTaskConcurrentProcessor) processImmediateTask(
return w.processVisibilityTask(ctx, task)
}

prep, err := w.store.PrepareStateExecution(ctx, data_models.PrepareStateExecutionRequest{
prep, err := w.processStore.PrepareStateExecution(ctx, data_models.PrepareStateExecutionRequest{
ProcessExecutionId: task.ProcessExecutionId,
StateExecutionId: data_models.StateExecutionId{
StateId: task.StateId,
Expand Down Expand Up @@ -208,7 +211,7 @@ func (w *immediateTaskConcurrentProcessor) processWaitUntilTask(
xcapi.WAIT_UNTIL_API)
}

compResp, err := w.store.ProcessWaitUntilExecution(ctx, data_models.ProcessWaitUntilExecutionRequest{
compResp, err := w.processStore.ProcessWaitUntilExecution(ctx, data_models.ProcessWaitUntilExecutionRequest{
ProcessExecutionId: task.ProcessExecutionId,
StateExecutionId: data_models.StateExecutionId{
StateId: task.StateId,
Expand Down Expand Up @@ -258,7 +261,7 @@ func (w *immediateTaskConcurrentProcessor) applyStateFailureRecoveryPolicy(
}
switch stateRecoveryPolicy.Policy {
case xcapi.FAIL_PROCESS_ON_STATE_FAILURE:
resp, errStopProcess := w.store.StopProcess(ctx, data_models.StopProcessRequest{
resp, errStopProcess := w.processStore.StopProcess(ctx, data_models.StopProcessRequest{
Namespace: prep.Info.Namespace,
ProcessId: prep.Info.ProcessId,
ProcessStopType: xcapi.FAIL,
Expand All @@ -277,7 +280,7 @@ func (w *immediateTaskConcurrentProcessor) applyStateFailureRecoveryPolicy(
return fmt.Errorf("cannot proceed to configured state because of missing state config")
}

err := w.store.RecoverFromStateExecutionFailure(ctx, data_models.RecoverFromStateExecutionFailureRequest{
err := w.processStore.RecoverFromStateExecutionFailure(ctx, data_models.RecoverFromStateExecutionFailureRequest{
Namespace: prep.Info.Namespace,
ProcessExecutionId: task.ProcessExecutionId,
SourceStateExecutionId: data_models.StateExecutionId{
Expand Down Expand Up @@ -441,7 +444,7 @@ func (w *immediateTaskConcurrentProcessor) processExecuteTask(
xcapi.EXECUTE_API)
}

compResp, err := w.store.CompleteExecuteExecution(ctx, data_models.CompleteExecuteExecutionRequest{
compResp, err := w.processStore.CompleteExecuteExecution(ctx, data_models.CompleteExecuteExecutionRequest{
ProcessExecutionId: task.ProcessExecutionId,
StateExecutionId: data_models.StateExecutionId{
StateId: task.StateId,
Expand Down Expand Up @@ -530,7 +533,7 @@ func (w *immediateTaskConcurrentProcessor) retryTask(
LastFailureStatus int32, LastFailureDetails string,
) error {
fireTimeUnixSeconds := time.Now().Unix() + int64(nextIntervalSecs)
err := w.store.BackoffImmediateTask(ctx, data_models.BackoffImmediateTaskRequest{
err := w.processStore.BackoffImmediateTask(ctx, data_models.BackoffImmediateTaskRequest{
LastFailureStatus: LastFailureStatus,
LastFailureDetails: LastFailureDetails,
Prep: prep,
Expand Down Expand Up @@ -589,7 +592,7 @@ func (w *immediateTaskConcurrentProcessor) composeHttpError(
func (w *immediateTaskConcurrentProcessor) processLocalQueueMessagesTask(
ctx context.Context, task data_models.ImmediateTask,
) error {
resp, err := w.store.ProcessLocalQueueMessages(ctx, data_models.ProcessLocalQueueMessagesRequest{
resp, err := w.processStore.ProcessLocalQueueMessages(ctx, data_models.ProcessLocalQueueMessagesRequest{
TaskShardId: task.ShardId,
TaskSequence: task.GetTaskSequence(),
ProcessExecutionId: task.ProcessExecutionId,
Expand Down Expand Up @@ -627,7 +630,7 @@ func (w *immediateTaskConcurrentProcessor) readAppDatabaseIfNeeded(
tag.JsonValue(prep.Info.StateConfig),
tag.JsonValue(prep.Info.AppDatabaseConfig))

resp, err := w.store.ReadAppDatabase(ctx, data_models.AppDatabaseReadRequest{
resp, err := w.processStore.ReadAppDatabase(ctx, data_models.AppDatabaseReadRequest{
AppDatabaseConfig: *prep.Info.AppDatabaseConfig,
Request: *prep.Info.StateConfig.AppDatabaseReadRequest,
})
Expand All @@ -652,7 +655,7 @@ func (w *immediateTaskConcurrentProcessor) loadLocalAttributesIfNeeded(
tag.StateExecutionId(task.GetStateExecutionId()),
tag.JsonValue(prep.Info.StateConfig))

resp, err := w.store.LoadLocalAttributes(ctx, data_models.LoadLocalAttributesRequest{
resp, err := w.processStore.LoadLocalAttributes(ctx, data_models.LoadLocalAttributesRequest{
ProcessExecutionId: task.ProcessExecutionId,
Request: *prep.Info.StateConfig.LoadLocalAttributesRequest,
})
Expand Down
4 changes: 2 additions & 2 deletions extensions/postgres/tests/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"github.com/xcherryio/xcherry/extensions/postgres"
"github.com/xcherryio/xcherry/extensions/postgres/postgrestool"

"github.com/xcherryio/xcherry/persistence/sql"
"github.com/xcherryio/xcherry/persistence/process"
)

var store persistence.ProcessStore
Expand Down Expand Up @@ -47,7 +47,7 @@ func TestMain(m *testing.M) {
panic(err)
}

store, err = sql.NewSQLProcessStore(*sqlConfig, log.NewDevelopmentLogger())
store, err = process.NewSQLProcessStore(*sqlConfig, log.NewDevelopmentLogger())
if err != nil {
panic(err)
}
Expand Down
2 changes: 1 addition & 1 deletion extensions/postgres/tests/postgres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/xcherryio/xcherry/persistence/sql/sqltest"
"github.com/xcherryio/xcherry/persistence/process/sqltest"
)

func TestBasic(t *testing.T) {
Expand Down
3 changes: 2 additions & 1 deletion integTests/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ func TestMain(m *testing.M) {
},
},
Database: config.DatabaseConfig{
SQL: sqlConfig,
ProcessStoreConfig: sqlConfig,
VisibilityStoreConfig: sqlConfig,
},
AsyncService: config.AsyncServiceConfig{
Mode: config.AsyncServiceModeStandalone,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) 2023 xCherryIO Organization
// SPDX-License-Identifier: Apache-2.0

package sql
package process

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) 2023 xCherryIO Organization
// SPDX-License-Identifier: Apache-2.0

package sql
package process

import "context"

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) 2023 xCherryIO Organization
// SPDX-License-Identifier: Apache-2.0

package sql
package process

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) 2023 xCherryIO Organization
// SPDX-License-Identifier: Apache-2.0

package sql
package process

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) 2023 xCherryIO Organization
// SPDX-License-Identifier: Apache-2.0

package sql
package process

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) 2023 xCherryIO Organization
// SPDX-License-Identifier: Apache-2.0

package sql
package process

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) 2023 xCherryIO Organization
// SPDX-License-Identifier: Apache-2.0

package sql
package process

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) 2023 xCherryIO Organization
// SPDX-License-Identifier: Apache-2.0

package sql
package process

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) 2023 xCherryIO Organization
// SPDX-License-Identifier: Apache-2.0

package sql
package process

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) 2023 xCherryIO Organization
// SPDX-License-Identifier: Apache-2.0

package sql
package process

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) 2023 xCherryIO Organization
// SPDX-License-Identifier: Apache-2.0

package sql
package process

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) 2023 XDBLab Organization
// SPDX-License-Identifier: Apache-2.0

package sql
package process

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) 2023 xCherryIO Organization
// SPDX-License-Identifier: Apache-2.0

package sql
package process

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) 2023 xCherryIO Organization
// SPDX-License-Identifier: Apache-2.0

package sql
package process

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package sql
package process

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) 2023 xCherryIO Organization
// SPDX-License-Identifier: Apache-2.0

package sql
package process

import (
"context"
Expand Down
Loading
Loading