Skip to content

Commit

Permalink
Merge branch 'main' into remove-unuse-fields
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Dec 20, 2024
2 parents 09a3dac + b4cb363 commit 60e1668
Show file tree
Hide file tree
Showing 24 changed files with 3,689 additions and 1,122 deletions.
6 changes: 3 additions & 3 deletions pkg/frontend/mysql_cmd_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ import (

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/google/uuid"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/clusterservice"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
Expand Down Expand Up @@ -71,8 +74,6 @@ import (
"github.com/matrixorigin/matrixone/pkg/vm/engine/disttae"
"github.com/matrixorigin/matrixone/pkg/vm/engine/disttae/route"
"github.com/matrixorigin/matrixone/pkg/vm/process"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

func createDropDatabaseErrorInfo() string {
Expand Down Expand Up @@ -1033,7 +1034,6 @@ func doExplainStmt(reqCtx context.Context, ses *Session, stmt *tree.ExplainStmt)
if err != nil {
return err
}
es.CmpContext = ses.GetTxnCompileCtx()

//get query optimizer and execute Optimize
exPlan, err := buildPlan(reqCtx, ses, ses.GetTxnCompileCtx(), stmt.Statement)
Expand Down
216 changes: 185 additions & 31 deletions pkg/frontend/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ package frontend
import (
"context"
"fmt"
"math"
"slices"
"strings"
"time"

"github.com/google/uuid"
"go.uber.org/zap"

"github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
Expand Down Expand Up @@ -188,21 +190,56 @@ func doCreateSnapshot(ctx context.Context, ses *Session, stmt *tree.CreateSnapSh
return err
}

// 2.only sys can create cluster level snapshot
tenantInfo := ses.GetTenantInfo()
currentAccount := tenantInfo.GetTenant()
snapshotLevel = stmt.Object.SLevel.Level
if snapshotLevel == tree.SNAPSHOTLEVELCLUSTER && currentAccount != sysAccountName {
return moerr.NewInternalError(ctx, "only sys tenant can create cluster level snapshot")

// 1.check create snapshot priv
err = doCheckCreateSnapshotPriv(ctx, ses, stmt)
if err != nil {
return err
}

// 3.only sys can create tenant level snapshot for other tenant
if snapshotLevel == tree.SNAPSHOTLEVELACCOUNT {
snapshotForAccount = string(stmt.Object.ObjName)
if currentAccount != sysAccountName && currentAccount != snapshotForAccount {
return moerr.NewInternalError(ctx, "only sys tenant can create tenant level snapshot for other tenant")
// 2. check snapshot exists or not
snapshotName = string(stmt.Name)
snapshotExist, err = checkSnapShotExistOrNot(ctx, bh, snapshotName)
if err != nil {
return err
}
if snapshotExist {
if !stmt.IfNotExists {
return moerr.NewInternalErrorf(ctx, "snapshot %s already exists", snapshotName)
} else {
return nil
}
}

// 3.1 generate snapshot id
newUUid, err := uuid.NewV7()
if err != nil {
return err
}
snapshotId = newUUid.String()

// 3. get database name , table name and objId according to the snapshot level
switch snapshotLevel {
case tree.SNAPSHOTLEVELCLUSTER:
sql, err = getSqlForCreateSnapshot(
ctx,
snapshotId,
snapshotName,
time.Now().UTC().UnixNano(),
snapshotLevel.String(),
"",
"",
"",
math.MaxUint64,
)
if err != nil {
return err
}
case tree.SNAPSHOTLEVELACCOUNT:
snapshotForAccount = string(stmt.Object.ObjName)
// check account exists or not and get accountId
getAccountIdFunc := func(accountName string) (accountId uint64, rtnErr error) {
var erArray []ExecResult
Expand Down Expand Up @@ -244,48 +281,165 @@ func doCreateSnapshot(ctx context.Context, ses *Session, stmt *tree.CreateSnapSh
} else {
objId = uint64(tenantInfo.GetTenantID())
}
}

// check snapshot exists or not
snapshotName = string(stmt.Name)
snapshotExist, err = checkSnapShotExistOrNot(ctx, bh, snapshotName)
if err != nil {
return err
}
if snapshotExist {
if !stmt.IfNotExists {
return moerr.NewInternalErrorf(ctx, "snapshot %s already exists", snapshotName)
} else {
return nil
sql, err = getSqlForCreateSnapshot(
ctx,
snapshotId,
snapshotName,
time.Now().UTC().UnixNano(),
snapshotLevel.String(),
snapshotForAccount,
"",
"",
objId,
)
if err != nil {
return err
}
} else {
// insert record to the system table
case tree.SNAPSHOTLEVELDATABASE:
databaseName = string(stmt.Object.ObjName)
if len(databaseName) > 0 && needSkipDb(databaseName) {
return moerr.NewInternalError(ctx, fmt.Sprintf("can not create snapshot for current database %s", databaseName))
}

getDatabaseIdFunc := func(dbName string) (dbId uint64, rtnErr error) {
var erArray []ExecResult
sql, rtnErr = getSqlForCheckDatabase(ctx, dbName)
if rtnErr != nil {
return 0, rtnErr
}
bh.ClearExecResultSet()
rtnErr = bh.Exec(ctx, sql)
if rtnErr != nil {
return 0, rtnErr
}

erArray, rtnErr = getResultSet(ctx, bh)
if rtnErr != nil {
return 0, rtnErr
}

// 1. get snapshot id
newUUid, err := uuid.NewV7()
if execResultArrayHasData(erArray) {
for i := uint64(0); i < erArray[0].GetRowCount(); i++ {
dbId, rtnErr = erArray[0].GetUint64(ctx, i, 0)
if rtnErr != nil {
return 0, rtnErr
}
}
} else {
return 0, moerr.NewInternalErrorf(ctx, "database %s does not exist", dbName)
}
return dbId, rtnErr
}
objId, err = getDatabaseIdFunc(databaseName)
if err != nil {
return err
}

sql, err = getSqlForCreateSnapshot(
ctx,
snapshotId,
snapshotName,
time.Now().UTC().UnixNano(),
snapshotLevel.String(),
currentAccount,
databaseName,
"",
objId,
)
if err != nil {
return err
}
snapshotId = newUUid.String()

// 2. get snapshot ts
// ts := ses.proc.TxnOperator.SnapshotTS()
// snapshotTs = ts.String()
case tree.SNAPSHOTLEVELTABLE:
objectName := string(stmt.Object.ObjName)
objects := strings.Split(objectName, ".")
if len(objects) != 2 {
return moerr.NewInternalError(ctx, fmt.Sprintf("invalid table name %s", objectName))
}
databaseName = objects[0]
tableName = objects[1]
if len(databaseName) > 0 && needSkipDb(databaseName) {
return moerr.NewInternalError(ctx, fmt.Sprintf("can not create pitr for current table %s.%s", databaseName, tableName))
}

getTableIdFunc := func(dbName, tblName string) (tblId uint64, rtnErr error) {
var erArray []ExecResult
sql, rtnErr = getSqlForCheckDatabaseTable(ctx, dbName, tblName)
if rtnErr != nil {
return 0, rtnErr
}
bh.ClearExecResultSet()
rtnErr = bh.Exec(ctx, sql)
if rtnErr != nil {
return 0, rtnErr
}

erArray, rtnErr = getResultSet(ctx, bh)
if rtnErr != nil {
return 0, rtnErr
}

sql, err = getSqlForCreateSnapshot(ctx, snapshotId, snapshotName, time.Now().UTC().UnixNano(), snapshotLevel.String(), string(stmt.Object.ObjName), databaseName, tableName, objId)
if execResultArrayHasData(erArray) {
for i := uint64(0); i < erArray[0].GetRowCount(); i++ {
tblId, rtnErr = erArray[0].GetUint64(ctx, i, 0)
if rtnErr != nil {
return 0, rtnErr
}
}
} else {
return 0, moerr.NewInternalErrorf(ctx, "table %s.%s does not exist", dbName, tblName)
}
return tblId, rtnErr
}
objId, err = getTableIdFunc(databaseName, tableName)
if err != nil {
return err
}

err = bh.Exec(ctx, sql)
sql, err = getSqlForCreateSnapshot(
ctx,
snapshotId,
snapshotName,
time.Now().UTC().UnixNano(),
snapshotLevel.String(),
currentAccount,
databaseName,
tableName,
objId,
)
if err != nil {
return err
}
}

getLogger(ses.GetService()).Info("create pitr", zap.String("sql", sql))
err = bh.Exec(ctx, sql)
if err != nil {
return err
}

getLogger(ses.GetService()).Info(fmt.Sprintf("create snapshot %s success", snapshotName))
return err
}

// insert record to the system table
func doCheckCreateSnapshotPriv(ctx context.Context, ses *Session, stmt *tree.CreateSnapShot) error {
var err error
snapshotLevel := stmt.Object.SLevel.Level
tenantInfo := ses.GetTenantInfo()
currentAccount := tenantInfo.GetTenant()

switch snapshotLevel {
case tree.SNAPSHOTLEVELCLUSTER:
if currentAccount != sysAccountName {
return moerr.NewInternalError(ctx, "only sys tenant can create cluster level snapshot")
}
case tree.SNAPSHOTLEVELACCOUNT:
snapshotForAccount := string(stmt.Object.ObjName)
if currentAccount != sysAccountName && currentAccount != snapshotForAccount {
return moerr.NewInternalError(ctx, "only sys tenant can create tenant level snapshot for other tenant")
}
}
return err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/frontend/variables.go
Original file line number Diff line number Diff line change
Expand Up @@ -1040,7 +1040,7 @@ var gSysVarsDefs = map[string]SystemVariable{
Scope: ScopeBoth,
Dynamic: true,
SetVarHintApplies: false,
Type: InitSystemVariableIntType("max_allowed_packet", 1024, 67108864, false),
Type: InitSystemVariableIntType("max_allowed_packet", 1024, 1073741824, false),
Default: int64(67108864),
},
"version_comment": {
Expand Down
Loading

0 comments on commit 60e1668

Please sign in to comment.