From e53e3b138d8d017abe4a2c096ccff7da4c1af662 Mon Sep 17 00:00:00 2001
From: David Porter
Date: Fri, 29 Dec 2023 11:44:30 +0000
Subject: [PATCH 1/2] Possible fix for shard replication between clusters with
 different shard counts

---
 service/frontend/adminHandler.go | 29 +++++++++++++++++++++++++++++
 1 file changed, 29 insertions(+)

diff --git a/service/frontend/adminHandler.go b/service/frontend/adminHandler.go
index 7de3145c1fe..9a145b85419 100644
--- a/service/frontend/adminHandler.go
+++ b/service/frontend/adminHandler.go
@@ -967,6 +967,8 @@ func (adh *adminHandlerImpl) GetReplicationMessages(
 		return nil, adh.error(errClusterNameNotSet, scope)
 	}
 
+	request = filterReplicationShards(adh.config.NumHistoryShards, request)
+
 	resp, err = adh.GetHistoryRawClient().GetReplicationMessages(ctx, request)
 	if err != nil {
 		return nil, adh.error(err, scope)
@@ -1810,3 +1812,30 @@ func convertFilterListToMap(filters []*types.DynamicConfigFilter) (map[dc.Filter
 	}
 	return newFilters, nil
 }
+
+// Ensures that the shards in the request are within the range owned by this
+// server; any tokens outside that range are filtered out. This necessarily
+// means that replication requests for shards that don't exist here are dropped.
+//
+// Replication therefore covers min(clusterA.NumHistoryShards, clusterB.NumHistoryShards)
+// shards; shards above this limit are simply not replicated. This is a
+// compromise: there is no straightforward way to translate shards from a
+// larger shard count down to a smaller one.
+//
+// Simple division (such as dividing by 2) doesn't work because each
+// replication response returns per-shard offsets (much like Kafka), so that
+// consumers can manage their own offset locations. Any attempt at such
+// reductions or translations would require keeping track of the offsets
+// of the translated shards.
+func filterReplicationShards(maxShards int, req *types.GetReplicationMessagesRequest) *types.GetReplicationMessagesRequest {
+	var out []*types.ReplicationToken
+	for _, v := range req.Tokens {
+		if int(v.ShardID) < maxShards {
+			out = append(out, v)
+		}
+	}
+	return &types.GetReplicationMessagesRequest{
+		Tokens:      out,
+		ClusterName: req.ClusterName,
+	}
+}
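
Note (not part of the patch): to make the offset argument in the filterReplicationShards comment concrete, here is a small standalone Go sketch. The token type and naiveTranslate helper are illustrative stand-ins invented for the example, not Cadence APIs; they show how folding a larger shard space onto a smaller one with modulo arithmetic would force two independent per-shard offsets to share one slot.

    package main

    import "fmt"

    // token is an illustrative stand-in for types.ReplicationToken: one
    // replication read offset per shard on the polling cluster.
    type token struct {
        ShardID                int32
        LastRetrievedMessageID int64
    }

    // naiveTranslate folds a larger shard space onto a smaller one with modulo
    // arithmetic, grouping source tokens by the target shard they land on.
    func naiveTranslate(tokens []token, targetShards int32) map[int32][]token {
        byTarget := make(map[int32][]token)
        for _, t := range tokens {
            target := t.ShardID % targetShards
            byTarget[target] = append(byTarget[target], t)
        }
        return byTarget
    }

    func main() {
        // An 8-shard cluster tracks one offset per shard.
        tokens := []token{
            {ShardID: 1, LastRetrievedMessageID: 100},
            {ShardID: 5, LastRetrievedMessageID: 250},
        }
        // Folding onto 4 shards sends shards 1 and 5 to the same target shard,
        // so their independent offsets would have to share a single slot.
        for target, ts := range naiveTranslate(tokens, 4) {
            fmt.Printf("target shard %d would carry %d source offsets\n", target, len(ts))
        }
    }

Keeping those per-source-shard offsets apart would amount to a shard-translation layer, which is exactly the bookkeeping the patch avoids by filtering instead.
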
From 52940439c813a15c1c224b1bef53704343b1a206 Mon Sep 17 00:00:00 2001
From: David Porter
Date: Fri, 29 Dec 2023 12:07:30 +0000
Subject: [PATCH 2/2] Add a test for shard filtering

---
 service/frontend/adminHandler.go      | 12 ++++-
 service/frontend/adminHandler_test.go | 68 +++++++++++++++++++++++++++
 2 files changed, 78 insertions(+), 2 deletions(-)

diff --git a/service/frontend/adminHandler.go b/service/frontend/adminHandler.go
index 9a145b85419..c3bf5d870c0 100644
--- a/service/frontend/adminHandler.go
+++ b/service/frontend/adminHandler.go
@@ -967,9 +967,17 @@ func (adh *adminHandlerImpl) GetReplicationMessages(
 		return nil, adh.error(errClusterNameNotSet, scope)
 	}
 
-	request = filterReplicationShards(adh.config.NumHistoryShards, request)
+	// Ensures that the request can be fulfilled by the current cluster. If this
+	// cluster receives a replication request for shards numbered higher than it
+	// can handle, those tokens are dropped.
+	filteredRequest := filterReplicationShards(adh.config.NumHistoryShards, request)
+	if len(filteredRequest.Tokens) != len(request.Tokens) {
+		adh.GetLogger().Warn("Received replication request from a cluster with a greater number of shards. "+
+			"Some workflows will not be replicated if they are active on that larger cluster "+
+			"and are intended to be replicated here", tag.ClusterName(request.ClusterName))
+	}
 
-	resp, err = adh.GetHistoryRawClient().GetReplicationMessages(ctx, request)
+	resp, err = adh.GetHistoryRawClient().GetReplicationMessages(ctx, filteredRequest)
 	if err != nil {
 		return nil, adh.error(err, scope)
 	}
diff --git a/service/frontend/adminHandler_test.go b/service/frontend/adminHandler_test.go
index e174e1308ff..eed4eba9ccf 100644
--- a/service/frontend/adminHandler_test.go
+++ b/service/frontend/adminHandler_test.go
@@ -1038,3 +1038,71 @@ func Test_UpdateDomainIsolationGroups(t *testing.T) {
 		})
 	}
 }
+
+func TestShardFiltering(t *testing.T) {
+
+	tests := map[string]struct {
+		input       *types.GetReplicationMessagesRequest
+		expectedOut *types.GetReplicationMessagesRequest
+		expectedErr error
+	}{
+		"Normal replication - no changes expected - replication between two 4-shard clusters": {
+			input: &types.GetReplicationMessagesRequest{
+				Tokens: []*types.ReplicationToken{
+					{ShardID: 0, LastRetrievedMessageID: 0, LastProcessedMessageID: 0},
+					{ShardID: 1, LastRetrievedMessageID: 1, LastProcessedMessageID: 1},
+					{ShardID: 2, LastRetrievedMessageID: 2, LastProcessedMessageID: 2},
+					{ShardID: 3, LastRetrievedMessageID: 3, LastProcessedMessageID: 3},
+				},
+				ClusterName: "cluster2-with-4-shards",
+			},
+			expectedOut: &types.GetReplicationMessagesRequest{
+				Tokens: []*types.ReplicationToken{
+					{ShardID: 0, LastRetrievedMessageID: 0, LastProcessedMessageID: 0},
+					{ShardID: 1, LastRetrievedMessageID: 1, LastProcessedMessageID: 1},
+					{ShardID: 2, LastRetrievedMessageID: 2, LastProcessedMessageID: 2},
+					{ShardID: 3, LastRetrievedMessageID: 3, LastProcessedMessageID: 3},
+				},
+				ClusterName: "cluster2-with-4-shards",
+			},
+		},
+		"Filtering replication - the polling cluster is asking for shards that aren't present here": {
+			input: &types.GetReplicationMessagesRequest{
+				Tokens: []*types.ReplicationToken{
+					{ShardID: 3, LastRetrievedMessageID: 3, LastProcessedMessageID: 3},
+					{ShardID: 4, LastRetrievedMessageID: 0, LastProcessedMessageID: 0},
+					{ShardID: 5, LastRetrievedMessageID: 1, LastProcessedMessageID: 1},
+					{ShardID: 6, LastRetrievedMessageID: 2, LastProcessedMessageID: 2},
+				},
+				ClusterName: "cluster2-with-8-shards",
+			},
+			expectedOut: &types.GetReplicationMessagesRequest{
+				Tokens: []*types.ReplicationToken{
+					{ShardID: 3, LastRetrievedMessageID: 3, LastProcessedMessageID: 3},
+				},
+				ClusterName: "cluster2-with-8-shards",
+			},
+		},
+	}
+
+	for name, td := range tests {
+		t.Run(name, func(t *testing.T) {
+			goMock := gomock.NewController(t)
+			mockHistoryClient := history.NewMockClient(goMock)
+
+			mockHistoryClient.EXPECT().GetReplicationMessages(gomock.Any(), td.expectedOut).Return(&types.GetReplicationMessagesResponse{}, nil)
+
+			handler := adminHandlerImpl{
+				config: &Config{NumHistoryShards: 4},
+				Resource: &resource.Test{
+					Logger:        testlogger.New(t),
+					MetricsClient: metrics.NewNoopMetricsClient(),
+					HistoryClient: mockHistoryClient,
+				},
+			}
+
+			_, err := handler.GetReplicationMessages(context.Background(), td.input)
+			assert.Equal(t, td.expectedErr, err)
+		})
+	}
+}
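
Note (not part of the patch): the standalone sketch below walks through the scenario the second test case and the new Warn log cover, a 4-shard cluster receiving a poll from an 8-shard cluster. splitByOwnership and shardToken are local illustrative restatements of the filtering rule, not the unexported filterReplicationShards or the Cadence types package.

    package main

    import "fmt"

    // shardToken mirrors the ShardID field of types.ReplicationToken.
    type shardToken struct {
        ShardID int32
    }

    // splitByOwnership separates the requested tokens into those this cluster
    // owns (and will serve) and those it must drop, mirroring the patch's
    // filter-then-warn flow in GetReplicationMessages.
    func splitByOwnership(numHistoryShards int, tokens []shardToken) (kept, dropped []shardToken) {
        for _, t := range tokens {
            if int(t.ShardID) < numHistoryShards {
                kept = append(kept, t)
            } else {
                dropped = append(dropped, t)
            }
        }
        return kept, dropped
    }

    func main() {
        // An 8-shard cluster polls a 4-shard cluster for shards 3 through 6.
        requested := []shardToken{{ShardID: 3}, {ShardID: 4}, {ShardID: 5}, {ShardID: 6}}
        kept, dropped := splitByOwnership(4, requested)
        if len(dropped) > 0 {
            // This is the situation the new Warn log reports: the poller has more
            // shards than this cluster, so some of its shards cannot be served.
            fmt.Printf("serving %d shard(s), dropping %d out-of-range shard(s): %v\n",
                len(kept), len(dropped), dropped)
        }
    }

Only shard 3 is served and shards 4 through 6 are dropped, which is the token-count mismatch that triggers the new warning and gives the min(4, 8) replication coverage described in patch 1.
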