Skip to content

Commit

Permalink
*: support read only lock (pingcap#21100)
Browse files Browse the repository at this point in the history
Signed-off-by: Shuaipeng Yu <[email protected]>
  • Loading branch information
jackysp authored Nov 25, 2020
1 parent ab2c337 commit 7279446
Show file tree
Hide file tree
Showing 11 changed files with 201 additions and 98 deletions.
72 changes: 72 additions & 0 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5630,6 +5630,78 @@ func (s *testDBSuite4) TestConcurrentLockTables(c *C) {
tk2.MustExec("unlock tables")
}

func (s *testDBSuite4) TestLockTableReadOnly(c *C) {
if israce.RaceEnabled {
c.Skip("skip race test")
}
tk := testkit.NewTestKit(c, s.store)
tk2 := testkit.NewTestKit(c, s.store)
tk2.MustExec("use test")

tk.MustExec("use test")
tk.MustExec("drop table if exists t1,t2")
defer func() {
tk.MustExec("alter table t1 read write")
tk.MustExec("alter table t2 read write")
tk.MustExec("drop table if exists t1,t2")
}()
tk.MustExec("create table t1 (a int key, b int)")
tk.MustExec("create table t2 (a int key)")

tk.MustExec("alter table t1 read only")
tk.MustQuery("select * from t1")
tk2.MustQuery("select * from t1")
_, err := tk.Exec("insert into t1 set a=1, b=2")
c.Assert(terror.ErrorEqual(err, infoschema.ErrTableLocked), IsTrue)
_, err = tk.Exec("update t1 set a=1")
c.Assert(terror.ErrorEqual(err, infoschema.ErrTableLocked), IsTrue)
_, err = tk.Exec("delete from t1")
c.Assert(terror.ErrorEqual(err, infoschema.ErrTableLocked), IsTrue)

_, err = tk2.Exec("insert into t1 set a=1, b=2")
c.Assert(terror.ErrorEqual(err, infoschema.ErrTableLocked), IsTrue)
_, err = tk2.Exec("update t1 set a=1")
c.Assert(terror.ErrorEqual(err, infoschema.ErrTableLocked), IsTrue)
_, err = tk2.Exec("delete from t1")
c.Assert(terror.ErrorEqual(err, infoschema.ErrTableLocked), IsTrue)
tk2.MustExec("alter table t1 read only")
_, err = tk2.Exec("insert into t1 set a=1, b=2")
c.Assert(terror.ErrorEqual(err, infoschema.ErrTableLocked), IsTrue)
tk.MustExec("alter table t1 read write")

tk.MustExec("lock tables t1 read")
c.Assert(terror.ErrorEqual(tk.ExecToErr("alter table t1 read only"), infoschema.ErrTableLocked), IsTrue)
c.Assert(terror.ErrorEqual(tk2.ExecToErr("alter table t1 read only"), infoschema.ErrTableLocked), IsTrue)
tk.MustExec("lock tables t1 write")
c.Assert(terror.ErrorEqual(tk.ExecToErr("alter table t1 read only"), infoschema.ErrTableLocked), IsTrue)
c.Assert(terror.ErrorEqual(tk2.ExecToErr("alter table t1 read only"), infoschema.ErrTableLocked), IsTrue)
tk.MustExec("lock tables t1 write local")
c.Assert(terror.ErrorEqual(tk.ExecToErr("alter table t1 read only"), infoschema.ErrTableLocked), IsTrue)
c.Assert(terror.ErrorEqual(tk2.ExecToErr("alter table t1 read only"), infoschema.ErrTableLocked), IsTrue)
tk.MustExec("unlock tables")

tk.MustExec("alter table t1 read only")
c.Assert(terror.ErrorEqual(tk.ExecToErr("lock tables t1 read"), infoschema.ErrTableLocked), IsTrue)
c.Assert(terror.ErrorEqual(tk2.ExecToErr("lock tables t1 read"), infoschema.ErrTableLocked), IsTrue)
c.Assert(terror.ErrorEqual(tk.ExecToErr("lock tables t1 write"), infoschema.ErrTableLocked), IsTrue)
c.Assert(terror.ErrorEqual(tk2.ExecToErr("lock tables t1 write"), infoschema.ErrTableLocked), IsTrue)
c.Assert(terror.ErrorEqual(tk.ExecToErr("lock tables t1 write local"), infoschema.ErrTableLocked), IsTrue)
c.Assert(terror.ErrorEqual(tk2.ExecToErr("lock tables t1 write local"), infoschema.ErrTableLocked), IsTrue)
tk.MustExec("admin cleanup table lock t1")
tk2.MustExec("insert into t1 set a=1, b=2")

tk.MustExec("set tidb_enable_amend_pessimistic_txn = 1")
tk.MustExec("begin pessimistic")
tk.MustQuery("select * from t1 where a = 1").Check(testkit.Rows("1 2"))
tk2.MustExec("update t1 set b = 3")
tk2.MustExec("alter table t1 read only")
tk2.MustQuery("select * from t1 where a = 1").Check(testkit.Rows("1 3"))
tk.MustQuery("select * from t1 where a = 1").Check(testkit.Rows("1 2"))
tk.MustExec("update t1 set b = 4")
c.Assert(terror.ErrorEqual(tk.ExecToErr("commit"), domain.ErrInfoSchemaChanged), IsTrue)
tk2.MustExec("alter table t1 read write")
}

func (s *testDBSuite4) testParallelExecSQL(c *C, sql1, sql2 string, se1, se2 session.Session, f checkRet) {
callback := &ddl.TestDDLCallback{}
times := 0
Expand Down
18 changes: 18 additions & 0 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2425,6 +2425,24 @@ func (d *ddl) AlterTable(ctx sessionctx.Context, ident ast.Ident, specs []*ast.A
err = d.DropTablePartition(ctx, ident, spec)
case ast.AlterTableTruncatePartition:
err = d.TruncateTablePartition(ctx, ident, spec)
case ast.AlterTableWriteable:
if !config.TableLockEnabled() {
return nil
}
tName := &ast.TableName{Schema: ident.Schema, Name: ident.Name}
if spec.Writeable {
err = d.CleanupTableLock(ctx, []*ast.TableName{tName})
} else {
lockStmt := &ast.LockTablesStmt{
TableLocks: []ast.TableLock{
{
Table: tName,
Type: model.TableLockReadOnly,
},
},
}
err = d.LockTables(ctx, lockStmt)
}
case ast.AlterTableExchangePartition:
err = d.ExchangeTablePartition(ctx, ident, spec)
case ast.AlterTableAddConstraint:
Expand Down
10 changes: 6 additions & 4 deletions ddl/table_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ func lockTable(tbInfo *model.TableInfo, idx int, arg *lockTablesArg) error {
if tbInfo.Lock.State == model.TableLockStatePreLock {
return nil
}
if tbInfo.Lock.Tp == model.TableLockRead && arg.LockTables[idx].Tp == model.TableLockRead {
if (tbInfo.Lock.Tp == model.TableLockRead && arg.LockTables[idx].Tp == model.TableLockRead) ||
(tbInfo.Lock.Tp == model.TableLockReadOnly && arg.LockTables[idx].Tp == model.TableLockReadOnly) {
sessionIndex := findSessionInfoIndex(tbInfo.Lock.Sessions, arg.SessionInfo)
// repeat lock.
if sessionIndex >= 0 {
Expand All @@ -145,7 +146,8 @@ func checkTableLocked(tbInfo *model.TableInfo, lockTp model.TableLockType, sessi
if tbInfo.Lock.State == model.TableLockStatePreLock {
return nil
}
if tbInfo.Lock.Tp == model.TableLockRead && lockTp == model.TableLockRead {
if (tbInfo.Lock.Tp == model.TableLockRead && lockTp == model.TableLockRead) ||
(tbInfo.Lock.Tp == model.TableLockReadOnly && lockTp == model.TableLockReadOnly) {
return nil
}
sessionIndex := findSessionInfoIndex(tbInfo.Lock.Sessions, sessionInfo)
Expand All @@ -154,8 +156,8 @@ func checkTableLocked(tbInfo *model.TableInfo, lockTp model.TableLockType, sessi
if tbInfo.Lock.Tp == lockTp {
return nil
}
// If no other session locked this table.
if len(tbInfo.Lock.Sessions) == 1 {
// If no other session locked this table, and it is not a read only lock (session unrelated).
if len(tbInfo.Lock.Sessions) == 1 && tbInfo.Lock.Tp != model.TableLockReadOnly {
return nil
}
}
Expand Down
2 changes: 1 addition & 1 deletion executor/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,7 @@ func (e *DDLExec) executeLockTables(s *ast.LockTablesStmt) error {
return domain.GetDomain(e.ctx).DDL().LockTables(e.ctx, s)
}

func (e *DDLExec) executeUnlockTables(s *ast.UnlockTablesStmt) error {
func (e *DDLExec) executeUnlockTables(_ *ast.UnlockTablesStmt) error {
if !config.TableLockEnabled() {
return nil
}
Expand Down
34 changes: 17 additions & 17 deletions executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,26 +340,26 @@ func (e *PointGetExecutor) get(ctx context.Context, key kv.Key) ([]byte, error)
// fallthrough to snapshot get.
}

isLocked := e.tblInfo.IsLocked()
if !isLocked || e.tblInfo.Lock.Tp != model.TableLockRead { // if not read lock or table was unlock then snapshot get
return e.snapshot.Get(ctx, key)
}

cacheDB := e.ctx.GetStore().GetMemCache()
val = cacheDB.Get(ctx, e.tblInfo.ID, key)
// key does not exist then get from snapshot and set to cache
if val == nil {
val, err = e.snapshot.Get(ctx, key)
if err != nil {
return nil, err
}
lock := e.tblInfo.Lock
if lock != nil && (lock.Tp == model.TableLockRead || lock.Tp == model.TableLockReadOnly) {
cacheDB := e.ctx.GetStore().GetMemCache()
val = cacheDB.Get(ctx, e.tblInfo.ID, key)
// key does not exist then get from snapshot and set to cache
if val == nil {
val, err = e.snapshot.Get(ctx, key)
if err != nil {
return nil, err
}

err := cacheDB.Set(e.tblInfo.ID, key, val)
if err != nil {
return nil, err
err := cacheDB.Set(e.tblInfo.ID, key, val)
if err != nil {
return nil, err
}
}
return val, nil
}
return val, nil
// if not read lock or table was unlock then snapshot get
return e.snapshot.Get(ctx, key)
}

// EncodeUniqueIndexKey encodes a unique index key.
Expand Down
17 changes: 11 additions & 6 deletions lock/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func NewChecker(ctx sessionctx.Context, is infoschema.InfoSchema) *Checker {
}

// CheckTableLock uses to check table lock.
func (c *Checker) CheckTableLock(db, table string, privilege mysql.PrivilegeType) error {
func (c *Checker) CheckTableLock(db, table string, privilege mysql.PrivilegeType, alterWriteable bool) error {
if db == "" && table == "" {
return nil
}
Expand All @@ -43,7 +43,7 @@ func (c *Checker) CheckTableLock(db, table string, privilege mysql.PrivilegeType
return nil
}
// check operation on database.
if table == "" {
if !alterWriteable && table == "" {
return c.CheckLockInDB(db, privilege)
}
switch privilege {
Expand All @@ -67,7 +67,7 @@ func (c *Checker) CheckTableLock(db, table string, privilege mysql.PrivilegeType
if err != nil {
return err
}
if c.ctx.HasLockedTables() {
if !alterWriteable && c.ctx.HasLockedTables() {
if locked, tp := c.ctx.CheckTableLocked(tb.Meta().ID); locked {
if checkLockTpMeetPrivilege(tp, privilege) {
return nil
Expand All @@ -83,19 +83,24 @@ func (c *Checker) CheckTableLock(db, table string, privilege mysql.PrivilegeType

if privilege == mysql.SelectPriv {
switch tb.Meta().Lock.Tp {
case model.TableLockRead, model.TableLockWriteLocal:
case model.TableLockRead, model.TableLockWriteLocal, model.TableLockReadOnly:
return nil
}
}
if alterWriteable && tb.Meta().Lock.Tp == model.TableLockReadOnly {
return nil
}

return infoschema.ErrTableLocked.GenWithStackByArgs(tb.Meta().Name.L, tb.Meta().Lock.Tp, tb.Meta().Lock.Sessions[0])
}

func checkLockTpMeetPrivilege(tp model.TableLockType, privilege mysql.PrivilegeType) bool {
// TableLockReadOnly doesn't need to check in this, because it is session unrelated.
switch tp {
case model.TableLockWrite, model.TableLockWriteLocal:
return true
case model.TableLockRead:
// ShowDBPriv, AllPrivMask,CreatePriv, CreateViewPriv already checked before.
// ShowDBPriv, AllPrivMask, CreatePriv, CreateViewPriv already checked before.
// The other privilege in read lock was not allowed.
if privilege == mysql.SelectPriv {
return true
Expand All @@ -117,7 +122,7 @@ func (c *Checker) CheckLockInDB(db string, privilege mysql.PrivilegeType) error
}
tables := c.is.SchemaTables(model.NewCIStr(db))
for _, tbl := range tables {
err := c.CheckTableLock(db, tbl.Meta().Name.L, privilege)
err := c.CheckTableLock(db, tbl.Meta().Name.L, privilege, false)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 7279446

Please sign in to comment.