Skip to content

Commit b43ad9f

Browse files
committed
Customizable DataBlob encoding type
1 parent f7779c0 commit b43ad9f

File tree

93 files changed

+634
-563
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

93 files changed

+634
-563
lines changed

common/persistence/cassandra/errors.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ func extractCurrentWorkflowConflictError(
203203
binary, _ := conflictRecord["execution_state"].([]byte)
204204
encoding, _ := conflictRecord["execution_state_encoding"].(string)
205205
executionState := &persistencespb.WorkflowExecutionState{}
206-
if state, err := serialization.WorkflowExecutionStateFromBlob(p.NewDataBlob(binary, encoding)); err == nil {
206+
if state, err := serialization.DefaultDecoder.WorkflowExecutionStateFromBlob(p.NewDataBlob(binary, encoding)); err == nil {
207207
executionState = state
208208
}
209209
// if err != nil, this means execution state cannot be parsed, just use default values

common/persistence/cassandra/errors_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ func (s *cassandraErrorsSuite) TestExtractCurrentWorkflowConflictError_Success()
228228
},
229229
},
230230
}
231-
blob, err := serialization.WorkflowExecutionStateToBlob(workflowState)
231+
blob, err := serialization.NewSerializer().WorkflowExecutionStateToBlob(workflowState)
232232
lastWriteVersion := rand.Int63()
233233
s.NoError(err)
234234
t := rowTypeExecution

common/persistence/cassandra/execution_store.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66

77
p "go.temporal.io/server/common/persistence"
88
"go.temporal.io/server/common/persistence/nosql/nosqlplugin/cassandra/gocql"
9+
"go.temporal.io/server/common/persistence/serialization"
910
)
1011

1112
// Guidelines for creating new special UUID constants
@@ -84,11 +85,11 @@ type (
8485

8586
var _ p.ExecutionStore = (*ExecutionStore)(nil)
8687

87-
func NewExecutionStore(session gocql.Session) *ExecutionStore {
88+
func NewExecutionStore(session gocql.Session, serializer serialization.Serializer) *ExecutionStore {
8889
return &ExecutionStore{
89-
HistoryStore: NewHistoryStore(session),
90-
MutableStateStore: NewMutableStateStore(session),
91-
MutableStateTaskStore: NewMutableStateTaskStore(session),
90+
HistoryStore: NewHistoryStore(session, serializer),
91+
MutableStateStore: NewMutableStateStore(session, serializer),
92+
MutableStateTaskStore: NewMutableStateTaskStore(session, serializer),
9293
}
9394
}
9495

common/persistence/cassandra/factory.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"go.temporal.io/server/common/metrics"
1111
p "go.temporal.io/server/common/persistence"
1212
commongocql "go.temporal.io/server/common/persistence/nosql/nosqlplugin/cassandra/gocql"
13+
"go.temporal.io/server/common/persistence/serialization"
1314
"go.temporal.io/server/common/resolver"
1415
)
1516

@@ -21,6 +22,7 @@ type (
2122
clusterName string
2223
logger log.Logger
2324
session commongocql.Session
25+
serializer serialization.Serializer
2426
}
2527
)
2628

