-
Notifications
You must be signed in to change notification settings - Fork 809
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Bugfix/corrects replication in clusters of different sizes #5567
base: master
Are you sure you want to change the base?
Bugfix/corrects replication in clusters of different sizes #5567
Conversation
|
||
// Ensures that shards received are within the range of the server, | ||
// or they'll be filtered out. This necessarily means that replication requests | ||
// that are for shards that don't exist are dropped. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if the replication is fundamentally can't be done completely in such setups, should we even attempt to do the partial replication?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it can be - as far as I can tell, successfully be done when scaling a cluster up, in terms of the number of shards. Just not down. This strikes me as useful.
I don't think there's any value in doing partial replication, it's just hard to prevent it in such scenarios. The poller information doesn't presently convey the shard count, and such an API change seems like overkill.
// higher than what it can handle, it'll drop them. | ||
filteredRequest := filterReplicationShards(adh.config.NumHistoryShards, request) | ||
if len(filteredRequest.Tokens) != len(request.Tokens) { | ||
adh.GetLogger().Warn("Warning! Received replication request from a cluster with a greater number of shards."+ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
including the number of shards in current and requesting cluster to the log would be useful
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is no obvious way (that I'm aware of) to get the requesting cluster's shard count, else I would
goMock := gomock.NewController(t) | ||
mockhistoryclient := history.NewMockClient(goMock) | ||
|
||
mockhistoryclient.EXPECT().GetReplicationMessages(gomock.Any(), td.expectedOut).Return(&types.GetReplicationMessagesResponse{}, nil) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does this mock line validates GetReplicationMessages
is called with tc.expectedOut
or does it need Times(1)
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think no explicit Times
is equivalent to MinTimes(1)
... but that's probably worth verifying.
@davidporter-id-au thank you so much for putting this together. I was hoping to get some insight into testing strategy for this. Do you think it is likely that this change will get merged into the next release? I'm just getting a feel for if we should plan on cherry picking this PR and making an internal build for our uses vs grabbing an upcoming release instead. I really do appreciate you and the team doing this. Let me know if there's anything I can do to help! |
// higher than what it can handle, it'll drop them. | ||
filteredRequest := filterReplicationShards(adh.config.NumHistoryShards, request) | ||
if len(filteredRequest.Tokens) != len(request.Tokens) { | ||
adh.GetLogger().Warn("Warning! Received replication request from a cluster with a greater number of shards."+ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is definitely a useful log message to have, but unfortunately, it also means that there is a high degree of frequency with which it gets logged. For example, if you are going from 1k shards to 8k (e.g., MySQL to Cassandra), you will get 7k of these logged on every replication cycle.
Could we change it to either be debug or only log once per cluster/etc? I've attached a screenshot of what the logs look like for a very small reproduction case (1 vs 4 shards) I put together in Docker Compose using this PR's patch on top of v1.2.6
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for not being on the ball with this one.
That's way too much noise, agreed. when you're running that however, is the active side on the larger cluster? that should be the scenario that's broken and will not work.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for digging into it! In this case, the active side is the lower shard count one (with an unfortunate name due to historical reasons). We actually didn't have any domains set up to replicate - these logs were happening out of the box.
I'm happy to share my code, if that would help with testing. It's a fairly concise docker-compose with 3 clusters (one with fewer shards) set up to replicate
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah if you could post the config that'd be helpful for me to understand what's going on. I'm blocked from testing it more internally due to some other PR we need to fix, but otherwise I'll be able to take more of a look hopefully later next week
@@ -1038,3 +1038,71 @@ func Test_UpdateDomainIsolationGroups(t *testing.T) { | |||
}) | |||
} | |||
} | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Leaving this comment here as a reminder to update documentation https://github.com/uber/Cadence-Docs/pull/181/files
👋 Hi folks. Re: the docs update in cadence-workflow/Cadence-Docs#181 I was wondering if there is still a plan to merge this PR and support migrating from a smaller -> larger number of shards? Thanks! |
What changed?
This tests/fixes replication between clusters of different sizes. This is not super well tested, so please treat as an alpha feature.
Intention and where this works
Smaller # of shards -> larger cluster: Should be ok:
This is a bugfix for replication between clusters where the shard numbers aren't the same. The high-level approach is to only partially support this replication with a very simple bugfix for now. This adds the approach of simple shard filtering where requests for shards higher than what are supported are simply dropped.
The intention is to fix this for the use-case where a cluster has a small number of shards and there is an intention to connect it to a larger cluster for expansion. In this use-case the traffic for domains active on the old (smaller) cluster should be able to be replicated to the new (larger) cluster as usual. I've done some preliminary testing to confirm this is the case.
During this replication workflows moving from the smaller cluster to the larger will move history shards in the database as they are replicated. Ie, if a workflow in small-cluster hashed to shard 123, it would hash to some other hash in the new cluster. Though this shouldn't affect anything.
Caveats
Don't use this to replicate from larger clusters to smaller:
However, in the scenario that a domain is active in the new (larger) cluster and it's replicating back to the old cluster workflows will be partially dropped. The drop is partial simply because it's more convenient to drop too large replication requests rather than dropping all since cluster metadata to the receiving cluster isn't available; it's not possible to know what the requesting cluster's shard size is without some more significant API changes.
Why not use some shard translation scheme like /2
When I first was poking around with this problem I was intending to just see if I could connect a cluster power-two size differences and half / double the shards in translation. This isn't possible however, without a stateful intermediary, because of the replication API: The replication API keepts track of offsets on the producer side, meaning that any reduction of 2 shards into a single one would need to solve the problem of keeping track of both offsets somehow.
Why?
How did you test it?
Tested locally with a multicluster setup
Potential risks
This change could break replication and should be treated with care.
Release notes
Documentation Changes
This change probably requires updating the migration documentation for replication-based migration.