Skip to content

Commit

Permalink
Merge pull request #7035 from dolthub/nicktobey/schemamerge4
Browse files Browse the repository at this point in the history
Detect data conflicts even when one branch has the same binary representation as the ancestor.
  • Loading branch information
nicktobey authored Nov 30, 2023
2 parents 207d284 + ce5f98a commit 804f1c1
Show file tree
Hide file tree
Showing 18 changed files with 161 additions and 92 deletions.
5 changes: 4 additions & 1 deletion go/libraries/doltcore/diff/diff_stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,10 @@ func diffProllyTrees(ctx context.Context, ch chan DiffStatProgress, keyless bool
}
}

err = prolly.DiffMaps(ctx, f, t, func(ctx context.Context, diff tree.Diff) error {
// TODO: Use `vMapping` to determine whether columns have been added or removed. If so, then all rows should
// count as modifications in the diff.
considerAllRowsModified := false
err = prolly.DiffMaps(ctx, f, t, considerAllRowsModified, func(ctx context.Context, diff tree.Diff) error {
return rpr(ctx, vMapping, fVD, tVD, diff, ch)
})
if err != nil && err != io.EOF {
Expand Down
15 changes: 8 additions & 7 deletions go/libraries/doltcore/merge/merge_prolly_rows.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ var ErrUnableToMergeColumnDefaultValue = errorkinds.NewKind("unable to automatic
"in merge: %s for table '%s'; to continue merging, first manually apply the column alteration on this branch")

// mergeProllyTable merges the table specified by |tm| using the specified |mergedSch| and returns the new table
// instance, along with merge stats and any error. If |rewriteRows| is true, then any existing rows in the
// instance, along with merge stats and any error. If |diffInfo.RewriteRows| is true, then any existing rows in the
// table's primary index will also be rewritten. This function merges the table's artifacts (e.g. recorded
// conflicts), migrates any existing table data to the specified |mergedSch|, and merges table data from both
// sides of the merge together.
func mergeProllyTable(ctx context.Context, tm *TableMerger, mergedSch schema.Schema, mergeInfo MergeInfo) (*doltdb.Table, *MergeStats, error) {
func mergeProllyTable(ctx context.Context, tm *TableMerger, mergedSch schema.Schema, mergeInfo MergeInfo, diffInfo tree.ThreeWayDiffInfo) (*doltdb.Table, *MergeStats, error) {
mergeTbl, err := mergeTableArtifacts(ctx, tm, tm.leftTbl)
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -84,7 +84,7 @@ func mergeProllyTable(ctx context.Context, tm *TableMerger, mergedSch schema.Sch
}

var stats *MergeStats
mergeTbl, stats, err = mergeProllyTableData(sqlCtx, tm, mergedSch, mergeTbl, valueMerger, mergeInfo)
mergeTbl, stats, err = mergeProllyTableData(sqlCtx, tm, mergedSch, mergeTbl, valueMerger, mergeInfo, diffInfo)
if err != nil {
return nil, nil, err
}
Expand All @@ -110,8 +110,8 @@ func mergeProllyTable(ctx context.Context, tm *TableMerger, mergedSch schema.Sch
// as well as any secondary indexes, and also checking for unique constraints incrementally. When
// conflicts are detected, this function attempts to resolve them automatically if possible, and
// if not, they are recorded as conflicts in the table's artifacts.
func mergeProllyTableData(ctx *sql.Context, tm *TableMerger, finalSch schema.Schema, mergeTbl *doltdb.Table, valueMerger *valueMerger, mergeInfo MergeInfo) (*doltdb.Table, *MergeStats, error) {
iter, err := threeWayDiffer(ctx, tm, valueMerger)
func mergeProllyTableData(ctx *sql.Context, tm *TableMerger, finalSch schema.Schema, mergeTbl *doltdb.Table, valueMerger *valueMerger, mergeInfo MergeInfo, diffInfo tree.ThreeWayDiffInfo) (*doltdb.Table, *MergeStats, error) {
iter, err := threeWayDiffer(ctx, tm, valueMerger, diffInfo)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -311,7 +311,7 @@ func mergeProllyTableData(ctx *sql.Context, tm *TableMerger, finalSch schema.Sch
return finalTbl, s, nil
}

func threeWayDiffer(ctx context.Context, tm *TableMerger, valueMerger *valueMerger) (*tree.ThreeWayDiffer[val.Tuple, val.TupleDesc], error) {
func threeWayDiffer(ctx context.Context, tm *TableMerger, valueMerger *valueMerger, diffInfo tree.ThreeWayDiffInfo) (*tree.ThreeWayDiffer[val.Tuple, val.TupleDesc], error) {
lr, err := tm.leftTbl.GetRowData(ctx)
if err != nil {
return nil, err
Expand All @@ -338,6 +338,7 @@ func threeWayDiffer(ctx context.Context, tm *TableMerger, valueMerger *valueMerg
ancRows.Tuples(),
valueMerger.tryMerge,
valueMerger.keyless,
diffInfo,
leftRows.Tuples().Order,
)
}
Expand Down Expand Up @@ -1608,7 +1609,7 @@ func newValueMerger(merged, leftSch, rightSch, baseSch schema.Schema, syncPool p
baseToLeftMapping, baseToRightMapping, baseToResultMapping := generateSchemaMappings(baseSch, leftSch, rightSch, merged)

return &valueMerger{
numCols: merged.GetNonPKCols().Size(),
numCols: merged.GetNonPKCols().StoredSize(),
baseVD: baseSch.GetValueDescriptor(),
rightVD: rightSch.GetValueDescriptor(),
resultVD: merged.GetValueDescriptor(),
Expand Down
4 changes: 2 additions & 2 deletions go/libraries/doltcore/merge/merge_rows.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (rm *RootMerger) MergeTable(ctx *sql.Context, tblName string, opts editor.O
}

// Calculate a merge of the schemas, but don't apply it yet
mergeSch, schConflicts, mergeInfo, err := SchemaMerge(ctx, tm.vrw.Format(), tm.leftSch, tm.rightSch, tm.ancSch, tblName)
mergeSch, schConflicts, mergeInfo, diffInfo, err := SchemaMerge(ctx, tm.vrw.Format(), tm.leftSch, tm.rightSch, tm.ancSch, tblName)
if err != nil {
return nil, nil, err
}
Expand All @@ -148,7 +148,7 @@ func (rm *RootMerger) MergeTable(ctx *sql.Context, tblName string, opts editor.O

var tbl *doltdb.Table
if types.IsFormat_DOLT(tm.vrw.Format()) {
tbl, stats, err = mergeProllyTable(ctx, tm, mergeSch, mergeInfo)
tbl, stats, err = mergeProllyTable(ctx, tm, mergeSch, mergeInfo, diffInfo)
} else {
tbl, stats, err = mergeNomsTable(ctx, tm, mergeSch, rm.vrw, opts)
}
Expand Down
61 changes: 42 additions & 19 deletions go/libraries/doltcore/merge/merge_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/store/prolly/tree"
storetypes "github.com/dolthub/dolt/go/store/types"
)

Expand Down Expand Up @@ -155,7 +156,7 @@ var ErrMergeWithDifferentPks = errors.New("error: cannot merge two tables with d
// SchemaMerge performs a three-way merge of |ourSch|, |theirSch|, and |ancSch|, and returns: the merged schema,
// any schema conflicts identified, whether moving to the new schema requires a full table rewrite, and any
// unexpected error encountered while merging the schemas.
func SchemaMerge(ctx context.Context, format *storetypes.NomsBinFormat, ourSch, theirSch, ancSch schema.Schema, tblName string) (sch schema.Schema, sc SchemaConflict, mergeInfo MergeInfo, err error) {
func SchemaMerge(ctx context.Context, format *storetypes.NomsBinFormat, ourSch, theirSch, ancSch schema.Schema, tblName string) (sch schema.Schema, sc SchemaConflict, mergeInfo MergeInfo, diffInfo tree.ThreeWayDiffInfo, err error) {
// (sch - ancSch) ∪ (mergeSch - ancSch) ∪ (sch ∩ mergeSch)
sc = SchemaConflict{
TableName: tblName,
Expand All @@ -164,38 +165,38 @@ func SchemaMerge(ctx context.Context, format *storetypes.NomsBinFormat, ourSch,
// TODO: We'll remove this once it's possible to get diff and merge on different primary key sets
// TODO: decide how to merge different orders of PKS
if !schema.ArePrimaryKeySetsDiffable(format, ourSch, theirSch) || !schema.ArePrimaryKeySetsDiffable(format, ourSch, ancSch) {
return nil, SchemaConflict{}, mergeInfo, ErrMergeWithDifferentPks
return nil, SchemaConflict{}, mergeInfo, diffInfo, ErrMergeWithDifferentPks
}

var mergedCC *schema.ColCollection
mergedCC, sc.ColConflicts, mergeInfo, err = mergeColumns(tblName, format, ourSch.GetAllCols(), theirSch.GetAllCols(), ancSch.GetAllCols())
mergedCC, sc.ColConflicts, mergeInfo, diffInfo, err = mergeColumns(tblName, format, ourSch.GetAllCols(), theirSch.GetAllCols(), ancSch.GetAllCols())
if err != nil {
return nil, SchemaConflict{}, mergeInfo, err
return nil, SchemaConflict{}, mergeInfo, diffInfo, err
}
if len(sc.ColConflicts) > 0 {
return nil, sc, mergeInfo, nil
return nil, sc, mergeInfo, diffInfo, nil
}

var mergedIdxs schema.IndexCollection
mergedIdxs, sc.IdxConflicts = mergeIndexes(mergedCC, ourSch, theirSch, ancSch)
if len(sc.IdxConflicts) > 0 {
return nil, sc, mergeInfo, nil
return nil, sc, mergeInfo, diffInfo, nil
}

sch, err = schema.SchemaFromCols(mergedCC)
if err != nil {
return nil, sc, mergeInfo, err
return nil, sc, mergeInfo, diffInfo, err
}

sch, err = mergeTableCollation(ctx, tblName, ancSch, ourSch, theirSch, sch)
if err != nil {
return nil, sc, mergeInfo, err
return nil, sc, mergeInfo, diffInfo, err
}

// TODO: Merge conflict should have blocked any primary key ordinal changes
err = sch.SetPkOrdinals(ourSch.GetPkOrdinals())
if err != nil {
return nil, sc, mergeInfo, err
return nil, sc, mergeInfo, diffInfo, err
}

_ = mergedIdxs.Iter(func(index schema.Index) (stop bool, err error) {
Expand All @@ -207,17 +208,17 @@ func SchemaMerge(ctx context.Context, format *storetypes.NomsBinFormat, ourSch,
var mergedChks []schema.Check
mergedChks, sc.ChkConflicts, err = mergeChecks(ctx, ourSch.Checks(), theirSch.Checks(), ancSch.Checks())
if err != nil {
return nil, SchemaConflict{}, mergeInfo, err
return nil, SchemaConflict{}, mergeInfo, diffInfo, err
}
if len(sc.ChkConflicts) > 0 {
return nil, sc, mergeInfo, nil
return nil, sc, mergeInfo, diffInfo, nil
}

// Look for invalid CHECKs
for _, chk := range mergedChks {
// CONFLICT: a CHECK now references a column that no longer exists in schema
if ok, err := isCheckReferenced(sch, chk); err != nil {
return nil, sc, mergeInfo, err
return nil, sc, mergeInfo, diffInfo, err
} else if !ok {
// Append to conflicts
sc.ChkConflicts = append(sc.ChkConflicts, ChkConflict{
Expand All @@ -232,7 +233,7 @@ func SchemaMerge(ctx context.Context, format *storetypes.NomsBinFormat, ourSch,
sch.Checks().AddCheck(chk.Name(), chk.Expression(), chk.Enforced())
}

return sch, sc, mergeInfo, nil
return sch, sc, mergeInfo, diffInfo, nil
}

// ForeignKeysMerge performs a three-way merge of (ourRoot, theirRoot, ancRoot) and using mergeRoot to validate FKs.
Expand Down Expand Up @@ -375,21 +376,22 @@ type MergeInfo struct {
// compatible with the current stored format. The merged columns, any column conflicts, and a boolean value stating if
// a full table rewrite is needed to align the existing table rows with the new, merged schema. If any unexpected error
// occurs, then that error is returned and the other response fields should be ignored.
func mergeColumns(tblName string, format *storetypes.NomsBinFormat, ourCC, theirCC, ancCC *schema.ColCollection) (*schema.ColCollection, []ColConflict, MergeInfo, error) {
func mergeColumns(tblName string, format *storetypes.NomsBinFormat, ourCC, theirCC, ancCC *schema.ColCollection) (*schema.ColCollection, []ColConflict, MergeInfo, tree.ThreeWayDiffInfo, error) {
mergeInfo := MergeInfo{}
diffInfo := tree.ThreeWayDiffInfo{}
columnMappings, err := mapColumns(ourCC, theirCC, ancCC)
if err != nil {
return nil, nil, mergeInfo, err
return nil, nil, mergeInfo, diffInfo, err
}

conflicts, err := checkSchemaConflicts(columnMappings)
if err != nil {
return nil, nil, mergeInfo, err
return nil, nil, mergeInfo, diffInfo, err
}

err = checkUnmergeableNewColumns(tblName, columnMappings)
if err != nil {
return nil, nil, mergeInfo, err
return nil, nil, mergeInfo, diffInfo, err
}

compatChecker := newTypeCompatabilityCheckerForStorageFormat(format)
Expand All @@ -408,32 +410,47 @@ func mergeColumns(tblName string, format *storetypes.NomsBinFormat, ourCC, their
// if an ancestor does not exist, and the column exists only on one side, use that side
// (if an ancestor DOES exist, this means the column was deleted, so it's a no-op)
mergeInfo.LeftNeedsRewrite = true
diffInfo.RightSchemaChange = true
diffInfo.LeftAndRightSchemasDiffer = true
mergedColumns = append(mergedColumns, *theirs)
case anc == nil && ours != nil && theirs == nil:
// if an ancestor does not exist, and the column exists only on one side, use that side
// (if an ancestor DOES exist, this means the column was deleted, so it's a no-op)
mergeInfo.RightNeedsRewrite = true
diffInfo.LeftSchemaChange = true
diffInfo.LeftAndRightSchemasDiffer = true
mergedColumns = append(mergedColumns, *ours)
case anc != nil && ours == nil && theirs != nil:
// column was deleted on our side
mergeInfo.RightNeedsRewrite = true
diffInfo.LeftSchemaChange = true
diffInfo.LeftAndRightSchemasDiffer = true
case anc != nil && ours != nil && theirs == nil:
// column was deleted on their side
mergeInfo.LeftNeedsRewrite = true
diffInfo.RightSchemaChange = true
diffInfo.LeftAndRightSchemasDiffer = true
case ours == nil && theirs == nil:
// if the column is deleted on both sides... just let it fall out
diffInfo.LeftSchemaChange = true
diffInfo.RightSchemaChange = true
case ours != nil && theirs != nil:
// otherwise, we have two valid columns and we need to figure out which one to use
if anc != nil {
oursChanged := !anc.Equals(*ours)
diffInfo.LeftSchemaChange = diffInfo.LeftSchemaChange || oursChanged
theirsChanged := !anc.Equals(*theirs)
diffInfo.RightSchemaChange = diffInfo.RightSchemaChange || theirsChanged
if oursChanged && theirsChanged {
// If both columns changed in the same way, the modifications converge, so accept the column.
// If not, don't report a conflict, since this case is already handled in checkForColumnConflicts.
if ours.Equals(*theirs) {
mergedColumns = append(mergedColumns, *theirs)
} else {
diffInfo.LeftAndRightSchemasDiffer = true
}
} else if theirsChanged {
diffInfo.LeftAndRightSchemasDiffer = true
// In this case, only theirsChanged, so we need to check if moving from ours->theirs
// is valid, otherwise it's a conflict
mergeInfo.LeftNeedsRewrite = true
Expand All @@ -451,6 +468,7 @@ func mergeColumns(tblName string, format *storetypes.NomsBinFormat, ourCC, their
})
}
} else if oursChanged {
diffInfo.LeftAndRightSchemasDiffer = true
// In this case, only oursChanged, so we need to check if moving from theirs->ours
// is valid, otherwise it's a conflict
mergeInfo.RightNeedsRewrite = true
Expand All @@ -472,10 +490,15 @@ func mergeColumns(tblName string, format *storetypes.NomsBinFormat, ourCC, their
mergedColumns = append(mergedColumns, *ours)
}
} else {
// The column was added on both branches.
diffInfo.LeftSchemaChange = true
diffInfo.RightSchemaChange = true
// If both columns changed in the same way, the modifications converge, so accept the column.
// If not, don't report a conflict, since this case is already handled in checkForColumnConflicts.
if ours.Equals(*theirs) {
mergedColumns = append(mergedColumns, *ours)
} else {
diffInfo.LeftAndRightSchemasDiffer = true
}
}
}
Expand All @@ -484,10 +507,10 @@ func mergeColumns(tblName string, format *storetypes.NomsBinFormat, ourCC, their
// Check that there are no duplicate column names or tags in the merged column set
conflicts = append(conflicts, checkForColumnConflicts(mergedColumns)...)
if conflicts != nil {
return nil, conflicts, mergeInfo, nil
return nil, conflicts, mergeInfo, diffInfo, nil
}

return schema.NewColCollection(mergedColumns...), nil, mergeInfo, nil
return schema.NewColCollection(mergedColumns...), nil, mergeInfo, diffInfo, nil
}

// checkForColumnConflicts iterates over |mergedColumns|, checks for duplicate column names or column tags, and returns
Expand Down
2 changes: 1 addition & 1 deletion go/libraries/doltcore/merge/schema_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,7 @@ func testMergeSchemasWithConflicts(t *testing.T, test mergeSchemaConflictTest) {

otherSch := getSchema(t, dEnv)

_, actConflicts, mergeInfo, err := merge.SchemaMerge(context.Background(), types.Format_Default, mainSch, otherSch, ancSch, "test")
_, actConflicts, mergeInfo, _, err := merge.SchemaMerge(context.Background(), types.Format_Default, mainSch, otherSch, ancSch, "test")
assert.False(t, mergeInfo.InvalidateSecondaryIndexes)
if test.expectedErr != nil {
assert.True(t, errors.Is(err, test.expectedErr))
Expand Down
14 changes: 0 additions & 14 deletions go/libraries/doltcore/merge/schema_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,9 +236,6 @@ var columnAddDropTests = []schemaMergeTest{
merged: singleRow(1, 2),
},
{
// Skipped because the differ currently doesn't see this as a data conflict because
// both left and right tuple representations are the same.
// (https://github.com/dolthub/dolt/issues/6748)
name: "one side sets to NULL, other drops non-NULL",
ancestor: singleRow(1, 2, 3),
left: singleRow(1, 2),
Expand All @@ -247,25 +244,18 @@ var columnAddDropTests = []schemaMergeTest{
skip: true,
},
{
// Skipped because the differ currently doesn't try to merge the dropped column.
// (https://github.com/dolthub/dolt/issues/6747)
name: "one side sets to NULL, other drops non-NULL, plus data change",
ancestor: singleRow(1, 2, 3),
left: singleRow(1, 2),
right: singleRow(1, 3, nil),
dataConflict: true,
skip: true,
},
{
// Skipped because the differ doesn't see left as modified because
// it has the same tuple representation as ancestor.
// (https://github.com/dolthub/dolt/issues/6746)
name: "one side sets to non-NULL, other drops NULL",
ancestor: singleRow(1, 2, nil),
left: singleRow(1, 2),
right: singleRow(1, 2, 3),
dataConflict: true,
skip: true,
},
{
name: "one side sets to non-NULL, other drops NULL, plus data change",
Expand Down Expand Up @@ -375,15 +365,11 @@ var columnAddDropTests = []schemaMergeTest{
merged: singleRow(1, 2, nil),
},
{
// Skipped because the differ doesn't see left as modified because
// it has the same tuple representation as ancestor.
// (https://github.com/dolthub/dolt/issues/6746)
name: "convergent adds with differing nullness",
ancestor: singleRow(1, 2),
left: singleRow(1, 2, nil),
right: singleRow(1, 2, 3),
dataConflict: true,
skip: true,
},
{
name: "convergent adds with differing nullness, plus convergent data change",
Expand Down
Loading

0 comments on commit 804f1c1

Please sign in to comment.