@@ -32,6 +34,7 @@ func NewFactory(
3234
clusterName string,
3335
logger log.Logger,
3436
metricsHandler metrics.Handler,
37+
serializer serialization.Serializer,
3538
) *Factory {
3639
session, err := commongocql.NewSession(
3740
func() (*gocql.ClusterConfig, error) {
@@ -43,7 +46,13 @@ func NewFactory(
4346
if err != nil {
4447
logger.Fatal("unable to initialize cassandra session", tag.Error(err))
4548
}
46-
return NewFactoryFromSession(cfg, clusterName, logger, session)
49+
return NewFactoryFromSession(
50+
cfg,
51+
clusterName,
52+
logger,
53+
session,
54+
serializer,
55+
)
4756
}
4857

4958
// NewFactoryFromSession returns an instance of a factory object from the given session.
@@ -52,12 +61,14 @@ func NewFactoryFromSession(
5261
clusterName string,
5362
logger log.Logger,
5463
session commongocql.Session,
64+
serializer serialization.Serializer,
5565
) *Factory {
5666
return &Factory{
5767
cfg: cfg,
5868
clusterName: clusterName,
5969
logger: logger,
6070
session: session,
71+
serializer: serializer,
6172
}
6273
}
6374

@@ -88,7 +99,7 @@ func (f *Factory) NewClusterMetadataStore() (p.ClusterMetadataStore, error) {
8899

89100
// NewExecutionStore returns a new ExecutionStore.
90101
func (f *Factory) NewExecutionStore() (p.ExecutionStore, error) {
91-
return NewExecutionStore(f.session), nil
102+
return NewExecutionStore(f.session, f.serializer), nil
92103
}
93104

94105
// NewQueue returns a new queue backed by cassandra

common/persistence/cassandra/history_store.go

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"go.temporal.io/api/serviceerror"
88
p "go.temporal.io/server/common/persistence"
99
"go.temporal.io/server/common/persistence/nosql/nosqlplugin/cassandra/gocql"
10+
"go.temporal.io/server/common/persistence/serialization"
1011
"go.temporal.io/server/common/primitives"
1112
)
1213

@@ -44,13 +45,17 @@ const (
4445
type (
4546
HistoryStore struct {
4647
Session gocql.Session
47-
p.HistoryBranchUtilImpl
48+
p.HistoryBranchUtil
4849
}
4950
)
5051

51-
func NewHistoryStore(session gocql.Session) *HistoryStore {
52+
func NewHistoryStore(
53+
session gocql.Session,
54+
serializer serialization.Serializer,
55+
) *HistoryStore {
5256
return &HistoryStore{
53-
Session: session,
57+
Session: session,
58+
HistoryBranchUtil: p.NewHistoryBranchUtil(serializer),
5459
}
5560
}
5661

@@ -137,7 +142,7 @@ func (h *HistoryStore) ReadHistoryBranch(
137142
ctx context.Context,
138143
request *p.InternalReadHistoryBranchRequest,
139144
) (*p.InternalReadHistoryBranchResponse, error) {
140-
branch, err := h.GetHistoryBranchUtil().ParseHistoryBranchInfo(request.BranchToken)
145+
branch, err := h.ParseHistoryBranchInfo(request.BranchToken)
141146
if err != nil {
142147
return nil, err
143148
}
@@ -346,7 +351,7 @@ func (h *HistoryStore) GetHistoryTreeContainingBranch(
346351
request *p.InternalGetHistoryTreeContainingBranchRequest,
347352
) (*p.InternalGetHistoryTreeContainingBranchResponse, error) {
348353

349-
branch, err := h.GetHistoryBranchUtil().ParseHistoryBranchInfo(request.BranchToken)
354+
branch, err := h.ParseHistoryBranchInfo(request.BranchToken)
350355
if err != nil {
351356
return nil, err
352357
}
@@ -391,6 +396,10 @@ func (h *HistoryStore) GetHistoryTreeContainingBranch(
391396
}, nil
392397
}
393398

399+
func (h *HistoryStore) GetHistoryBranchUtil() p.HistoryBranchUtil {
400+
return h.HistoryBranchUtil
401+
}
402+
394403
func convertHistoryNode(
395404
message map[string]interface{},
396405
) p.InternalHistoryNode {

common/persistence/cassandra/mutable_state_store.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -363,13 +363,15 @@ const (
363363

364364
type (
365365
MutableStateStore struct {
366-
Session gocql.Session
366+
Session gocql.Session
367+
serializer serialization.Serializer
367368
}
368369
)
369370

370-
func NewMutableStateStore(session gocql.Session) *MutableStateStore {
371+
func NewMutableStateStore(session gocql.Session, serializer serialization.Serializer) *MutableStateStore {
371372
return &MutableStateStore{
372-
Session: session,
373+
Session: session,
374+
serializer: serializer,
373375
}
374376
}
375377

@@ -648,7 +650,7 @@ func (d *MutableStateStore) UpdateWorkflowExecution(
648650
lastWriteVersion := updateWorkflow.LastWriteVersion
649651

650652
// TODO: double encoding execution state? already in updateWorkflow.ExecutionStateBlob
651-
executionStateDatablob, err := serialization.WorkflowExecutionStateToBlob(updateWorkflow.ExecutionState)
653+
executionStateDatablob, err := d.serializer.WorkflowExecutionStateToBlob(updateWorkflow.ExecutionState)
652654
if err != nil {
653655
return err
654656
}
@@ -962,7 +964,7 @@ func (d *MutableStateStore) GetCurrentExecution(
962964
}
963965

964966
// TODO: fix blob ExecutionState in storage should not be a blob.
965-
executionState, err := serialization.WorkflowExecutionStateFromBlob(executionStateBlob)
967+
executionState, err := d.serializer.WorkflowExecutionStateFromBlob(executionStateBlob)
966968
if err != nil {
967969
return nil, err
968970
}

common/persistence/cassandra/mutable_state_task_store.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -162,13 +162,15 @@ const (
162162

163163
type (
164164
MutableStateTaskStore struct {
165-
Session gocql.Session
165+
Session gocql.Session
166+
serializer serialization.Serializer
166167
}
167168
)
168169

169-
func NewMutableStateTaskStore(session gocql.Session) *MutableStateTaskStore {
170+
func NewMutableStateTaskStore(session gocql.Session, serializer serialization.Serializer) *MutableStateTaskStore {
170171
return &MutableStateTaskStore{
171-
Session: session,
172+
Session: session,
173+
serializer: serializer,
172174
}
173175
}
174176

@@ -503,7 +505,7 @@ func (d *MutableStateTaskStore) PutReplicationTaskToDLQ(
503505
request *p.PutReplicationTaskToDLQRequest,
504506
) error {
505507
task := request.TaskInfo
506-
datablob, err := serialization.ReplicationTaskInfoToBlob(task)
508+
datablob, err := d.serializer.ReplicationTaskInfoToBlob(task)
507509
if err != nil {
508510
return gocql.ConvertError("PutReplicationTaskToDLQ", err)
509511
}

common/persistence/cassandra/queue_store.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,10 @@ const (
3030

3131
type (
3232
QueueStore struct {
33-
queueType persistence.QueueType
34-
session gocql.Session
35-
logger log.Logger
33+
queueType persistence.QueueType
34+
session gocql.Session
35+
logger log.Logger
36+
serializer serialization.Serializer
3637
}
3738
)
3839

@@ -42,9 +43,10 @@ func NewQueueStore(
4243
logger log.Logger,
4344
) (persistence.Queue, error) {
4445
return &QueueStore{
45-
queueType: queueType,
46-
session: session,
47-
logger: logger,
46+
queueType: queueType,
47+
session: session,
48+
logger: logger,
49+
serializer: serialization.NewSerializer(),
4850
}, nil
4951
}
5052

@@ -300,7 +302,7 @@ func (q *QueueStore) getQueueMetadata(
300302
return nil, err
301303
}
302304

303-
return convertQueueMetadata(message)
305+
return convertQueueMetadata(message, q.serializer)
304306
}
305307

306308
func (q *QueueStore) updateAckLevel(
@@ -310,7 +312,7 @@ func (q *QueueStore) updateAckLevel(
310312
) error {
311313

312314
// TODO: remove this once cluster_ack_level is removed from DB
313-
metadataStruct, err := serialization.QueueMetadataFromBlob(metadata.Blob)
315+
metadataStruct, err := q.serializer.QueueMetadataFromBlob(metadata.Blob)
314316
if err != nil {
315317
return gocql.ConvertError("updateAckLevel", err)
316318
}
@@ -385,6 +387,7 @@ func convertQueueMessage(
385387

386388
func convertQueueMetadata(
387389
message map[string]interface{},
390+
serializer serialization.Serializer,
388391
) (*persistence.InternalQueueMetadata, error) {
389392

390393
metadata := &persistence.InternalQueueMetadata{
@@ -394,7 +397,7 @@ func convertQueueMetadata(
394397
if ok {
395398
clusterAckLevel := message["cluster_ack_level"].(map[string]int64)
396399
// TODO: remove this once we remove cluster_ack_level from DB.
397-
blob, err := serialization.QueueMetadataToBlob(&persistencespb.QueueMetadata{ClusterAckLevels: clusterAckLevel})
400+
blob, err := serializer.QueueMetadataToBlob(&persistencespb.QueueMetadata{ClusterAckLevels: clusterAckLevel})
398401
if err != nil {
399402
return nil, err
400403
}

common/persistence/client/factory.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ func (f *factoryImpl) NewHistoryTaskQueueManager() (persistence.HistoryTaskQueue
225225
if err != nil {
226226
return nil, err
227227
}
228-
return persistence.NewHistoryTaskQueueManager(q, serialization.NewSerializer()), nil
228+
return persistence.NewHistoryTaskQueueManager(q, f.serializer), nil
229229
}
230230

231231
func (f *factoryImpl) NewNexusEndpointManager() (persistence.NexusEndpointManager, error) {

common/persistence/client/fx.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ type (
5252
Logger log.Logger
5353
HealthSignals persistence.HealthSignalAggregator
5454
DynamicRateLimitingParams DynamicRateLimitingParams
55+
Serializer serialization.Serializer
5556
}
5657

5758
FactoryProviderFn func(NewFactoryParams) Factory
@@ -83,6 +84,7 @@ func ClusterNameProvider(config *cluster.Config) ClusterName {
8384
func EventBlobCacheProvider(
8485
dc *dynamicconfig.Collection,
8586
logger log.Logger,
87+
serializer serialization.Serializer,
8688
) persistence.XDCCache {
8789
return persistence.NewEventsBlobCache(
8890
dynamicconfig.XDCCacheMaxSizeBytes.Get(dc)(),
@@ -128,7 +130,7 @@ func FactoryProvider(
128130
systemRequestRateLimiter,
129131
namespaceRequestRateLimiter,
130132
shardRequestRateLimiter,
131-
serialization.NewSerializer(),
133+
params.Serializer,
132134
params.EventBlobCache,
133135
string(params.ClusterName),
134136
params.MetricsHandler,
@@ -163,14 +165,15 @@ func DataStoreFactoryProvider(
163165
logger log.Logger,
164166
metricsHandler metrics.Handler,
165167
tracerProvider trace.TracerProvider,
168+
serializer serialization.Serializer,
166169
) persistence.DataStoreFactory {
167170
var dataStoreFactory persistence.DataStoreFactory
168171
defaultStoreCfg := cfg.DataStores[cfg.DefaultStore]
169172
switch {
170173
case defaultStoreCfg.Cassandra != nil:
171-
dataStoreFactory = cassandra.NewFactory(*defaultStoreCfg.Cassandra, r, string(clusterName), logger, metricsHandler)
174+
dataStoreFactory = cassandra.NewFactory(*defaultStoreCfg.Cassandra, r, string(clusterName), logger, metricsHandler, serializer)
172175
case defaultStoreCfg.SQL != nil:
173-
dataStoreFactory = sql.NewFactory(*defaultStoreCfg.SQL, r, string(clusterName), logger, metricsHandler)
176+
dataStoreFactory = sql.NewFactory(*defaultStoreCfg.SQL, r, string(clusterName), serializer, logger, metricsHandler)
174177
case defaultStoreCfg.CustomDataStoreConfig != nil:
175178
dataStoreFactory = abstractDataStoreFactory.NewFactory(*defaultStoreCfg.CustomDataStoreConfig, r, string(clusterName), logger, metricsHandler)
176179
default:

0 commit comments

Comments
 (0)