Skip to content

Commit

Permalink
topsql: refine collect information for DDL and internal SQL (pingcap#…
Browse files Browse the repository at this point in the history
  • Loading branch information
crazycs520 authored Jul 15, 2021
1 parent 60b600e commit 2b192e3
Show file tree
Hide file tree
Showing 35 changed files with 351 additions and 260 deletions.
11 changes: 6 additions & 5 deletions ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask,
return result
}

func (w *backfillWorker) run(d *ddlCtx, bf backfiller) {
func (w *backfillWorker) run(d *ddlCtx, bf backfiller, job *model.Job) {
logutil.BgLogger().Info("[ddl] backfill worker start", zap.Int("workerID", w.id))
defer func() {
w.resultCh <- &backfillResult{err: errReorgPanic}
Expand All @@ -297,6 +297,7 @@ func (w *backfillWorker) run(d *ddlCtx, bf backfiller) {
if !more {
break
}
w.ddlWorker.setDDLLabelForTopSQL(job)

logutil.BgLogger().Debug("[ddl] backfill worker got task", zap.Int("workerID", w.id), zap.String("task", task.String()))
failpoint.Inject("mockBackfillRunErr", func() {
Expand Down Expand Up @@ -497,7 +498,7 @@ func loadDDLReorgVars(w *worker) error {
return errors.Trace(err)
}
defer w.sessPool.put(ctx)
return ddlutil.LoadDDLReorgVars(ctx)
return ddlutil.LoadDDLReorgVars(w.ddlJobCtx, ctx)
}

func makeupDecodeColMap(sessCtx sessionctx.Context, t table.Table) (map[int64]decoder.Column, error) {
Expand Down Expand Up @@ -599,17 +600,17 @@ func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType ba
idxWorker := newAddIndexWorker(sessCtx, w, i, t, indexInfo, decodeColMap, reorgInfo.ReorgMeta.SQLMode)
idxWorker.priority = job.Priority
backfillWorkers = append(backfillWorkers, idxWorker.backfillWorker)
go idxWorker.backfillWorker.run(reorgInfo.d, idxWorker)
go idxWorker.backfillWorker.run(reorgInfo.d, idxWorker, job)
case typeUpdateColumnWorker:
updateWorker := newUpdateColumnWorker(sessCtx, w, i, t, oldColInfo, colInfo, decodeColMap, reorgInfo.ReorgMeta.SQLMode)
updateWorker.priority = job.Priority
backfillWorkers = append(backfillWorkers, updateWorker.backfillWorker)
go updateWorker.backfillWorker.run(reorgInfo.d, updateWorker)
go updateWorker.backfillWorker.run(reorgInfo.d, updateWorker, job)
case typeCleanUpIndexWorker:
idxWorker := newCleanUpIndexWorker(sessCtx, w, i, t, decodeColMap, reorgInfo.ReorgMeta.SQLMode)
idxWorker.priority = job.Priority
backfillWorkers = append(backfillWorkers, idxWorker.backfillWorker)
go idxWorker.backfillWorker.run(reorgInfo.d, idxWorker)
go idxWorker.backfillWorker.run(reorgInfo.d, idxWorker, job)
default:
return errors.New("unknow backfill type")
}
Expand Down
14 changes: 7 additions & 7 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -1646,7 +1646,7 @@ func applyNewAutoRandomBits(d *ddlCtx, m *meta.Meta, dbInfo *model.DBInfo,

// checkForNullValue ensure there are no null values of the column of this table.
// `isDataTruncated` indicates whether the new field and the old field type are the same, in order to be compatible with mysql.
func checkForNullValue(ctx sessionctx.Context, isDataTruncated bool, schema, table, newCol model.CIStr, oldCols ...*model.ColumnInfo) error {
func checkForNullValue(ctx context.Context, sctx sessionctx.Context, isDataTruncated bool, schema, table, newCol model.CIStr, oldCols ...*model.ColumnInfo) error {
var buf strings.Builder
buf.WriteString("select 1 from %n.%n where ")
paramsList := make([]interface{}, 0, 2+len(oldCols))
Expand All @@ -1661,11 +1661,11 @@ func checkForNullValue(ctx sessionctx.Context, isDataTruncated bool, schema, tab
}
}
buf.WriteString(" limit 1")
stmt, err := ctx.(sqlexec.RestrictedSQLExecutor).ParseWithParams(context.Background(), buf.String(), paramsList...)
stmt, err := sctx.(sqlexec.RestrictedSQLExecutor).ParseWithParams(ctx, buf.String(), paramsList...)
if err != nil {
return errors.Trace(err)
}
rows, _, err := ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedStmt(context.Background(), stmt)
rows, _, err := sctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedStmt(ctx, stmt)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -1785,12 +1785,12 @@ func rollbackModifyColumnJob(t *meta.Meta, tblInfo *model.TableInfo, job *model.
func modifyColsFromNull2NotNull(w *worker, dbInfo *model.DBInfo, tblInfo *model.TableInfo, cols []*model.ColumnInfo,
newColName model.CIStr, isDataTruncated bool) error {
// Get sessionctx from context resource pool.
var ctx sessionctx.Context
ctx, err := w.sessPool.get()
var sctx sessionctx.Context
sctx, err := w.sessPool.get()
if err != nil {
return errors.Trace(err)
}
defer w.sessPool.put(ctx)
defer w.sessPool.put(sctx)

skipCheck := false
failpoint.Inject("skipMockContextDoExec", func(val failpoint.Value) {
Expand All @@ -1800,7 +1800,7 @@ func modifyColsFromNull2NotNull(w *worker, dbInfo *model.DBInfo, tblInfo *model.
})
if !skipCheck {
// If there is a null value inserted, it cannot be modified and needs to be rollback.
err = checkForNullValue(ctx, isDataTruncated, dbInfo.Name, tblInfo.Name, newColName, cols...)
err = checkForNullValue(w.ddlJobCtx, sctx, isDataTruncated, dbInfo.Name, tblInfo.Name, newColName, cols...)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ type DDL interface {
CreateIndex(ctx sessionctx.Context, tableIdent ast.Ident, keyType ast.IndexKeyType, indexName model.CIStr,
columnNames []*ast.IndexPartSpecification, indexOption *ast.IndexOption, ifNotExists bool) error
DropIndex(ctx sessionctx.Context, tableIdent ast.Ident, indexName model.CIStr, ifExists bool) error
AlterTable(ctx sessionctx.Context, tableIdent ast.Ident, spec []*ast.AlterTableSpec) error
AlterTable(ctx context.Context, sctx sessionctx.Context, tableIdent ast.Ident, spec []*ast.AlterTableSpec) error
TruncateTable(ctx sessionctx.Context, tableIdent ast.Ident) error
RenameTable(ctx sessionctx.Context, oldTableIdent, newTableIdent ast.Ident, isAlterTable bool) error
RenameTables(ctx sessionctx.Context, oldTableIdent, newTableIdent []ast.Ident, isAlterTable bool) error
Expand Down
Loading

0 comments on commit 2b192e3

Please sign in to comment.