Changed "table cp" and "table import" to use TableEditor
Hydrocharged committed May 29, 2020
1 parent 9aa6a05 commit ad76f8a
Showing 12 changed files with 309 additions and 171 deletions.
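
The thrust of the commit: table cp and table import previously wrote rows and then rebuilt every index in a separate pass (the buildNewIndexes/buildIndexes helpers removed below); now row writes go through TableEditor, which keeps the index editors in step with the row data, and NewTable rebuilds index data itself when none is supplied. Below is a minimal, self-contained Go sketch of that keep-indexes-in-step idea, using hypothetical toy types rather than Dolt's actual API.

// Illustrative sketch only: hypothetical types, not Dolt's TableEditor API.
// The idea in this commit: instead of importing rows and then rebuilding all
// index data afterwards (the removed buildIndexes/buildNewIndexes helpers),
// every row edit goes through an editor that updates the secondary index
// as part of the same operation, so no post-hoc rebuild is needed.
package main

import "fmt"

type row struct {
	pk   string
	name string
}

type tableEditor struct {
	rows      map[string]row    // primary key -> row
	nameIndex map[string]string // indexed column value -> primary key
}

func newTableEditor() *tableEditor {
	return &tableEditor{rows: map[string]row{}, nameIndex: map[string]string{}}
}

// InsertRow updates the row data and the secondary index together,
// which is what makes a separate index-rebuild pass unnecessary.
func (te *tableEditor) InsertRow(r row) {
	if old, ok := te.rows[r.pk]; ok {
		delete(te.nameIndex, old.name) // keep the index consistent on overwrite
	}
	te.rows[r.pk] = r
	te.nameIndex[r.name] = r.pk
}

func main() {
	te := newTableEditor()
	te.InsertRow(row{pk: "1", name: "alice"})
	te.InsertRow(row{pk: "2", name: "bob"})
	fmt.Println(te.nameIndex["bob"]) // "2" — the index is already current, no rebuild step
}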
31 changes: 1 addition & 30 deletions go/cmd/dolt/commands/tblcmds/cp.go
@@ -181,7 +181,7 @@ func (cmd CpCmd) Exec(ctx context.Context, commandStr string, args []string, dEn
Dest: mvdata.TableDataLocation{Name: newTbl},
}

mover, verr := newTableCopyDataMover(ctx, root, dEnv.FS, cpOpts, importStatsCB)
mover, verr := newTableCopyDataMover(ctx, root, dEnv.FS, cpOpts, nil)

if verr != nil {
return commands.HandleVErrAndExitCode(verr, usage)
@@ -196,8 +196,6 @@ func (cmd CpCmd) Exec(ctx context.Context, commandStr string, args []string, dEn
return commands.HandleVErrAndExitCode(verr, usage)
}

verr = buildNewIndexes(ctx, dEnv, newTbl)

return commands.HandleVErrAndExitCode(verr, usage)
}

@@ -253,30 +251,3 @@ func newTableCopyDataMover(ctx context.Context, root *doltdb.RootValue, fs files

return imp, nil
}

func buildNewIndexes(ctx context.Context, dEnv *env.DoltEnv, newTblName string) errhand.VerboseError {
//TODO: change this to not use the executeImport function, and instead the SQL code path
newWorking, err := dEnv.WorkingRoot(ctx)
if err != nil {
return errhand.BuildDError("Unable to load the working set to build the indexes.").AddCause(err).Build()
}
updatedTable, ok, err := newWorking.GetTable(ctx, newTblName)
if err != nil {
return errhand.BuildDError("Unable to load the table to build the indexes.").AddCause(err).Build()
} else if !ok {
return errhand.BuildDError("Unable to find the table to build the indexes.").Build()
}
updatedTable, err = updatedTable.RebuildIndexData(ctx)
if err != nil {
return errhand.BuildDError("Unable to build the indexes.").AddCause(err).Build()
}
newWorking, err = newWorking.PutTable(ctx, newTblName, updatedTable)
if err != nil {
return errhand.BuildDError("Unable to write the indexes to the working set.").AddCause(err).Build()
}
err = dEnv.UpdateWorkingRoot(ctx, newWorking)
if err != nil {
return errhand.BuildDError("Unable to update the working set containing the indexes.").AddCause(err).Build()
}
return nil
}
38 changes: 0 additions & 38 deletions go/cmd/dolt/commands/tblcmds/import.go
@@ -377,12 +377,6 @@ func (cmd ImportCmd) Exec(ctx context.Context, commandStr string, args []string,
if skipped > 0 {
cli.PrintErrln(color.YellowString("Lines skipped: %d", skipped))
}
if verr != nil {
return commands.HandleVErrAndExitCode(verr, usage)
}

verr = buildIndexes(ctx, dEnv, mvOpts.tableName)

if verr == nil {
cli.PrintErrln(color.CyanString("Import completed successfully."))
}
@@ -445,11 +439,6 @@ func newImportDataMover(ctx context.Context, root *doltdb.RootValue, fs filesys.
}
}()

if impOpts.srcIsStream() {
// todo: capture stream data to file so we can use schema inference
wrSch = rd.GetSchema()
}

err = wrSch.GetPKCols().Iter(func(tag uint64, col schema.Column) (stop bool, err error) {
preImage := impOpts.nameMapper.PreImage(col.Name)
_, found := rd.GetSchema().GetAllCols().GetByName(preImage)
@@ -570,33 +559,6 @@ func inferSchema(ctx context.Context, root *doltdb.RootValue, rd table.TableRead
return schema.SchemaFromCols(newCols), nil
}

func buildIndexes(ctx context.Context, dEnv *env.DoltEnv, newTblName string) errhand.VerboseError {
//TODO: change this to not use the executeImport function, and instead the SQL code path, so that we don't rebuild indexes on every import
newWorking, err := dEnv.WorkingRoot(ctx)
if err != nil {
return errhand.BuildDError("Unable to load the working set to build the indexes.").AddCause(err).Build()
}
updatedTable, ok, err := newWorking.GetTable(ctx, newTblName)
if err != nil {
return errhand.BuildDError("Unable to load the table to build the indexes.").AddCause(err).Build()
} else if !ok {
return errhand.BuildDError("Unable to find the table to build the indexes.").Build()
}
updatedTable, err = updatedTable.RebuildIndexData(ctx)
if err != nil {
return errhand.BuildDError("Unable to build the indexes.").AddCause(err).Build()
}
newWorking, err = newWorking.PutTable(ctx, newTblName, updatedTable)
if err != nil {
return errhand.BuildDError("Unable to write the indexes to the working set.").AddCause(err).Build()
}
err = dEnv.UpdateWorkingRoot(ctx, newWorking)
if err != nil {
return errhand.BuildDError("Unable to update the working set containing the indexes.").AddCause(err).Build()
}
return nil
}

func newDataMoverErrToVerr(mvOpts *importOptions, err *mvdata.DataMoverCreationError) errhand.VerboseError {
switch err.ErrType {
case mvdata.CreateReaderErr:
1 change: 1 addition & 0 deletions go/libraries/doltcore/doltdb/index_editor.go
@@ -78,6 +78,7 @@ func (indexEd *IndexEditor) Flush(ctx context.Context) error {
}
newIndexData, err := indexEd.ed.Map(ctx)
if err != nil {
indexEd.reset(indexEd.data)
return err
}
indexEd.reset(newIndexData)
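The single added line above makes a failed Flush restore the editor's previous index data before returning the error, so the editor is not left pointing at half-applied edits. A small sketch of that reset-on-error control flow, with a hypothetical editor type and assumed reset semantics (the diff does not show what reset does):

// Sketch of the reset-on-error behavior added to IndexEditor.Flush above.
// Hypothetical editor type; only the control flow mirrors the diff.
package main

import (
	"errors"
	"fmt"
)

type mapEditor struct {
	data    map[string]string // last known-good state
	pending map[string]string // accumulated edits
}

func (ed *mapEditor) reset(data map[string]string) {
	ed.data = data
	ed.pending = map[string]string{}
}

// Flush applies the pending edits. On failure it resets to the previous
// good state (the new line in the diff) so a later Flush starts clean.
func (ed *mapEditor) Flush(apply func(map[string]string) (map[string]string, error)) error {
	newData, err := apply(ed.pending)
	if err != nil {
		ed.reset(ed.data)
		return err
	}
	ed.reset(newData)
	return nil
}

func main() {
	ed := &mapEditor{data: map[string]string{}, pending: map[string]string{"k": "v"}}
	err := ed.Flush(func(map[string]string) (map[string]string, error) {
		return nil, errors.New("apply failed")
	})
	fmt.Println(err, len(ed.pending)) // pending cleared, data unchanged
}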
38 changes: 29 additions & 9 deletions go/libraries/doltcore/doltdb/table.go
@@ -54,17 +54,37 @@ type Table struct {
tableStruct types.Struct
}

// NewTable creates a noms Struct which stores the schema and the row data
func NewTable(ctx context.Context, vrw types.ValueReadWriter, schema types.Value, rowData types.Map, indexData *types.Map) (*Table, error) {
// NewTable creates a noms Struct which stores the schema and the row data. If indexData is nil, then it is rebuilt.
func NewTable(ctx context.Context, vrw types.ValueReadWriter, schemaVal types.Value, rowData types.Map, indexData *types.Map) (*Table, error) {
if indexData == nil {
emptyIndexData, err := types.NewMap(ctx, vrw)
sch, err := encoding.UnmarshalSchemaNomsValue(ctx, rowData.Format(), schemaVal)
if err != nil {
return nil, err
}
indexData = &emptyIndexData
indexesMap, err := types.NewMap(ctx, vrw)
if err != nil {
return nil, err
}

for _, index := range sch.Indexes().AllIndexes() {
rebuiltIndexRowData, err := rebuildIndexRowData(ctx, vrw, sch, rowData, index)
if err != nil {
return nil, err
}
rebuiltIndexRowDataRef, err := writeValAndGetRef(ctx, vrw, rebuiltIndexRowData)
if err != nil {
return nil, err
}
indexesMap, err = indexesMap.Edit().Set(types.String(index.Name()), rebuiltIndexRowDataRef).Map(ctx)
if err != nil {
return nil, err
}
}

indexData = &indexesMap
}

schemaRef, err := writeValAndGetRef(ctx, vrw, schema)
schemaRef, err := writeValAndGetRef(ctx, vrw, schemaVal)
if err != nil {
return nil, err
}
@@ -555,7 +575,7 @@ func (t *Table) RebuildIndexData(ctx context.Context) (*Table, error) {
}

for _, index := range sch.Indexes().AllIndexes() {
rebuiltIndexRowData, err := t.rebuildIndexRowData(ctx, sch, tableRowData, index)
rebuiltIndexRowData, err := rebuildIndexRowData(ctx, t.vrw, sch, tableRowData, index)
if err != nil {
return nil, err
}
@@ -627,7 +647,7 @@ func (t *Table) RebuildIndexRowData(ctx context.Context, indexName string) (type
return types.EmptyMap, fmt.Errorf("index `%s` does not exist", indexName)
}

rebuiltIndexData, err := t.rebuildIndexRowData(ctx, sch, tableRowData, index)
rebuiltIndexData, err := rebuildIndexRowData(ctx, t.vrw, sch, tableRowData, index)
if err != nil {
return types.EmptyMap, err
}
@@ -677,8 +697,8 @@ func (t *Table) DeleteIndexRowData(ctx context.Context, indexName string) (*Tabl
return t.SetIndexData(ctx, indexesMap)
}

func (t *Table) rebuildIndexRowData(ctx context.Context, sch schema.Schema, tblRowData types.Map, index schema.Index) (types.Map, error) {
emptyIndexMap, err := types.NewMap(ctx, t.vrw)
func rebuildIndexRowData(ctx context.Context, vrw types.ValueReadWriter, sch schema.Schema, tblRowData types.Map, index schema.Index) (types.Map, error) {
emptyIndexMap, err := types.NewMap(ctx, vrw)
if err != nil {
return types.EmptyMap, err
}
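NewTable's new behavior: when indexData is nil it no longer attaches an empty map, it rebuilds each index declared by the schema from the row data via rebuildIndexRowData (now a free function that takes a ValueReadWriter). A self-contained sketch of that rebuild loop, using hypothetical in-memory maps in place of noms maps and refs:

// Illustrative only: hypothetical in-memory types standing in for the noms
// maps used by NewTable/rebuildIndexRowData above.
package main

import "fmt"

type rowVals map[string]string // column name -> value

type indexDef struct {
	name string
	col  string // single indexed column, for simplicity
}

// rebuildIndexData mirrors the new nil-indexData branch in NewTable:
// for every index declared by the schema, project the rows and build a
// fresh index map, keyed by the index name.
func rebuildIndexData(rows map[string]rowVals, indexes []indexDef) map[string]map[string]string {
	indexData := map[string]map[string]string{}
	for _, idx := range indexes {
		m := map[string]string{}
		for pk, r := range rows {
			m[r[idx.col]] = pk // indexed value -> primary key
		}
		indexData[idx.name] = m
	}
	return indexData
}

func main() {
	rows := map[string]rowVals{
		"1": {"name": "alice"},
		"2": {"name": "bob"},
	}
	idx := rebuildIndexData(rows, []indexDef{{name: "idx_name", col: "name"}})
	fmt.Println(idx["idx_name"]["alice"]) // "1"
}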
88 changes: 56 additions & 32 deletions go/libraries/doltcore/doltdb/table_editor.go
@@ -218,18 +218,22 @@ func (te *TableEditor) Flush(ctx context.Context) (*Table, error) {

updated, err := te.ed.Map(ctx)
if err != nil {
_ = te.reset(ctx, te.t, te.tSch)
return nil, errhand.BuildDError("failed to modify table").AddCause(err).Build()
}
originalRowData, err := te.t.GetRowData(ctx)
if err != nil {
_ = te.reset(ctx, te.t, te.tSch)
return nil, errhand.BuildDError("failed to read table").AddCause(err).Build()
}
newTable, err := te.t.UpdateRows(ctx, updated)
if err != nil {
_ = te.reset(ctx, te.t, te.tSch)
return nil, errhand.BuildDError("failed to update rows").AddCause(err).Build()
}
newTable, err = te.updateIndexes(ctx, newTable, originalRowData, updated)
if err != nil {
_ = te.reset(ctx, te.t, te.tSch)
return nil, errhand.BuildDError("failed to update indexes").AddCause(err).Build()
}

@@ -275,49 +279,69 @@ func (te *TableEditor) updateIndexes(ctx context.Context, tbl *Table, originalRo
return tbl, nil
}

wg := &sync.WaitGroup{}
var anyErr error // we only care to catch any error, doesn't matter if it's overwritten

for _, key := range te.affectedKeys {
var originalRow row.Row
var updatedRow row.Row
wg.Add(1)
go func(key types.Value) {
defer wg.Done()

if val, ok, err := originalRowData.MaybeGet(ctx, key); err == nil && ok {
originalRow, err = row.FromNoms(te.tSch, key.(types.Tuple), val.(types.Tuple))
if err != nil {
return nil, err
}
} else if err != nil {
return nil, err
}
if val, ok, err := updated.MaybeGet(ctx, key); err == nil && ok {
updatedRow, err = row.FromNoms(te.tSch, key.(types.Tuple), val.(types.Tuple))
if err != nil {
return nil, err
}
} else if err != nil {
return nil, err
}
var originalRow row.Row
var updatedRow row.Row

for _, indexEd := range te.indexEds {
var err error
var originalIndexRow row.Row
var updatedIndexRow row.Row
if originalRow != nil {
originalIndexRow, err = originalRow.ReduceToIndex(indexEd.Index())
if val, ok, err := originalRowData.MaybeGet(ctx, key); err == nil && ok {
originalRow, err = row.FromNoms(te.tSch, key.(types.Tuple), val.(types.Tuple))
if err != nil {
return nil, err
anyErr = err
return
}
} else if err != nil {
anyErr = err
return
}
if updatedRow != nil {
updatedIndexRow, err = updatedRow.ReduceToIndex(indexEd.Index())
if val, ok, err := updated.MaybeGet(ctx, key); err == nil && ok {
updatedRow, err = row.FromNoms(te.tSch, key.(types.Tuple), val.(types.Tuple))
if err != nil {
return nil, err
anyErr = err
return
}
} else if err != nil {
anyErr = err
return
}

err = indexEd.UpdateIndex(ctx, originalIndexRow, updatedIndexRow)
if err != nil {
return nil, err
for _, indexEd := range te.indexEds {
var err error
var originalIndexRow row.Row
var updatedIndexRow row.Row
if originalRow != nil {
originalIndexRow, err = originalRow.ReduceToIndex(indexEd.Index())
if err != nil {
anyErr = err
return
}
}
if updatedRow != nil {
updatedIndexRow, err = updatedRow.ReduceToIndex(indexEd.Index())
if err != nil {
anyErr = err
return
}
}

err = indexEd.UpdateIndex(ctx, originalIndexRow, updatedIndexRow)
if err != nil {
anyErr = err
return
}
}
}
}(key)
}

wg.Wait()
if anyErr != nil {
return nil, anyErr
}

for _, indexEd := range te.indexEds {
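updateIndexes now fans the per-key work out to one goroutine per affected key, joins them with a sync.WaitGroup, and records whichever error occurs (the code's own comment notes that catching any one error is enough). A generic, self-contained sketch of that fan-out-and-join pattern follows; unlike the diff, the sketch guards the shared error with a mutex:

// Generic fan-out sketch of the pattern used in updateIndexes above:
// one goroutine per affected key, a WaitGroup to join them, and a shared
// error slot. This sketch adds a mutex around the shared error; the diff
// simply records "any error" without one.
package main

import (
	"fmt"
	"sync"
)

func updateAll(keys []string, update func(key string) error) error {
	var (
		wg     sync.WaitGroup
		mu     sync.Mutex
		anyErr error
	)
	for _, key := range keys {
		wg.Add(1)
		go func(key string) { // key passed as a parameter, as in the diff
			defer wg.Done()
			if err := update(key); err != nil {
				mu.Lock()
				anyErr = err // keep any one error; later errors may overwrite it
				mu.Unlock()
			}
		}(key)
	}
	wg.Wait()
	return anyErr
}

func main() {
	keys := []string{"a", "b", "c"}
	err := updateAll(keys, func(key string) error {
		fmt.Println("updating index entries for", key)
		return nil
	})
	fmt.Println("err:", err)
}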