[to #48] modify TiDB CDC to TiKV CDC #74

Merged
merged 33 commits into from
Apr 18, 2022

Conversation

zeminzhou
Contributor

What problem does this PR solve?

Issue Number: #48

Problem Description:
The current TiDB CDC should not be used for RawKV CDC.

What is changed and how does it work?

Modify TiDB CDC to synchronize RawKV data

Code changes

  • Has exported function/method change
  • Has exported variable/fields change
  • Has interface methods change
  • Has persistent data change

Check List for Tests

This PR has been tested by at least one of the following methods:

  • Unit test

"github.com/tikv/migration/cdc/cdc/processor/pipeline/system"
ssystem "github.com/tikv/migration/cdc/cdc/sorter/leveldb/system"

// ssystem "github.com/tikv/migration/cdc/cdc/sorter/leveldb/system"
Member

why not just remove it?

Contributor Author

@zeminzhou zeminzhou Mar 29, 2022

The sorter module sorts transactions by commit ts, so that transactions enter CDC in commit-ts order. RawKV doesn't need it for now (we will also need to add a sorter for TxnKV later).

if len(changefeedConfig.FilterRules) != 0 {
replicaConfig.Filter.Rules = changefeedConfig.FilterRules
}
/*
Member

ditto

Contributor Author

FilterRules are used to filter tables.

Contributor Author

We can change the table filter into a key (or keyspan) filter, so that keys users do not need to synchronize are filtered out.

if err != nil {
return nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByArgs(err.Error())
/*
if len(changefeedConfig.FilterRules) != 0 {
Member

ditto

// APIOpVarTableID is the key of table ID in HTTP API
APIOpVarTableID = "table-id"
// APIOpVarKeySpanID is the key of keyspan ID in HTTP API
APIOpVarKeySpanID = "keyspan-id"
Member

what's the relationship between keyspan id and region id?

Contributor Author

In the current implementation, keyspans and regions have a one-to-one relationship. But a keyspan id is not equal to a region id: a keyspan id only represents an interval [start, end), with keyspan-id = hash(start+end).
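
For illustration, the keyspan-id = hash(start+end) rule could be sketched as below. This is only a sketch under assumptions: the thread does not say which hash function is used, so FNV-1a from the Go standard library is a stand-in here, and the `Span` type is a simplified, hypothetical copy of the one in the PR.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// Span is a half-open key interval [Start, End).
// Simplified, hypothetical stand-in for the type touched by this PR.
type Span struct {
	Start []byte
	End   []byte
}

// ID derives a keyspan id by hashing Start followed by End.
// FNV-1a is an assumption of this sketch, not necessarily what the PR uses.
func (s Span) ID() uint64 {
	h := fnv.New64a()
	h.Write(s.Start) // Write on a hash.Hash never returns an error
	h.Write(s.End)
	return h.Sum64()
}

func main() {
	a := Span{Start: []byte("a"), End: []byte("m")}
	b := Span{Start: []byte("m"), End: []byte("z")}
	fmt.Printf("%x %x\n", a.ID(), b.ID())
}
```

The id depends only on the interval, not on any region id. Note that plain concatenation means two different spans whose start+end bytes concatenate to the same string (for example ["ab", "c") and ["a", "bc")) would get the same id.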

@@ -536,7 +536,7 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error {
}
})

tableID, tableName := util.TableIDFromCtx(ctx)
// tableID, tableName := util.KeySpanIDFromCtx(ctx)
Member

ditto

Contributor Author

There might be something wrong here, I'll check it later.

@@ -564,7 +564,7 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error {
zap.Uint64("regionID", errInfo.singleRegionInfo.verID.GetID()),
zap.Uint64("ts", errInfo.singleRegionInfo.ts),
zap.String("changefeed", cfID), zap.Stringer("span", errInfo.span),
zap.Int64("tableID", tableID), zap.String("tableName", tableName),
// zap.Int64("tableID", tableID), zap.String("tableName", tableName),
Member

ditto

@@ -19,15 +19,9 @@ import (
"github.com/tikv/migration/cdc/cdc/kv"
"github.com/tikv/migration/cdc/cdc/owner"
"github.com/tikv/migration/cdc/cdc/processor"
tablepipeline "github.com/tikv/migration/cdc/cdc/processor/pipeline"
keyspanpipeline "github.com/tikv/migration/cdc/cdc/processor/pipeline"
Member

why not just use the original name pipeline?

Contributor Author

I don't know why tablepipeline was used instead of pipeline. 🤣

unified.InitMetrics(registry)
leveldb.InitMetrics(registry)
redowriter.InitMetrics(registry)
// sorter.InitMetrics(registry)
Member

ditto

feedStateManager: new(feedStateManager),
gcManager: gcManager,

errCh: make(chan error, defaultErrChSize),
cancel: func() {},

newDDLPuller: newDDLPuller,
newSink: newDDLSink,
// newDDLPuller: newDDLPuller,
Member

ditto

Contributor Author

@zeminzhou zeminzhou Mar 29, 2022

In TiCDC, the owner is responsible for two things:

  1. Divide the changefeed into tasks and assign them to other captures.
  2. Synchronize DDL operations of the upstream TiDB.

Synchronizing DDL operations is no longer needed.

@@ -37,6 +38,15 @@ func (s Span) String() string {
return fmt.Sprintf("[%s, %s)", hex.EncodeToString(s.Start), hex.EncodeToString(s.End))
}

func (s Span) ID() uint64 {
Collaborator

There is a very small possibility of two different spans generating the same hash. Maybe we should add a seed to Span to avoid it.

Contributor Author

Yes, it is indeed possible. But I don't think adding a seed is effective, because this is a probabilistic event.

Contributor Author

Collaborator

There is still a possible conflict between "currentKeySpans" and the keyspans acquired before.

Contributor Author

How about using region id as keyspan id? https://github.com/zeminzhou/migration/blob/tikv-cdc/cdc/cdc/owner/scheduler_v1.go#L444

If we do it this way, keyspan will lose its meaning.

Collaborator

No. The range of a region will change after split or merge.

How about adding a seed to KeySpan to avoid conflicts?

We maintain all key spans in the owner. Every time we get a new key span, we calculate the hash with a seed starting from 0. If the hash conflicts with an existing key span, we do seed += 1 and calculate the hash again, until we find a hash without a conflict.
Note that the seed should be transferred to Capture & Processor, along with the key span's start & end.
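
The seed proposal above could look roughly like the following. This is a hedged sketch, not the code that was merged: `KeySpan`, `Registry`, `NewRegistry`, `Assign` and `hashWithSeed` are hypothetical names, FNV-1a is an assumed hash, and the hash function is injectable only so that collisions can be simulated.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"hash/fnv"
)

// KeySpan carries the seed together with Start and End, so that Capture
// and Processor can recompute the same id. Hypothetical type.
type KeySpan struct {
	Start, End []byte
	Seed       uint64
}

// hashWithSeed mixes the seed into the hash of start+end (FNV-1a assumed).
func hashWithSeed(start, end []byte, seed uint64) uint64 {
	var buf [8]byte
	binary.BigEndian.PutUint64(buf[:], seed)
	h := fnv.New64a()
	h.Write(buf[:])
	h.Write(start)
	h.Write(end)
	return h.Sum64()
}

// Registry is a hypothetical owner-side table of all known keyspan ids.
type Registry struct {
	byID map[uint64]KeySpan
	hash func(start, end []byte, seed uint64) uint64
}

func NewRegistry(hash func(start, end []byte, seed uint64) uint64) *Registry {
	return &Registry{byID: make(map[uint64]KeySpan), hash: hash}
}

// Assign tries seed 0, 1, 2, ... until the id is free or already
// belongs to the very same span.
func (r *Registry) Assign(start, end []byte) (uint64, KeySpan) {
	for seed := uint64(0); ; seed++ {
		id := r.hash(start, end, seed)
		existing, taken := r.byID[id]
		if !taken {
			ks := KeySpan{Start: start, End: end, Seed: seed}
			r.byID[id] = ks
			return id, ks
		}
		if bytes.Equal(existing.Start, start) && bytes.Equal(existing.End, end) {
			return id, existing // the same span was registered before
		}
	}
}

func main() {
	r := NewRegistry(hashWithSeed)
	id, ks := r.Assign([]byte("a"), []byte("m"))
	fmt.Printf("id=%x seed=%d\n", id, ks.Seed)
}
```

In this sketch the owner is the only component that searches for a free seed; everything downstream would just recompute the hash from (start, end, seed).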

@@ -58,7 +58,7 @@ type ownerJob struct {
// for ManualSchedule only
targetCaptureID model.CaptureID
// for ManualSchedule only
tableID model.TableID
Collaborator

(Not related to this line)
The GC-related procedure (Owner.updateGCSafepoint) needs to be reviewed. At least CDCServiceSafePointID must be changed. Refer to the RFC.


s.cleanUpFinishedOperations()
pendingJob, err := s.syncTablesWithCurrentTables()
pendingJob, err := s.syncKeySpansWithCurrentKeySpans()
Collaborator

  1. Keyspans to add and keyspans to remove will necessarily overlap. So to be correct, I think we must finish the remove jobs first, and then dispatch the add jobs.
  2. The "remove then add" actions would be expensive and make the replication jittery. So I think it would be better to limit the number of keyspans to be balanced in every period.
  3. If we limit the number of keyspans to be balanced, the keyspans should be "related", e.g., the split regions and the original region, or the merged region and the original two regions.
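
One way to read the three points above is as a per-period planning step. The sketch below is only an illustration under assumptions: `Change` and `planRound` are hypothetical names that do not come from the PR, and keyspans are represented by bare ids.

```go
package main

import "fmt"

// Change is one group of related keyspans, for example a region split:
// the old keyspan to remove and the keyspans that replace it.
// Hypothetical type for this sketch.
type Change struct {
	Remove []uint64
	Add    []uint64
}

// planRound takes at most limit changes from pending. Removals and
// additions are returned separately so that the caller can wait for
// every removal to finish before dispatching any addition. The changes
// that did not fit are returned for a later period.
func planRound(pending []Change, limit int) (removes, adds []uint64, rest []Change) {
	if limit < 0 {
		limit = 0
	}
	if limit > len(pending) {
		limit = len(pending)
	}
	for _, c := range pending[:limit] {
		removes = append(removes, c.Remove...)
		adds = append(adds, c.Add...)
	}
	return removes, adds, pending[limit:]
}

func main() {
	pending := []Change{
		{Remove: []uint64{1}, Add: []uint64{2, 3}}, // split
		{Remove: []uint64{4, 5}, Add: []uint64{6}}, // merge
		{Remove: []uint64{7}, Add: []uint64{8, 9}}, // split
	}
	removes, adds, rest := planRound(pending, 2)
	fmt.Println(removes, adds, len(rest))
}
```

Keeping each split or merge together as one `Change` is what makes the limited batch "related": an old keyspan and its replacements are never spread across periods.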

@@ -20,22 +20,22 @@ import (
var (
Collaborator

These metrics are for the mounter. They can be removed.

zeminzhou and others added 16 commits April 18, 2022 15:11
Signed-off-by: zeminzhou <[email protected]>
Issue Number: #57

Signed-off-by: pingyu <[email protected]>
Signed-off-by: zeminzhou <[email protected]>
Signed-off-by: zeminzhou <[email protected]>
Signed-off-by: zeminzhou <[email protected]>
Signed-off-by: zeminzhou <[email protected]>
Signed-off-by: zeminzhou <[email protected]>
Signed-off-by: zeminzhou <[email protected]>
Signed-off-by: zeminzhou <[email protected]>
zeminzhou and others added 16 commits April 18, 2022 15:15
Signed-off-by: zeminzhou <[email protected]>
Signed-off-by: zeminzhou <[email protected]>
Signed-off-by: zeminzhou <[email protected]>
Signed-off-by: zeminzhou <[email protected]>
* [to #67] setup backup/restore integration test for rawkv

Signed-off-by: Jian Zhang <[email protected]>

* add github action

Signed-off-by: Jian Zhang <[email protected]>
Signed-off-by: zeminzhou <[email protected]>
* [to #67] remove some unused code

Signed-off-by: Jian Zhang <[email protected]>

* remove web

Signed-off-by: Jian Zhang <[email protected]>

Co-authored-by: Ping Yu <[email protected]>
Signed-off-by: zeminzhou <[email protected]>
Signed-off-by: Jian Zhang <[email protected]>
Signed-off-by: zeminzhou <[email protected]>
* br: cherry pick tidb#32612 to fix rawkv backup failure

Issue Number: #67

Signed-off-by: pingyu <[email protected]>

* revert unnecessary port change

Signed-off-by: pingyu <[email protected]>

* migrate test logics from run.sh to run.py

Signed-off-by: pingyu <[email protected]>

* temporarily disable old versions < 6.0

Signed-off-by: pingyu <[email protected]>

* separate CI of 5.X versions

Signed-off-by: pingyu <[email protected]>

* bugfix

Signed-off-by: pingyu <[email protected]>

* bugfix

Signed-off-by: pingyu <[email protected]>

* address comments

Signed-off-by: pingyu <[email protected]>

* tiny fix

Signed-off-by: pingyu <[email protected]>
Signed-off-by: zeminzhou <[email protected]>
Signed-off-by: zeminzhou <[email protected]>
Signed-off-by: zeminzhou <[email protected]>
Collaborator

@pingyu pingyu left a comment

@pingyu pingyu merged commit 3b82376 into tikv:main Apr 18, 2022