Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

snapshot(ticdc): fix ddl puller and ddl manager stuck caused by dead lock (#11886) #11896

Open
wants to merge 1 commit into
base: release-7.1
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions cdc/entry/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,92 @@
return nil
}

<<<<<<< HEAD

Check failure on line 231 in cdc/entry/schema_storage.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

syntax error: non-declaration statement outside function body

Check failure on line 231 in cdc/entry/schema_storage.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

syntax error: non-declaration statement outside function body
=======
// AllPhysicalTables returns the table IDs of all tables and partition tables.
func (s *schemaStorage) AllPhysicalTables(ctx context.Context, ts model.Ts) ([]model.TableID, error) {
// NOTE: it's better to pre-allocate the vector. However, in the current implementation
// we can't know how many valid tables in the snapshot.
res := make([]model.TableID, 0)
snap, err := s.GetSnapshot(ctx, ts)
if err != nil {
return nil, err
}

snap.IterTables(true, func(tblInfo *model.TableInfo) {
if s.shouldIgnoreTable(tblInfo) {
return
}
if pi := tblInfo.GetPartitionInfo(); pi != nil {
for _, partition := range pi.Definitions {
res = append(res, partition.ID)
}
} else {
res = append(res, tblInfo.ID)
}
})
log.Debug("get new schema snapshot",
zap.Uint64("ts", ts),
zap.Uint64("snapTs", snap.CurrentTs()),
zap.Any("tables", res))

return res, nil
}

// AllTables returns table info of all tables that are being replicated.
func (s *schemaStorage) AllTables(ctx context.Context, ts model.Ts) ([]*model.TableInfo, error) {
tables := make([]*model.TableInfo, 0)
snap, err := s.GetSnapshot(ctx, ts)
if err != nil {
return nil, err
}
snap.IterTables(true, func(tblInfo *model.TableInfo) {
if !s.shouldIgnoreTable(tblInfo) {
tables = append(tables, tblInfo)
}
})
return tables, nil
}

func (s *schemaStorage) shouldIgnoreTable(t *model.TableInfo) bool {
schemaName := t.TableName.Schema
tableName := t.TableName.Table
if s.filter.ShouldIgnoreTable(schemaName, tableName) {
return true
}
if t.IsEligible(s.forceReplicate) {
return false
}

// Sequence is not supported yet, and always ineligible.
// Skip Warn to avoid confusion.
// See https://github.com/pingcap/tiflow/issues/4559
if !t.IsSequence() {
log.Warn("skip ineligible table",
zap.String("namespace", s.id.Namespace),
zap.String("changefeed", s.id.ID),
zap.Int64("tableID", t.ID),
zap.Stringer("tableName", t.TableName),
)
}
return true
}

// IsIneligibleTable returns whether the table is ineligible.
// It uses the snapshot of the given ts to check the table.
// Ineligible means that the table does not have a primary key
// or not null unique key.
func (s *schemaStorage) IsIneligibleTable(
ctx context.Context, tableID model.TableID, ts model.Ts,
) (bool, error) {
snap, err := s.GetSnapshot(ctx, ts)
if err != nil {
return false, err
}
return snap.IsIneligibleTableID(tableID), nil
}

>>>>>>> 0bb497751a (snapshot(ticdc): fix ddl puller and ddl manager stuck caused by two dead lock (#11886))

Check failure on line 316 in cdc/entry/schema_storage.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

syntax error: non-declaration statement outside function body

Check failure on line 316 in cdc/entry/schema_storage.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

binary literal has no digits

Check failure on line 316 in cdc/entry/schema_storage.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

invalid character U+0023 '#'

Check failure on line 316 in cdc/entry/schema_storage.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

syntax error: non-declaration statement outside function body

Check failure on line 316 in cdc/entry/schema_storage.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

binary literal has no digits

Check failure on line 316 in cdc/entry/schema_storage.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

invalid character U+0023 '#'
// AdvanceResolvedTs advances the resolved. Not thread safe.
// NOTE: SHOULD NOT call it concurrently
func (s *schemaStorageImpl) AdvanceResolvedTs(ts uint64) {
Expand Down
Loading