Skip to content

Commit

Permalink
support create database/table level snapshot (#20850)
Browse files Browse the repository at this point in the history
support create database/table level snapshot

Approved by: @heni02, @daviszhen
  • Loading branch information
YANGGMM authored Dec 20, 2024
1 parent 2b6ab7e commit b4cb363
Show file tree
Hide file tree
Showing 3 changed files with 357 additions and 31 deletions.
216 changes: 185 additions & 31 deletions pkg/frontend/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ package frontend
import (
"context"
"fmt"
"math"
"slices"
"strings"
"time"

"github.com/google/uuid"
"go.uber.org/zap"

"github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
Expand Down Expand Up @@ -188,21 +190,56 @@ func doCreateSnapshot(ctx context.Context, ses *Session, stmt *tree.CreateSnapSh
return err
}

// 2.only sys can create cluster level snapshot
tenantInfo := ses.GetTenantInfo()
currentAccount := tenantInfo.GetTenant()
snapshotLevel = stmt.Object.SLevel.Level
if snapshotLevel == tree.SNAPSHOTLEVELCLUSTER && currentAccount != sysAccountName {
return moerr.NewInternalError(ctx, "only sys tenant can create cluster level snapshot")

// 1.check create snapshot priv
err = doCheckCreateSnapshotPriv(ctx, ses, stmt)
if err != nil {
return err
}

// 3.only sys can create tenant level snapshot for other tenant
if snapshotLevel == tree.SNAPSHOTLEVELACCOUNT {
snapshotForAccount = string(stmt.Object.ObjName)
if currentAccount != sysAccountName && currentAccount != snapshotForAccount {
return moerr.NewInternalError(ctx, "only sys tenant can create tenant level snapshot for other tenant")
// 2. check snapshot exists or not
snapshotName = string(stmt.Name)
snapshotExist, err = checkSnapShotExistOrNot(ctx, bh, snapshotName)
if err != nil {
return err
}
if snapshotExist {
if !stmt.IfNotExists {
return moerr.NewInternalErrorf(ctx, "snapshot %s already exists", snapshotName)
} else {
return nil
}
}

// 3.1 generate snapshot id
newUUid, err := uuid.NewV7()
if err != nil {
return err
}
snapshotId = newUUid.String()

// 3. get database name , table name and objId according to the snapshot level
switch snapshotLevel {
case tree.SNAPSHOTLEVELCLUSTER:
sql, err = getSqlForCreateSnapshot(
ctx,
snapshotId,
snapshotName,
time.Now().UTC().UnixNano(),
snapshotLevel.String(),
"",
"",
"",
math.MaxUint64,
)
if err != nil {
return err
}
case tree.SNAPSHOTLEVELACCOUNT:
snapshotForAccount = string(stmt.Object.ObjName)
// check account exists or not and get accountId
getAccountIdFunc := func(accountName string) (accountId uint64, rtnErr error) {
var erArray []ExecResult
Expand Down Expand Up @@ -244,48 +281,165 @@ func doCreateSnapshot(ctx context.Context, ses *Session, stmt *tree.CreateSnapSh
} else {
objId = uint64(tenantInfo.GetTenantID())
}
}

// check snapshot exists or not
snapshotName = string(stmt.Name)
snapshotExist, err = checkSnapShotExistOrNot(ctx, bh, snapshotName)
if err != nil {
return err
}
if snapshotExist {
if !stmt.IfNotExists {
return moerr.NewInternalErrorf(ctx, "snapshot %s already exists", snapshotName)
} else {
return nil
sql, err = getSqlForCreateSnapshot(
ctx,
snapshotId,
snapshotName,
time.Now().UTC().UnixNano(),
snapshotLevel.String(),
snapshotForAccount,
"",
"",
objId,
)
if err != nil {
return err
}
} else {
// insert record to the system table
case tree.SNAPSHOTLEVELDATABASE:
databaseName = string(stmt.Object.ObjName)
if len(databaseName) > 0 && needSkipDb(databaseName) {
return moerr.NewInternalError(ctx, fmt.Sprintf("can not create snapshot for current database %s", databaseName))
}

getDatabaseIdFunc := func(dbName string) (dbId uint64, rtnErr error) {
var erArray []ExecResult
sql, rtnErr = getSqlForCheckDatabase(ctx, dbName)
if rtnErr != nil {
return 0, rtnErr
}
bh.ClearExecResultSet()
rtnErr = bh.Exec(ctx, sql)
if rtnErr != nil {
return 0, rtnErr
}

erArray, rtnErr = getResultSet(ctx, bh)
if rtnErr != nil {
return 0, rtnErr
}

// 1. get snapshot id
newUUid, err := uuid.NewV7()
if execResultArrayHasData(erArray) {
for i := uint64(0); i < erArray[0].GetRowCount(); i++ {
dbId, rtnErr = erArray[0].GetUint64(ctx, i, 0)
if rtnErr != nil {
return 0, rtnErr
}
}
} else {
return 0, moerr.NewInternalErrorf(ctx, "database %s does not exist", dbName)
}
return dbId, rtnErr
}
objId, err = getDatabaseIdFunc(databaseName)
if err != nil {
return err
}

sql, err = getSqlForCreateSnapshot(
ctx,
snapshotId,
snapshotName,
time.Now().UTC().UnixNano(),
snapshotLevel.String(),
currentAccount,
databaseName,
"",
objId,
)
if err != nil {
return err
}
snapshotId = newUUid.String()

// 2. get snapshot ts
// ts := ses.proc.TxnOperator.SnapshotTS()
// snapshotTs = ts.String()
case tree.SNAPSHOTLEVELTABLE:
objectName := string(stmt.Object.ObjName)
objects := strings.Split(objectName, ".")
if len(objects) != 2 {
return moerr.NewInternalError(ctx, fmt.Sprintf("invalid table name %s", objectName))
}
databaseName = objects[0]
tableName = objects[1]
if len(databaseName) > 0 && needSkipDb(databaseName) {
return moerr.NewInternalError(ctx, fmt.Sprintf("can not create pitr for current table %s.%s", databaseName, tableName))
}

getTableIdFunc := func(dbName, tblName string) (tblId uint64, rtnErr error) {
var erArray []ExecResult
sql, rtnErr = getSqlForCheckDatabaseTable(ctx, dbName, tblName)
if rtnErr != nil {
return 0, rtnErr
}
bh.ClearExecResultSet()
rtnErr = bh.Exec(ctx, sql)
if rtnErr != nil {
return 0, rtnErr
}

erArray, rtnErr = getResultSet(ctx, bh)
if rtnErr != nil {
return 0, rtnErr
}

sql, err = getSqlForCreateSnapshot(ctx, snapshotId, snapshotName, time.Now().UTC().UnixNano(), snapshotLevel.String(), string(stmt.Object.ObjName), databaseName, tableName, objId)
if execResultArrayHasData(erArray) {
for i := uint64(0); i < erArray[0].GetRowCount(); i++ {
tblId, rtnErr = erArray[0].GetUint64(ctx, i, 0)
if rtnErr != nil {
return 0, rtnErr
}
}
} else {
return 0, moerr.NewInternalErrorf(ctx, "table %s.%s does not exist", dbName, tblName)
}
return tblId, rtnErr
}
objId, err = getTableIdFunc(databaseName, tableName)
if err != nil {
return err
}

err = bh.Exec(ctx, sql)
sql, err = getSqlForCreateSnapshot(
ctx,
snapshotId,
snapshotName,
time.Now().UTC().UnixNano(),
snapshotLevel.String(),
currentAccount,
databaseName,
tableName,
objId,
)
if err != nil {
return err
}
}

getLogger(ses.GetService()).Info("create pitr", zap.String("sql", sql))
err = bh.Exec(ctx, sql)
if err != nil {
return err
}

getLogger(ses.GetService()).Info(fmt.Sprintf("create snapshot %s success", snapshotName))
return err
}

// insert record to the system table
func doCheckCreateSnapshotPriv(ctx context.Context, ses *Session, stmt *tree.CreateSnapShot) error {
var err error
snapshotLevel := stmt.Object.SLevel.Level
tenantInfo := ses.GetTenantInfo()
currentAccount := tenantInfo.GetTenant()

switch snapshotLevel {
case tree.SNAPSHOTLEVELCLUSTER:
if currentAccount != sysAccountName {
return moerr.NewInternalError(ctx, "only sys tenant can create cluster level snapshot")
}
case tree.SNAPSHOTLEVELACCOUNT:
snapshotForAccount := string(stmt.Object.ObjName)
if currentAccount != sysAccountName && currentAccount != snapshotForAccount {
return moerr.NewInternalError(ctx, "only sys tenant can create tenant level snapshot for other tenant")
}
}
return err
}

Expand Down
80 changes: 80 additions & 0 deletions test/distributed/cases/snapshot/snapshot_database_level.result
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
drop snapshot if exists sn1;
create snapshot sn1 for database db1;
internal error: database db1 does not exist
show snapshots;
SNAPSHOT_NAME TIMESTAMP SNAPSHOT_LEVEL ACCOUNT_NAME DATABASE_NAME TABLE_NAME
drop database if exists db1;
create database if not exists db1;
create snapshot sn2 for database db1;
show snapshots;
SNAPSHOT_NAME TIMESTAMP SNAPSHOT_LEVEL ACCOUNT_NAME DATABASE_NAME TABLE_NAME
sn2 2024-12-20 02:58:51.442498 database sys db1
drop database if exists db1;
drop snapshot if exists sn2;
drop snapshot if exists sn1;
show snapshots;
SNAPSHOT_NAME TIMESTAMP SNAPSHOT_LEVEL ACCOUNT_NAME DATABASE_NAME TABLE_NAME
drop account if exists acc01;
create account acc01 admin_name = 'test_account' identified by '111';
drop snapshot if exists sn1;
create snapshot sn1 for database db1;
internal error: database db1 does not exist
show snapshots;
SNAPSHOT_NAME TIMESTAMP SNAPSHOT_LEVEL ACCOUNT_NAME DATABASE_NAME TABLE_NAME
drop database if exists db1;
create database if not exists db1;
create snapshot sn2 for database db1;
show snapshots;
SNAPSHOT_NAME TIMESTAMP SNAPSHOT_LEVEL ACCOUNT_NAME DATABASE_NAME TABLE_NAME
sn2 2024-12-20 02:58:51.951431 database acc01 db1
drop database if exists db1;
drop snapshot if exists sn2;
drop snapshot if exists sn1;
drop account if exists acc1;
drop snapshot if exists sn1;
create snapshot sn1 for table db1 tbl1;
internal error: table db1.tbl1 does not exist
show snapshots;
SNAPSHOT_NAME TIMESTAMP SNAPSHOT_LEVEL ACCOUNT_NAME DATABASE_NAME TABLE_NAME
drop database if exists db1;
create database if not exists db1;
create table db1.tbl1 (a int);
insert into db1.tbl1 values (1), (2), (3);
create snapshot sn2 for table db1 tbl1;
show snapshots;
SNAPSHOT_NAME TIMESTAMP SNAPSHOT_LEVEL ACCOUNT_NAME DATABASE_NAME TABLE_NAME
sn2 2024-12-20 02:58:52.013035 table sys db1 tbl1
drop database if exists db1;
drop snapshot if exists sn2;
drop snapshot if exists sn1;
show snapshots;
SNAPSHOT_NAME TIMESTAMP SNAPSHOT_LEVEL ACCOUNT_NAME DATABASE_NAME TABLE_NAME
drop account if exists acc01;
create account acc01 admin_name = 'test_account' identified by '111';
drop snapshot if exists sn1;
create snapshot sn1 for table db1 tbl1;
internal error: table db1.tbl1 does not exist
show snapshots;
SNAPSHOT_NAME TIMESTAMP SNAPSHOT_LEVEL ACCOUNT_NAME DATABASE_NAME TABLE_NAME
drop database if exists db1;
create database if not exists db1;
create table db1.tbl1 (a int);
insert into db1.tbl1 values (1), (2), (3);
create snapshot sn2 for table db1 tbl1;
show snapshots;
SNAPSHOT_NAME TIMESTAMP SNAPSHOT_LEVEL ACCOUNT_NAME DATABASE_NAME TABLE_NAME
sn2 2024-12-20 02:58:52.689975 table acc01 db1 tbl1
drop database if exists db1;
drop snapshot if exists sn2;
drop snapshot if exists sn1;
drop account if exists acc1;
create snapshot sn1 for account sys;
create snapshot sn1 for account sys;
internal error: snapshot sn1 already exists
create snapshot if not exists sn1 for account sys;
drop snapshot if exists sn1;
create snapshot sn1 for database mo_catalog;
internal error: can not create snapshot for current database mo_catalog
create snapshot sn1 for table mo_catalog mo_user;
internal error: can not create pitr for current table mo_catalog.mo_user
drop snapshot if exists sn1;
Loading

0 comments on commit b4cb363

Please sign in to comment.