Skip to content

Commit

Permalink
ddl: Fixed partitioning a non-partitioned table with placement rules (p…
Browse files Browse the repository at this point in the history
  • Loading branch information
mjonss authored Nov 29, 2024
1 parent b6025b9 commit b2f29ee
Show file tree
Hide file tree
Showing 11 changed files with 753 additions and 128 deletions.
1 change: 1 addition & 0 deletions pkg/ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,7 @@ go_test(
"@com_github_tikv_client_go_v2//testutils",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//util",
"@com_github_tikv_pd_client//http",
"@io_etcd_go_etcd_client_v3//:client",
"@io_etcd_go_etcd_tests_v3//integration",
"@org_golang_google_grpc//:grpc",
Expand Down
116 changes: 63 additions & 53 deletions pkg/ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,17 +158,12 @@ func (w *worker) onAddTablePartition(jobCtx *jobContext, job *model.Job) (ver in
}
}

bundles, err := alterTablePartitionBundles(jobCtx.metaMut, tblInfo, tblInfo.Partition.AddingDefinitions)
_, err = alterTablePartitionBundles(jobCtx.metaMut, tblInfo, tblInfo.Partition.AddingDefinitions)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

if err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
}

ids := getIDs([]*model.TableInfo{tblInfo})
for _, p := range tblInfo.Partition.AddingDefinitions {
ids = append(ids, p.ID)
Expand Down Expand Up @@ -269,33 +264,52 @@ func alterTableLabelRule(schemaName string, meta *model.TableInfo, ids []int64)
return false, nil
}

func alterTablePartitionBundles(t *meta.Mutator, tblInfo *model.TableInfo, addingDefinitions []model.PartitionDefinition) ([]*placement.Bundle, error) {
var bundles []*placement.Bundle
func alterTablePartitionBundles(t *meta.Mutator, tblInfo *model.TableInfo, addDefs []model.PartitionDefinition) (bool, error) {
// We want to achieve:
// - before we do any reorganization/write to new partitions/global indexes that the placement rules are in-place
// - not removing any placement rules for removed partitions
// So we will:
// 1) First write the new bundles including both new and old partitions,
// EXCEPT if the old partition is in fact a table, then skip that partition
// 2) Then overwrite the bundles with the final partitioning scheme (second call in onReorg/

// tblInfo do not include added partitions, so we should add them first
tblInfo = tblInfo.Clone()
p := *tblInfo.Partition
p.Definitions = append([]model.PartitionDefinition{}, p.Definitions...)
p.Definitions = append(tblInfo.Partition.Definitions, addingDefinitions...)
tblInfo.Partition = &p
p := tblInfo.Partition
if p != nil {
// if partitioning a non-partitioned table, we will first change the metadata,
// so the table looks like a partitioned table, with the first/only partition having
// the same partition ID as the table, so we can access the table as a single partition.
// But in this case we should not add a bundle rule for the same range
// both as table and partition.
if p.Definitions[0].ID != tblInfo.ID {
// prepend with existing partitions
addDefs = append(p.Definitions, addDefs...)
}
p.Definitions = addDefs
}

// bundle for table should be recomputed because it includes some default configs for partitions
tblBundle, err := placement.NewTableBundle(t, tblInfo)
if err != nil {
return nil, errors.Trace(err)
return false, errors.Trace(err)
}

var bundles []*placement.Bundle
if tblBundle != nil {
bundles = append(bundles, tblBundle)
}

partitionBundles, err := placement.NewPartitionListBundles(t, addingDefinitions)
partitionBundles, err := placement.NewPartitionListBundles(t, addDefs)
if err != nil {
return nil, errors.Trace(err)
return false, errors.Trace(err)
}

bundles = append(bundles, partitionBundles...)
return bundles, nil

if len(bundles) > 0 {
return true, infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles)
}
return false, nil
}

