Skip to content

Commit d347eed

Browse files
ti-chi-botdisksingti-chi-bot[bot]
authored
dr-autosync: add recover timeout (#6295) (#8775)
close #4939 Signed-off-by: husharp <[email protected]> Co-authored-by: disksing <[email protected]> Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
1 parent a4a1da7 commit d347eed

File tree

3 files changed

+62
-28
lines changed

3 files changed

+62
-28
lines changed

Diff for: server/config/config.go

+8-7
Original file line numberDiff line numberDiff line change
@@ -1375,13 +1375,14 @@ func NormalizeReplicationMode(m string) string {
13751375

13761376
// DRAutoSyncReplicationConfig is the configuration for auto sync mode between 2 data centers.
13771377
type DRAutoSyncReplicationConfig struct {
1378-
LabelKey string `toml:"label-key" json:"label-key"`
1379-
Primary string `toml:"primary" json:"primary"`
1380-
DR string `toml:"dr" json:"dr"`
1381-
PrimaryReplicas int `toml:"primary-replicas" json:"primary-replicas"`
1382-
DRReplicas int `toml:"dr-replicas" json:"dr-replicas"`
1383-
WaitStoreTimeout typeutil.Duration `toml:"wait-store-timeout" json:"wait-store-timeout"`
1384-
PauseRegionSplit bool `toml:"pause-region-split" json:"pause-region-split,string"`
1378+
LabelKey string `toml:"label-key" json:"label-key"`
1379+
Primary string `toml:"primary" json:"primary"`
1380+
DR string `toml:"dr" json:"dr"`
1381+
PrimaryReplicas int `toml:"primary-replicas" json:"primary-replicas"`
1382+
DRReplicas int `toml:"dr-replicas" json:"dr-replicas"`
1383+
WaitStoreTimeout typeutil.Duration `toml:"wait-store-timeout" json:"wait-store-timeout"`
1384+
WaitRecoverTimeout typeutil.Duration `toml:"wait-recover-timeout" json:"wait-recover-timeout"`
1385+
PauseRegionSplit bool `toml:"pause-region-split" json:"pause-region-split,string"`
13851386
}
13861387

13871388
func (c *DRAutoSyncReplicationConfig) adjust(meta *configutil.ConfigMetaData) {

Diff for: server/replication/replication_mode.go

+13-2
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,7 @@ const (
212212
type drAutoSyncStatus struct {
213213
State string `json:"state,omitempty"`
214214
StateID uint64 `json:"state_id,omitempty"`
215+
AsyncStartTime *time.Time `json:"async_start,omitempty"`
215216
RecoverStartTime *time.Time `json:"recover_start,omitempty"`
216217
TotalRegions int `json:"total_regions,omitempty"`
217218
SyncedRegions int `json:"synced_regions,omitempty"`
@@ -262,7 +263,8 @@ func (m *ModeManager) drSwitchToAsyncWithLock(availableStores []uint64) error {
262263
log.Warn("failed to switch to async state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err))
263264
return err
264265
}
265-
dr := drAutoSyncStatus{State: drStateAsync, StateID: id, AvailableStores: availableStores}
266+
now := time.Now()
267+
dr := drAutoSyncStatus{State: drStateAsync, StateID: id, AvailableStores: availableStores, AsyncStartTime: &now}
266268
if err := m.storage.SaveReplicationStatus(modeDRAutoSync, dr); err != nil {
267269
log.Warn("failed to switch to async state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err))
268270
return err
@@ -272,6 +274,15 @@ func (m *ModeManager) drSwitchToAsyncWithLock(availableStores []uint64) error {
272274
return nil
273275
}
274276

277+
func (m *ModeManager) drDurationSinceAsyncStart() time.Duration {
278+
m.RLock()
279+
defer m.RUnlock()
280+
if m.drAutoSync.AsyncStartTime == nil {
281+
return 0
282+
}
283+
return time.Since(*m.drAutoSync.AsyncStartTime)
284+
}
285+
275286
func (m *ModeManager) drSwitchToSyncRecover() error {
276287
m.Lock()
277288
defer m.Unlock()
@@ -471,7 +482,7 @@ func (m *ModeManager) tickUpdateState() {
471482
m.drSwitchToAsync(storeIDs[primaryUp])
472483
}
473484
case drStateAsync:
474-
if canSync {
485+
if canSync && m.drDurationSinceAsyncStart() > m.config.DRAutoSync.WaitRecoverTimeout.Duration {
475486
m.drSwitchToSyncRecover()
476487
break
477488
}

Diff for: server/replication/replication_mode_test.go

+41-19
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package replication
1616

1717
import (
1818
"context"
19+
"encoding/json"
1920
"errors"
2021
"fmt"
2122
"testing"
@@ -159,6 +160,20 @@ func newMockReplicator(ids []uint64) *mockFileReplicator {
159160
}
160161
}
161162

163+
func assertLastData(t *testing.T, data string, state string, stateID uint64, availableStores []uint64) {
164+
type status struct {
165+
State string `json:"state"`
166+
StateID uint64 `json:"state_id"`
167+
AvailableStores []uint64 `json:"available_stores"`
168+
}
169+
var s status
170+
err := json.Unmarshal([]byte(data), &s)
171+
require.NoError(t, err)
172+
require.Equal(t, state, s.State)
173+
require.Equal(t, stateID, s.StateID)
174+
require.Equal(t, availableStores, s.AvailableStores)
175+
}
176+
162177
func TestStateSwitch(t *testing.T) {
163178
re := require.New(t)
164179
ctx, cancel := context.WithCancel(context.Background())
@@ -190,7 +205,7 @@ func TestStateSwitch(t *testing.T) {
190205
stateID := rep.drAutoSync.StateID
191206
re.NotEqual(uint64(0), stateID)
192207
rep.tickReplicateStatus()
193-
re.Equal(fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID), replicator.lastData[1])
208+
assertLastData(t, replicator.lastData[1], "sync", stateID, nil)
194209
assertStateIDUpdate := func() {
195210
re.NotEqual(stateID, rep.drAutoSync.StateID)
196211
stateID = rep.drAutoSync.StateID
@@ -207,7 +222,7 @@ func TestStateSwitch(t *testing.T) {
207222
re.Equal(drStateAsyncWait, rep.drGetState())
208223
assertStateIDUpdate()
209224
rep.tickReplicateStatus()
210-
re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4]}`, stateID), replicator.lastData[1])
225+
assertLastData(t, replicator.lastData[1], "async_wait", stateID, []uint64{1, 2, 3, 4})
211226

212227
re.False(rep.GetReplicationStatus().GetDrAutoSync().GetPauseRegionSplit())
213228
conf.DRAutoSync.PauseRegionSplit = true
@@ -218,7 +233,7 @@ func TestStateSwitch(t *testing.T) {
218233
rep.tickUpdateState()
219234
assertStateIDUpdate()
220235
rep.tickReplicateStatus()
221-
re.Equal(fmt.Sprintf(`{"state":"async","state_id":%d,"available_stores":[1,2,3,4]}`, stateID), replicator.lastData[1])
236+
assertLastData(t, replicator.lastData[1], "async", stateID, []uint64{1, 2, 3, 4})
222237

223238
// add new store in dr zone.
224239
cluster.AddLabelsStore(5, 1, map[string]string{"zone": "zone2"})
@@ -268,18 +283,19 @@ func TestStateSwitch(t *testing.T) {
268283
rep.tickUpdateState()
269284
re.Equal(drStateAsyncWait, rep.drGetState())
270285
assertStateIDUpdate()
286+
271287
rep.tickReplicateStatus()
272-
re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4]}`, stateID), replicator.lastData[1])
288+
assertLastData(t, replicator.lastData[1], "async_wait", stateID, []uint64{1, 2, 3, 4})
273289
setStoreState(cluster, "down", "up", "up", "up", "down", "down")
274290
rep.tickUpdateState()
275291
assertStateIDUpdate()
276292
rep.tickReplicateStatus()
277-
re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[2,3,4]}`, stateID), replicator.lastData[1])
293+
assertLastData(t, replicator.lastData[1], "async_wait", stateID, []uint64{2, 3, 4})
278294
setStoreState(cluster, "up", "down", "up", "up", "down", "down")
279295
rep.tickUpdateState()
280296
assertStateIDUpdate()
281297
rep.tickReplicateStatus()
282-
re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,3,4]}`, stateID), replicator.lastData[1])
298+
assertLastData(t, replicator.lastData[1], "async_wait", stateID, []uint64{1, 3, 4})
283299

284300
// async_wait -> async
285301
rep.tickUpdateState()
@@ -291,26 +307,32 @@ func TestStateSwitch(t *testing.T) {
291307
rep.tickUpdateState()
292308
assertStateIDUpdate()
293309
rep.tickReplicateStatus()
294-
re.Equal(fmt.Sprintf(`{"state":"async","state_id":%d,"available_stores":[1,3,4]}`, stateID), replicator.lastData[1])
310+
assertLastData(t, replicator.lastData[1], "async", stateID, []uint64{1, 3, 4})
295311

296312
// async -> async
297313
setStoreState(cluster, "up", "up", "up", "up", "down", "down")
298314
rep.tickUpdateState()
299315
// store 2 won't be available before it syncs status.
300316
rep.tickReplicateStatus()
301-
re.Equal(fmt.Sprintf(`{"state":"async","state_id":%d,"available_stores":[1,3,4]}`, stateID), replicator.lastData[1])
317+
assertLastData(t, replicator.lastData[1], "async", stateID, []uint64{1, 3, 4})
302318
syncStoreStatus(1, 2, 3, 4)
303319
rep.tickUpdateState()
304320
assertStateIDUpdate()
305321
rep.tickReplicateStatus()
306-
re.Equal(fmt.Sprintf(`{"state":"async","state_id":%d,"available_stores":[1,2,3,4]}`, stateID), replicator.lastData[1])
322+
assertLastData(t, replicator.lastData[1], "async", stateID, []uint64{1, 2, 3, 4})
307323

308324
// async -> sync_recover
309325
setStoreState(cluster, "up", "up", "up", "up", "up", "up")
310326
rep.tickUpdateState()
311327
re.Equal(drStateSyncRecover, rep.drGetState())
312328
assertStateIDUpdate()
329+
313330
rep.drSwitchToAsync([]uint64{1, 2, 3, 4, 5})
331+
rep.config.DRAutoSync.WaitRecoverTimeout = typeutil.NewDuration(time.Hour)
332+
rep.tickUpdateState()
333+
re.Equal(drStateAsync, rep.drGetState()) // wait recover timeout
334+
335+
rep.config.DRAutoSync.WaitRecoverTimeout = typeutil.NewDuration(0)
314336
setStoreState(cluster, "down", "up", "up", "up", "up", "up")
315337
rep.tickUpdateState()
316338
re.Equal(drStateSyncRecover, rep.drGetState())
@@ -387,27 +409,27 @@ func TestReplicateState(t *testing.T) {
387409
stateID := rep.drAutoSync.StateID
388410
// replicate after initialized
389411
rep.tickReplicateStatus()
390-
re.Equal(fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID), replicator.lastData[1])
412+
assertLastData(t, replicator.lastData[1], "sync", stateID, nil)
391413

392414
// repliate state to new member
393415
replicator.memberIDs = append(replicator.memberIDs, 2, 3)
394416
rep.tickReplicateStatus()
395-
re.Equal(fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID), replicator.lastData[2])
396-
re.Equal(fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID), replicator.lastData[3])
417+
assertLastData(t, replicator.lastData[2], "sync", stateID, nil)
418+
assertLastData(t, replicator.lastData[3], "sync", stateID, nil)
397419

398420
// inject error
399421
replicator.errors[2] = errors.New("failed to persist")
400422
rep.tickUpdateState() // switch async_wait since there is only one zone
401423
newStateID := rep.drAutoSync.StateID
402424
rep.tickReplicateStatus()
403-
re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2]}`, newStateID), replicator.lastData[1])
404-
re.Equal(fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID), replicator.lastData[2])
405-
re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2]}`, newStateID), replicator.lastData[3])
425+
assertLastData(t, replicator.lastData[1], "async_wait", newStateID, []uint64{1, 2})
426+
assertLastData(t, replicator.lastData[2], "sync", stateID, nil)
427+
assertLastData(t, replicator.lastData[3], "async_wait", newStateID, []uint64{1, 2})
406428

407429
// clear error, replicate to node 2 next time
408430
delete(replicator.errors, 2)
409431
rep.tickReplicateStatus()
410-
re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2]}`, newStateID), replicator.lastData[2])
432+
assertLastData(t, replicator.lastData[2], "async_wait", newStateID, []uint64{1, 2})
411433
}
412434

413435
func TestAsynctimeout(t *testing.T) {
@@ -637,7 +659,7 @@ func TestComplexPlacementRules(t *testing.T) {
637659
rep.tickUpdateState()
638660
re.Equal(drStateAsyncWait, rep.drGetState())
639661
rep.tickReplicateStatus()
640-
re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4,5,6]}`, rep.drAutoSync.StateID), replicator.lastData[1])
662+
assertLastData(t, replicator.lastData[1], "async_wait", rep.drAutoSync.StateID, []uint64{1, 2, 3, 4, 5, 6})
641663

