Skip to content

Commit

Permalink
ddl, meta: support altering auto_increment ID to a smaller value (pin…
Browse files Browse the repository at this point in the history
  • Loading branch information
tangenta authored Jul 13, 2021
1 parent 88cbdec commit a3919e3
Show file tree
Hide file tree
Showing 7 changed files with 181 additions and 18 deletions.
90 changes: 90 additions & 0 deletions ddl/db_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package ddl_test
import (
"context"
"fmt"
"math"
"strconv"
"strings"
"sync/atomic"
Expand Down Expand Up @@ -2623,6 +2624,95 @@ func (s *testSerialDBSuite1) TestAutoIncrementTableOption(c *C) {
tk.MustQuery("select * from t;").Check(testkit.Rows("12345678901234567890"))
}

func (s *testIntegrationSuite3) TestAutoIncrementForce(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("drop database if exists auto_inc_force;")
tk.MustExec("create database auto_inc_force;")
tk.MustExec("use auto_inc_force;")
getNextGlobalID := func() uint64 {
gidStr := tk.MustQuery("show table t next_row_id").Rows()[0][3]
gid, err := strconv.ParseUint(gidStr.(string), 10, 64)
c.Assert(err, IsNil)
return gid
}
// Rebase _tidb_row_id.
tk.MustExec("create table t (a int);")
tk.MustExec("insert into t values (1),(2);")
tk.MustQuery("select a, _tidb_rowid from t;").Check(testkit.Rows("1 1", "2 2"))
// Cannot set next global ID to 0.
tk.MustGetErrCode("alter table t force auto_increment = 0;", errno.ErrAutoincReadFailed)
tk.MustExec("alter table t force auto_increment = 1;")
c.Assert(getNextGlobalID(), Equals, uint64(1))
// inserting new rows can overwrite the existing data.
tk.MustExec("insert into t values (3);")
tk.MustExec("insert into t values (3);")
tk.MustQuery("select a, _tidb_rowid from t;").Check(testkit.Rows("3 1", "3 2"))

// Rebase auto_increment.
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a int primary key auto_increment, b int);")
tk.MustExec("insert into t values (1, 1);")
tk.MustExec("insert into t values (100000000, 1);")
tk.MustExec("delete from t where a = 100000000;")
c.Assert(getNextGlobalID(), Greater, uint64(100000000))
// Cannot set next global ID to 0.
tk.MustGetErrCode("alter table t /*T![force_inc] force */ auto_increment = 0;", errno.ErrAutoincReadFailed)
tk.MustExec("alter table t /*T![force_inc] force */ auto_increment = 2;")
c.Assert(getNextGlobalID(), Equals, uint64(2))
tk.MustExec("insert into t(b) values (2);")
tk.MustQuery("select a, b from t;").Check(testkit.Rows("1 1", "2 2"))

// Rebase auto_random.
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a bigint primary key auto_random(5));")
tk.MustExec("insert into t values ();")
tk.MustExec("set @@allow_auto_random_explicit_insert = true")
tk.MustExec("insert into t values (100000000);")
tk.MustExec("delete from t where a = 100000000;")
c.Assert(getNextGlobalID(), Greater, uint64(100000000))
// Cannot set next global ID to 0.
tk.MustGetErrCode("alter table t force auto_random_base = 0;", errno.ErrAutoincReadFailed)
tk.MustExec("alter table t force auto_random_base = 2;")
c.Assert(getNextGlobalID(), Equals, uint64(2))
tk.MustExec("insert into t values ();")
tk.MustQuery("select (a & 3) from t order by 1;").Check(testkit.Rows("1", "2"))

// Change next global ID.
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a bigint primary key auto_increment);")
tk.MustExec("insert into t values (1);")
bases := []uint64{1, 65535, 10, math.MaxUint64, math.MaxInt64 + 1, 1, math.MaxUint64, math.MaxInt64, 2}
lastBase := fmt.Sprintf("%d", bases[len(bases)-1])
for _, b := range bases {
tk.MustExec(fmt.Sprintf("alter table t force auto_increment = %d;", b))
c.Assert(getNextGlobalID(), Equals, b)
}
tk.MustExec("insert into t values ();")
tk.MustQuery("select a from t;").Check(testkit.Rows("1", lastBase))
// Force alter unsigned int auto_increment column.
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a bigint unsigned primary key auto_increment);")
for _, b := range bases {
tk.MustExec(fmt.Sprintf("alter table t force auto_increment = %d;", b))
c.Assert(getNextGlobalID(), Equals, b)
tk.MustExec("insert into t values ();")
tk.MustQuery("select a from t;").Check(testkit.Rows(fmt.Sprintf("%d", b)))
tk.MustExec("delete from t;")
}