// When drop/truncate a partition, we should still keep the dropped partition's placement settings to avoid unnecessary region schedules.
Expand Down Expand Up @@ -335,20 +349,16 @@ func updateAddingPartitionInfo(partitionInfo *model.PartitionInfo, tblInfo *mode
tblInfo.Partition.AddingDefinitions = append(tblInfo.Partition.AddingDefinitions, newDefs...)
}

// rollbackAddingPartitionInfo remove the `addingDefinitions` in the tableInfo.
func rollbackAddingPartitionInfo(tblInfo *model.TableInfo) ([]int64, []string, []*placement.Bundle) {
// removePartitionAddingDefinitionsFromTableInfo remove the `addingDefinitions` in the tableInfo.
func removePartitionAddingDefinitionsFromTableInfo(tblInfo *model.TableInfo) ([]int64, []string) {
physicalTableIDs := make([]int64, 0, len(tblInfo.Partition.AddingDefinitions))
partNames := make([]string, 0, len(tblInfo.Partition.AddingDefinitions))
rollbackBundles := make([]*placement.Bundle, 0, len(tblInfo.Partition.AddingDefinitions))
for _, one := range tblInfo.Partition.AddingDefinitions {
physicalTableIDs = append(physicalTableIDs, one.ID)
partNames = append(partNames, one.Name.L)
if one.PlacementPolicyRef != nil {
rollbackBundles = append(rollbackBundles, placement.NewBundle(one.ID))
}
}
tblInfo.Partition.AddingDefinitions = nil
return physicalTableIDs, partNames, rollbackBundles
return physicalTableIDs, partNames
}

// checkAddPartitionValue check add Partition Values,
Expand Down Expand Up @@ -2115,20 +2125,21 @@ func dropLabelRules(ctx context.Context, schemaName, tableName string, partNames
// It will drop newly created partitions that has not yet been used, including cleaning
// up label rules and bundles as well as changed indexes due to global flag.
func (w *worker) rollbackLikeDropPartition(jobCtx *jobContext, job *model.Job) (ver int64, _ error) {
args := jobCtx.jobArgs.(*model.TablePartitionArgs)
args, err := model.GetTablePartitionArgs(job)
if err != nil {
return ver, errors.Trace(err)
}
partInfo := args.PartInfo
metaMut := jobCtx.metaMut
tblInfo, err := GetTableInfoAndCancelFaultJob(metaMut, job, job.SchemaID)
if err != nil {
return ver, errors.Trace(err)
}
tblInfo.Partition.DroppingDefinitions = nil
physicalTableIDs, pNames, rollbackBundles := rollbackAddingPartitionInfo(tblInfo)
err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), rollbackBundles)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
}
// Collect table/partition ids to clean up, through args.OldPhysicalTblIDs
// GC will later also drop matching Placement bundles.
// If we delete them now, it could lead to non-compliant placement or failure during flashback
physicalTableIDs, pNames := removePartitionAddingDefinitionsFromTableInfo(tblInfo)
// TODO: Will this drop LabelRules for existing partitions, if the new partitions have the same name?
err = dropLabelRules(w.ctx, job.SchemaName, tblInfo.Name.L, pNames)
if err != nil {
Expand All @@ -2143,7 +2154,9 @@ func (w *worker) rollbackLikeDropPartition(jobCtx *jobContext, job *model.Job) (
if partInfo.Type != pmodel.PartitionTypeNone {
// ALTER TABLE ... PARTITION BY
// Also remove anything with the new table id
physicalTableIDs = append(physicalTableIDs, partInfo.NewTableID)
if partInfo.NewTableID != 0 {
physicalTableIDs = append(physicalTableIDs, partInfo.NewTableID)
}
// Reset if it was normal table before
if tblInfo.Partition.Type == pmodel.PartitionTypeNone ||
tblInfo.Partition.DDLType == pmodel.PartitionTypeNone {
Expand Down Expand Up @@ -2172,6 +2185,11 @@ func (w *worker) rollbackLikeDropPartition(jobCtx *jobContext, job *model.Job) (
tblInfo.Partition.ClearReorgIntermediateInfo()
}

_, err = alterTablePartitionBundles(metaMut, tblInfo, nil)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
}
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
Expand Down Expand Up @@ -3133,11 +3151,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
jobCtx.jobArgs = args
// Handle the rolling back job
if job.IsRollingback() {
ver, err := w.rollbackLikeDropPartition(jobCtx, job)
if err != nil {
return ver, errors.Trace(err)
}
return ver, nil
return w.rollbackLikeDropPartition(jobCtx, job)
}

tblInfo, partNames, partInfo, err := getReorgPartitionInfo(jobCtx.metaMut, job, args)
Expand All @@ -3155,6 +3169,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
// The partInfo may have been checked against an older schema version for example.
// If the check is done here, it does not need to be repeated, since no other
// DDL on the same table can be run concurrently.
tblInfo.Partition.DDLAction = job.Type
num := len(partInfo.Definitions) - len(partNames) + len(tblInfo.Partition.Definitions)
err = checkAddPartitionTooManyPartitions(uint64(num))
if err != nil {
Expand Down Expand Up @@ -3282,31 +3297,21 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
// In the next step, StateDeleteOnly, wait to verify the TiFlash replicas are OK
}

bundles, err := alterTablePartitionBundles(metaMut, tblInfo, tblInfo.Partition.AddingDefinitions)
changed, err := alterTablePartitionBundles(metaMut, tblInfo, tblInfo.Partition.AddingDefinitions)
if err != nil {
if !changesMade {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
return rollbackReorganizePartitionWithErr(jobCtx, job, err)
}

if len(bundles) > 0 {
if err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles); err != nil {
if !changesMade {
job.State = model.JobStateCancelled
return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
}
return rollbackReorganizePartitionWithErr(jobCtx, job, err)
}
changesMade = true
}
changesMade = changesMade || changed

ids := getIDs([]*model.TableInfo{tblInfo})
for _, p := range tblInfo.Partition.AddingDefinitions {
ids = append(ids, p.ID)
}
changed, err := alterTableLabelRule(job.SchemaName, tblInfo, ids)
changed, err = alterTableLabelRule(job.SchemaName, tblInfo, ids)
changesMade = changesMade || changed
if err != nil {
if !changesMade {
Expand All @@ -3332,7 +3337,6 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
metrics.GetBackfillProgressByLabel(metrics.LblReorgPartition, job.SchemaName, tblInfo.Name.String()).Set(0.1 / float64(math.MaxUint64))
job.SchemaState = model.StateDeleteOnly
tblInfo.Partition.DDLState = job.SchemaState
tblInfo.Partition.DDLAction = job.Type
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
Expand Down Expand Up @@ -3581,7 +3585,6 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
// REMOVE PARTITIONING
// Storing the old table ID, used for updating statistics.
oldTblID = tblInfo.ID
// TODO: Handle bundles?
// TODO: Add concurrent test!
// TODO: Will this result in big gaps?
// TODO: How to carrie over AUTO_INCREMENT etc.?
Expand All @@ -3595,7 +3598,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
return ver, errors.Trace(err)
}
tblInfo.ID = partInfo.NewTableID
if partInfo.DDLType != pmodel.PartitionTypeNone {
if oldTblID != physicalTableIDs[0] {
// if partitioned before, then also add the old table ID,
// otherwise it will be the already included first partition
physicalTableIDs = append(physicalTableIDs, oldTblID)
Expand All @@ -3615,6 +3618,13 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
return ver, errors.Trace(err)
}
}

// We need to update the Placement rule bundles with the final partitions.
_, err = alterTablePartitionBundles(metaMut, tblInfo, nil)
if err != nil {
return ver, err
}

failpoint.Inject("reorgPartFail5", func(val failpoint.Value) {
if val.(bool) {
job.ErrorCount += variable.GetDDLErrorCountLimit() / 2
Expand Down
Loading

0 comments on commit b2f29ee

Please sign in to comment.