642664
// reset to sync
643665
setStoreState(cluster, "up", "up", "up", "up", "up", "up", "up", "up", "up", "up")
@@ -698,7 +720,7 @@ func TestComplexPlacementRules2(t *testing.T) {
698720
rep.tickUpdateState()
699721
re.Equal(drStateAsyncWait, rep.drGetState())
700722
rep.tickReplicateStatus()
701-
re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4]}`, rep.drAutoSync.StateID), replicator.lastData[1])
723+
assertLastData(t, replicator.lastData[1], "async_wait", rep.drAutoSync.StateID, []uint64{1, 2, 3, 4})
702724
}
703725

704726
func TestComplexPlacementRules3(t *testing.T) {
@@ -737,7 +759,7 @@ func TestComplexPlacementRules3(t *testing.T) {
737759
rep.tickUpdateState()
738760
re.Equal(drStateAsyncWait, rep.drGetState())
739761
rep.tickReplicateStatus()
740-
re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4]}`, rep.drAutoSync.StateID), replicator.lastData[1])
762+
assertLastData(t, replicator.lastData[1], "async_wait", rep.drAutoSync.StateID, []uint64{1, 2, 3, 4})
741763
}
742764

743765
func genRegions(cluster *mockcluster.Cluster, stateID uint64, n int) []*core.RegionInfo {

0 commit comments

Comments
 (0)