// Force alter with @@auto_increment_increment and @@auto_increment_offset.
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t(a int key auto_increment);")
tk.MustExec("set @@auto_increment_offset=2;")
tk.MustExec("set @@auto_increment_increment = 11;")
tk.MustExec("insert into t values (500);")
tk.MustExec("alter table t force auto_increment=100;")
tk.MustExec("insert into t values (), ();")
tk.MustQuery("select * from t;").Check(testkit.Rows("101", "112", "500"))
tk.MustQuery("select * from t order by a;").Check(testkit.Rows("101", "112", "500"))
tk.MustExec("drop table if exists t;")
}

func (s *testIntegrationSuite3) TestIssue20490(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test;")
Expand Down
37 changes: 24 additions & 13 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2532,15 +2532,15 @@ func (d *ddl) AlterTable(ctx sessionctx.Context, ident ast.Ident, specs []*ast.A
}
err = d.ShardRowID(ctx, ident, opt.UintValue)
case ast.TableOptionAutoIncrement:
err = d.RebaseAutoID(ctx, ident, int64(opt.UintValue), autoid.RowIDAllocType)
err = d.RebaseAutoID(ctx, ident, int64(opt.UintValue), autoid.RowIDAllocType, opt.BoolValue)
case ast.TableOptionAutoIdCache:
if opt.UintValue > uint64(math.MaxInt64) {
// TODO: Refine this error.
return errors.New("table option auto_id_cache overflows int64")
}
err = d.AlterTableAutoIDCache(ctx, ident, int64(opt.UintValue))
case ast.TableOptionAutoRandomBase:
err = d.RebaseAutoID(ctx, ident, int64(opt.UintValue), autoid.AutoRandomType)
err = d.RebaseAutoID(ctx, ident, int64(opt.UintValue), autoid.AutoRandomType, opt.BoolValue)
case ast.TableOptionComment:
spec.Comment = opt.StrValue
err = d.AlterTableComment(ctx, ident, spec)
Expand Down Expand Up @@ -2596,7 +2596,7 @@ func (d *ddl) AlterTable(ctx sessionctx.Context, ident ast.Ident, specs []*ast.A
return nil
}

func (d *ddl) RebaseAutoID(ctx sessionctx.Context, ident ast.Ident, newBase int64, tp autoid.AllocatorType) error {
func (d *ddl) RebaseAutoID(ctx sessionctx.Context, ident ast.Ident, newBase int64, tp autoid.AllocatorType, force bool) error {
schema, t, err := d.getSchemaAndTableByIdent(ctx, ident)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -2625,31 +2625,42 @@ func (d *ddl) RebaseAutoID(ctx sessionctx.Context, ident ast.Ident, newBase int6
actionType = model.ActionRebaseAutoID
}

if alloc := t.Allocators(ctx).Get(tp); alloc != nil {
autoID, err := alloc.NextGlobalAutoID(t.Meta().ID)
if !force {
newBase, err = adjustNewBaseToNextGlobalID(ctx, t, tp, newBase)
if err != nil {
return errors.Trace(err)
return err
}
// If newBase < autoID, we need to do a rebase before returning.
// Assume there are 2 TiDB servers: TiDB-A with allocator range of 0 ~ 30000; TiDB-B with allocator range of 30001 ~ 60000.
// If the user sends SQL `alter table t1 auto_increment = 100` to TiDB-B,
// and TiDB-B finds 100 < 30001 but returns without any handling,
// then TiDB-A may still allocate 99 for auto_increment column. This doesn't make sense for the user.
newBase = int64(mathutil.MaxUint64(uint64(newBase), uint64(autoID)))
}
job := &model.Job{
SchemaID: schema.ID,
TableID: t.Meta().ID,
SchemaName: schema.Name.L,
Type: actionType,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{newBase},
Args: []interface{}{newBase, force},
}
err = d.doDDLJob(ctx, job)
err = d.callHookOnChanged(err)
return errors.Trace(err)
}

func adjustNewBaseToNextGlobalID(ctx sessionctx.Context, t table.Table, tp autoid.AllocatorType, newBase int64) (int64, error) {
alloc := t.Allocators(ctx).Get(tp)
if alloc == nil {
return newBase, nil
}
autoID, err := alloc.NextGlobalAutoID(t.Meta().ID)
if err != nil {
return newBase, errors.Trace(err)
}
// If newBase < autoID, we need to do a rebase before returning.
// Assume there are 2 TiDB servers: TiDB-A with allocator range of 0 ~ 30000; TiDB-B with allocator range of 30001 ~ 60000.
// If the user sends SQL `alter table t1 auto_increment = 100` to TiDB-B,
// and TiDB-B finds 100 < 30001 but returns without any handling,
// then TiDB-A may still allocate 99 for auto_increment column. This doesn't make sense for the user.
return int64(mathutil.MaxUint64(uint64(newBase), uint64(autoID))), nil
}

// ShardRowID shards the implicit row ID by adding shard value to the row ID's first few bits.
func (d *ddl) ShardRowID(ctx sessionctx.Context, tableIdent ast.Ident, uVal uint64) error {
schema, t, err := d.getSchemaAndTableByIdent(ctx, tableIdent)
Expand Down
14 changes: 11 additions & 3 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,8 +555,11 @@ func onRebaseAutoRandomType(store kv.Storage, t *meta.Meta, job *model.Job) (ver

func onRebaseAutoID(store kv.Storage, t *meta.Meta, job *model.Job, tp autoid.AllocatorType) (ver int64, _ error) {
schemaID := job.SchemaID
var newBase int64
err := job.DecodeArgs(&newBase)
var (
newBase int64
force bool
)
err := job.DecodeArgs(&newBase, &force)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
Expand All @@ -581,8 +584,13 @@ func onRebaseAutoID(store kv.Storage, t *meta.Meta, job *model.Job, tp autoid.Al
if alloc := tbl.Allocators(nil).Get(tp); alloc != nil {
// The next value to allocate is `newBase`.
newEnd := newBase - 1
err = alloc.Rebase(tblInfo.ID, newEnd, false)
if force {
err = alloc.ForceRebase(tblInfo.ID, newEnd)
} else {
err = alloc.Rebase(tblInfo.ID, newEnd, false)
}
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
}
Expand Down
40 changes: 38 additions & 2 deletions meta/autoid/autoid.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ type Allocator interface {
// If allocIDs is false, it will not allocate IDs.
Rebase(tableID, newBase int64, allocIDs bool) error

// ForceRebase set the next global auto ID to newBase.
ForceRebase(tableID, newBase int64) error

// RebaseSeq rebases the sequence value in number axis with tableID and the new base value.
RebaseSeq(table, newBase int64) (int64, bool, error)

Expand Down Expand Up @@ -370,16 +373,49 @@ func (alloc *allocator) Rebase(tableID, requiredBase int64, allocIDs bool) error
if tableID == 0 {
return errInvalidTableID.GenWithStack("Invalid tableID")
}

alloc.mu.Lock()
defer alloc.mu.Unlock()

if alloc.isUnsigned {
return alloc.rebase4Unsigned(tableID, uint64(requiredBase), allocIDs)
}
return alloc.rebase4Signed(tableID, requiredBase, allocIDs)
}

// ForceRebase implements autoid.Allocator ForceRebase interface.
func (alloc *allocator) ForceRebase(tableID, requiredBase int64) error {
if tableID <= 0 {
return errInvalidTableID.GenWithStack("Invalid tableID")
}
if requiredBase == -1 {
return ErrAutoincReadFailed.GenWithStack("Cannot force rebase the next global ID to '0'")
}
alloc.mu.Lock()
defer alloc.mu.Unlock()
startTime := time.Now()
err := kv.RunInNewTxn(context.Background(), alloc.store, true, func(ctx context.Context, txn kv.Transaction) error {
m := meta.NewMeta(txn)
currentEnd, err1 := GetAutoID(m, alloc.dbID, tableID, alloc.allocType)
if err1 != nil {
return err1
}
var step int64
if !alloc.isUnsigned {
step = requiredBase - currentEnd
} else {
uRequiredBase, uCurrentEnd := uint64(requiredBase), uint64(currentEnd)
step = int64(uRequiredBase - uCurrentEnd)
}
_, err1 = GenerateAutoID(m, alloc.dbID, tableID, step, alloc.allocType)
return err1
})
metrics.AutoIDHistogram.WithLabelValues(metrics.TableAutoIDRebase, metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
if err != nil {
return err
}
alloc.base, alloc.end = requiredBase, requiredBase
return nil
}

// Rebase implements autoid.Allocator RebaseSeq interface.
// The return value is quite same as expression function, bool means whether it should be NULL,
// here it will be used in setval expression function (true meaning the set value has been satisfied, return NULL).
Expand Down
6 changes: 6 additions & 0 deletions meta/autoid/memid.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ func (alloc *inMemoryAllocator) Rebase(tableID, requiredBase int64, allocIDs boo
return nil
}

// ForceRebase implements autoid.Allocator ForceRebase interface.
func (alloc *inMemoryAllocator) ForceRebase(tableID, requiredBase int64) error {
alloc.base = requiredBase
return nil
}

func (alloc *inMemoryAllocator) alloc4Signed(n uint64, increment, offset int64) (int64, int64, error) {
// Check offset rebase if necessary.
if offset-1 > alloc.base {
Expand Down
8 changes: 8 additions & 0 deletions sessionctx/binloginfo/binloginfo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,14 @@ func (s *testBinlogSuite) TestAddSpecialComment(c *C) {
"create table t1 (id int, a varchar(255) key clustered);",
"create table t1 (id int, a varchar(255) key /*T![clustered_index] clustered */ );",
},
{
"alter table t force auto_increment = 12;",
"alter table t /*T![force_inc] force */ auto_increment = 12;",
},
{
"alter table t force, auto_increment = 12;",
"alter table t force, auto_increment = 12;",
},
}
for _, ca := range testCase {
re := binloginfo.AddSpecialComment(ca.input)
Expand Down
4 changes: 4 additions & 0 deletions types/parser_driver/special_cmt_ctrl.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func init() {
parser.SpecialCommentsController.Register(string(FeatureIDAutoIDCache))
parser.SpecialCommentsController.Register(string(FeatureIDAutoRandomBase))
parser.SpecialCommentsController.Register(string(FeatureClusteredIndex))
parser.SpecialCommentsController.Register(string(FeatureForceAutoInc))
}

// SpecialCommentVersionPrefix is the prefix of TiDB executable comments.
Expand All @@ -61,6 +62,8 @@ const (
FeatureIDAutoRandomBase featureID = "auto_rand_base"
// FeatureClusteredIndex is the `clustered_index` feature.
FeatureClusteredIndex featureID = "clustered_index"
// FeatureForceAutoInc is the `force auto_increment` feature.
FeatureForceAutoInc featureID = "force_inc"
)

// FeatureIDPatterns is used to record special comments patterns.
Expand All @@ -69,4 +72,5 @@ var FeatureIDPatterns = map[featureID]*regexp.Regexp{
FeatureIDAutoIDCache: regexp.MustCompile(`(?P<REPLACE>(?i)AUTO_ID_CACHE\s*=?\s*\d+\s*)`),
FeatureIDAutoRandomBase: regexp.MustCompile(`(?P<REPLACE>(?i)AUTO_RANDOM_BASE\s*=?\s*\d+\s*)`),
FeatureClusteredIndex: regexp.MustCompile(`(?i)(PRIMARY)?\s+KEY(\s*\(.*\))?\s+(?P<REPLACE>(NON)?CLUSTERED\b)`),
FeatureForceAutoInc: regexp.MustCompile(`(?P<REPLACE>(?i)FORCE)\b\s*AUTO_INCREMENT\s*`),
}

0 comments on commit a3919e3

Please sign in to comment.