Skip to content

Commit

Permalink
Merge pull request #820 from liquidata-inc/andy/fk-pk
Browse files Browse the repository at this point in the history
foreign keys can use primary keys as parent table indexes
  • Loading branch information
andy-wm-arthur authored Aug 4, 2020
2 parents 03c3b53 + 90fdf47 commit cd7b12b
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 12 deletions.
10 changes: 10 additions & 0 deletions bats/foreign-keys.bats
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,16 @@ SQL
[[ `echo "$output" | tr -d "\n" | tr -s " "` =~ 'CONSTRAINT `fk_named` FOREIGN KEY (`v1`) REFERENCES `parent` (`v1`)' ]] || false
}

@test "foreign-keys: parent table index required" {
# parent doesn't have an index over (v1,v2) to reference
run dolt sql -q "ALTER TABLE child ADD CONSTRAINT fk1 FOREIGN KEY (v1,v2) REFERENCES parent(v1,v2);"
[ "$status" -ne "0" ]

# parent implicitly has an index over its primary key
run dolt sql -q "ALTER TABLE child ADD CONSTRAINT fk_id FOREIGN KEY (v1) REFERENCES parent(id);"
[ "$status" -eq "0" ]
}

@test "foreign-keys: CREATE TABLE Name Collision" {
run dolt sql <<SQL
CREATE TABLE child (
Expand Down
56 changes: 44 additions & 12 deletions go/libraries/doltcore/sqle/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/liquidata-inc/dolt/go/libraries/doltcore/schema/alterschema"
"github.com/liquidata-inc/dolt/go/libraries/doltcore/schema/encoding"
"github.com/liquidata-inc/dolt/go/libraries/doltcore/schema/typeinfo"
"github.com/liquidata-inc/dolt/go/libraries/utils/set"
"github.com/liquidata-inc/dolt/go/store/types"
)

Expand Down Expand Up @@ -458,7 +459,7 @@ func (t *AlterableDoltTable) ModifyColumn(ctx *sql.Context, columnName string, c

// CreateIndex implements sql.IndexAlterableTable
func (t *AlterableDoltTable) CreateIndex(ctx *sql.Context, indexName string, using sql.IndexUsing, constraint sql.IndexConstraint, columns []sql.IndexColumn, comment string) error {
newTable, _, _, err := t.createIndex(ctx, indexName, using, constraint, columns, comment)
newTable, _, _, err := createIndexForTable(ctx, t.table, indexName, using, constraint, columns, comment)
if err != nil {
return err
}
Expand Down Expand Up @@ -629,7 +630,7 @@ func (t *AlterableDoltTable) CreateForeignKey(
tableIndex, ok := t.sch.Indexes().GetIndexByTags(colTags...)
if !ok {
// if child index doesn't exist, create it
t.table, _, tableIndex, err = t.createIndex(ctx, "", sql.IndexUsing_Default, sql.IndexConstraint_None, sqlColNames, "")
t.table, _, tableIndex, err = createIndexForTable(ctx, t.table, "", sql.IndexUsing_Default, sql.IndexConstraint_None, sqlColNames, "")
if err != nil {
return err
}
Expand All @@ -641,8 +642,27 @@ func (t *AlterableDoltTable) CreateForeignKey(

refTableIndex, ok := refSch.Indexes().GetIndexByTags(refColTags...)
if !ok {
// parent index must exist
return fmt.Errorf("missing index for constraint '%s' in the referenced table '%s'", fkName, refTblName)
parentPKs := set.NewUint64Set(refSch.GetPKCols().Tags)
if parentPKs.ContainsAll(refColTags) {
// special exception for parent table primary keys
// todo: make clustered PK index usable as parent table FK index
var colNames []sql.IndexColumn
for _, t := range refColTags {
c, _ := refSch.GetAllCols().GetByTag(t)
colNames = append(colNames, sql.IndexColumn{Name: c.Name})
}
refTbl, _, refTableIndex, err = createIndexForTable(ctx, refTbl, "", sql.IndexUsing_Default, sql.IndexConstraint_None, colNames, "")
if err != nil {
return err
}
root, err = root.PutTable(ctx, refTblName, refTbl)
if err != nil {
return err
}
} else {
// parent index must exist
return fmt.Errorf("missing index for constraint '%s' in the referenced table '%s'", fkName, refTblName)
}
}

foreignKeyCollection, err := root.GetForeignKeyCollection(ctx)
Expand Down Expand Up @@ -820,14 +840,19 @@ func parseFkReferenceOption(refOp sql.ForeignKeyReferenceOption) (doltdb.Foreign
}

// createIndex creates the given index on the given table with the given schema. Returns the updated table, updated schema, and created index.
func (t *AlterableDoltTable) createIndex(ctx *sql.Context, indexName string, using sql.IndexUsing, constraint sql.IndexConstraint, columns []sql.IndexColumn, comment string) (*doltdb.Table, schema.Schema, schema.Index, error) {
func createIndexForTable(ctx *sql.Context, table *doltdb.Table, indexName string, using sql.IndexUsing, constraint sql.IndexConstraint, columns []sql.IndexColumn, comment string) (*doltdb.Table, schema.Schema, schema.Index, error) {
if constraint != sql.IndexConstraint_None && constraint != sql.IndexConstraint_Unique {
return nil, nil, nil, fmt.Errorf("not yet supported")
}

sch, err := table.GetSchema(ctx)
if err != nil {
return nil, nil, nil, err
}

// get the real column names as CREATE INDEX columns are case-insensitive
var realColNames []string
allTableCols := t.sch.GetAllCols()
allTableCols := sch.GetAllCols()
for _, indexCol := range columns {
tableCol, ok := allTableCols.GetByNameCaseInsensitive(indexCol.Name)
if !ok {
Expand All @@ -838,31 +863,38 @@ func (t *AlterableDoltTable) createIndex(ctx *sql.Context, indexName string, usi

if indexName == "" {
indexName = strings.Join(realColNames, "")
_, ok := sch.Indexes().GetByNameCaseInsensitive(indexName)
var i int
for ok {
i++
indexName = fmt.Sprintf("%s_%d", strings.Join(realColNames, ""), i)
_, ok = sch.Indexes().GetByNameCaseInsensitive(indexName)
}
}
if !doltdb.IsValidTableName(indexName) {
return nil, nil, nil, fmt.Errorf("invalid index name `%s` as they must match the regular expression %s", indexName, doltdb.TableNameRegexStr)
}

// create the index metadata, will error if index names are taken or an index with the same columns in the same order exists
index, err := t.sch.Indexes().AddIndexByColNames(indexName, realColNames, schema.IndexProperties{IsUnique: constraint == sql.IndexConstraint_Unique, Comment: comment})
index, err := sch.Indexes().AddIndexByColNames(indexName, realColNames, schema.IndexProperties{IsUnique: constraint == sql.IndexConstraint_Unique, Comment: comment})
if err != nil {
return nil, nil, nil, err
}

// update the table schema with the new index
newSchemaVal, err := encoding.MarshalSchemaAsNomsValue(ctx, t.table.ValueReadWriter(), t.sch)
newSchemaVal, err := encoding.MarshalSchemaAsNomsValue(ctx, table.ValueReadWriter(), sch)
if err != nil {
return nil, nil, nil, err
}
tableRowData, err := t.table.GetRowData(ctx)
tableRowData, err := table.GetRowData(ctx)
if err != nil {
return nil, nil, nil, err
}
indexData, err := t.table.GetIndexData(ctx)
indexData, err := table.GetIndexData(ctx)
if err != nil {
return nil, nil, nil, err
}
newTable, err := doltdb.NewTable(ctx, t.table.ValueReadWriter(), newSchemaVal, tableRowData, &indexData)
newTable, err := doltdb.NewTable(ctx, table.ValueReadWriter(), newSchemaVal, tableRowData, &indexData)
if err != nil {
return nil, nil, nil, err
}
Expand All @@ -876,7 +908,7 @@ func (t *AlterableDoltTable) createIndex(ctx *sql.Context, indexName string, usi
if err != nil {
return nil, nil, nil, err
}
return newTable, t.sch, index, nil
return newTable, sch, index, nil
}

// dropIndex drops the given index on the given table with the given schema. Returns the updated table and updated schema.
Expand Down

0 comments on commit cd7b12b

Please sign in to comment.