Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/gc temporary table #1204

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
8 changes: 5 additions & 3 deletions internal/database/dbutil/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,19 @@ package dbutil

import (
"fmt"
"strings"

"github.com/go-sql-driver/mysql"
"github.com/jackc/pgerrcode"
"github.com/jmoiron/sqlx"
"github.com/lib/pq"
"github.com/oom-ai/oomstore/pkg/errdefs"
"github.com/oom-ai/oomstore/pkg/oomstore/types"
"github.com/snowflakedb/gosnowflake"
"google.golang.org/api/googleapi"
"modernc.org/sqlite"
sqlite3 "modernc.org/sqlite/lib"

"github.com/oom-ai/oomstore/pkg/errdefs"
"github.com/oom-ai/oomstore/pkg/oomstore/types"
)

const (
Expand Down Expand Up @@ -72,7 +74,7 @@ func IsTableNotFoundError(err error, backend types.BackendType) (bool, error) {
}
case types.BackendSnowflake:
if e2, ok := err.(*gosnowflake.SnowflakeError); ok {
return e2.Number == gosnowflake.ErrObjectNotExistOrAuthorized, nil
return strings.Contains(e2.Error(), "does not exist or not authorized"), nil
}
// https://cloud.google.com/bigquery/docs/error-messages
case types.BackendBigQuery:
Expand Down
25 changes: 25 additions & 0 deletions internal/database/dbutil/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,31 @@ func QuoteFn(backendType types.BackendType) func(...string) string {
}
}

func UnQuoteFn(backendType types.BackendType) func(string) string {
var q byte
switch backendType {
case types.BackendPostgres, types.BackendSnowflake, types.BackendRedshift, types.BackendCassandra, types.BackendSQLite:
q = '"'
case types.BackendMySQL, types.BackendBigQuery:
q = '`'
default:
panic(fmt.Sprintf("unsupported backend type %s", backendType))
}

return func(s string) string {
if s == "" {
return s
}
if s[0] == q && s[len(s)-1] == q {
if s == string(q) {
return s
}
return s[1 : len(s)-1]
}
return s
}
}

func DropTable(ctx context.Context, dbOpt DBOpt, tableName string) error {
query := fmt.Sprintf(`DROP TABLE IF EXISTS %s;`, tableName)
err := dbOpt.ExecContext(ctx, query)
Expand Down
29 changes: 3 additions & 26 deletions internal/database/offline/bigquery/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package bigquery

import (
"context"
"fmt"
"strings"

"cloud.google.com/go/bigquery"
Expand Down Expand Up @@ -43,13 +42,9 @@ func bigqueryQueryResults(ctx context.Context, dbOpt dbutil.DBOpt, query string,
data := make(chan types.JoinRecord)
go func() {
defer func() {
if err = dropTemporaryTables(ctx, dbOpt.BigQueryDB, dropTableNames); err != nil {
select {
case data <- types.JoinRecord{Error: err}:
// nothing to do
default:
}
}
// The logic of the temporary table should not affect the main process, so nil is returned here.
// TODO: Print log in the cloud service version of oomstore
_ = sqlutil.DropTemporaryTables(ctx, dbOpt, dropTableNames)
close(data)
}()

Expand Down Expand Up @@ -99,24 +94,6 @@ func bigqueryQueryResults(ctx context.Context, dbOpt dbutil.DBOpt, query string,
}, nil
}

func dropTemporaryTables(ctx context.Context, db *bigquery.Client, tableNames []string) error {
var err error
for _, tableName := range tableNames {
if tmpErr := dropTable(ctx, db, tableName); tmpErr != nil {
err = tmpErr
}
}
return err
}

func dropTable(ctx context.Context, db *bigquery.Client, tableName string) error {
query := fmt.Sprintf(`DROP TABLE IF EXISTS %s;`, tableName)
if _, err := db.Query(query).Read(ctx); err != nil {
return errdefs.WithStack(err)
}
return nil
}

const READ_JOIN_RESULT_QUERY = `
SELECT
{{ qt .EntityRowsTableName }}.{{ qt .EntityKey }},
Expand Down
39 changes: 38 additions & 1 deletion internal/database/offline/bigquery/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@ package bigquery

import (
"context"
"fmt"

"cloud.google.com/go/bigquery"
"github.com/oom-ai/oomstore/pkg/errdefs"
"github.com/spf13/cast"
"google.golang.org/api/iterator"
"google.golang.org/api/option"

"github.com/oom-ai/oomstore/internal/database/dbutil"
"github.com/oom-ai/oomstore/internal/database/offline"
"github.com/oom-ai/oomstore/internal/database/offline/sqlutil"
"github.com/oom-ai/oomstore/pkg/errdefs"
"github.com/oom-ai/oomstore/pkg/oomstore/types"
)

Expand Down Expand Up @@ -62,3 +65,37 @@ func (db *DB) Push(ctx context.Context, opt offline.PushOpt) error {
}
return nil
}

func (db *DB) DropTemporaryTable(ctx context.Context, tableNames []string) error {
dbOpt := dbutil.DBOpt{Backend: Backend, BigQueryDB: db.Client, DatasetID: &db.datasetID}
return sqlutil.DropTemporaryTables(ctx, dbOpt, tableNames)
}

func (db *DB) GetTemporaryTables(ctx context.Context, unixMilli int64) ([]string, error) {
qt := dbutil.QuoteFn(Backend)
query := fmt.Sprintf("SELECT table_name FROM %s.%s WHERE create_time < %s",
db.datasetID, qt(offline.TemporaryTableRecordTable), cast.ToString(unixMilli))

rows, err := db.Query(query).Read(ctx)
if err != nil {
tableNotFound, notFoundErr := dbutil.IsTableNotFoundError(err, Backend)
if notFoundErr != nil {
return nil, notFoundErr
}
if tableNotFound {
return nil, nil
}
return nil, err
}

var tableNames []string
for {
recordMap := make(map[string]bigquery.Value)
err = rows.Next(&recordMap)
if err == iterator.Done {
break
}
tableNames = append(tableNames, recordMap["table_name"].(string))
}
return tableNames, nil
}
29 changes: 29 additions & 0 deletions internal/database/offline/mock_offline/store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions internal/database/offline/mysql/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,12 @@ func (db *DB) Push(ctx context.Context, opt offline.PushOpt) error {
}
return nil
}

func (db *DB) DropTemporaryTable(ctx context.Context, tableNames []string) error {
dbOpt := dbutil.DBOpt{Backend: Backend, SqlxDB: db.DB}
return sqlutil.DropTemporaryTables(ctx, dbOpt, tableNames)
}

func (db *DB) GetTemporaryTables(ctx context.Context, unixMilli int64) ([]string, error) {
return sqlutil.GetTemporaryTables(ctx, db.DB, Backend, unixMilli)
}
9 changes: 9 additions & 0 deletions internal/database/offline/postgres/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,12 @@ func (db *DB) Push(ctx context.Context, opt offline.PushOpt) error {
}
return nil
}

func (db *DB) DropTemporaryTable(ctx context.Context, tableNames []string) error {
dbOpt := dbutil.DBOpt{Backend: Backend, SqlxDB: db.DB}
return sqlutil.DropTemporaryTables(ctx, dbOpt, tableNames)
}

func (db *DB) GetTemporaryTables(ctx context.Context, unixMilli int64) ([]string, error) {
return sqlutil.GetTemporaryTables(ctx, db.DB, Backend, unixMilli)
}
9 changes: 9 additions & 0 deletions internal/database/offline/redshift/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,12 @@ func (db *DB) Push(ctx context.Context, opt offline.PushOpt) error {
}
return nil
}

func (db *DB) DropTemporaryTable(ctx context.Context, tableNames []string) error {
dbOpt := dbutil.DBOpt{Backend: Backend, SqlxDB: db.DB}
return sqlutil.DropTemporaryTables(ctx, dbOpt, tableNames)
}

func (db *DB) GetTemporaryTables(ctx context.Context, unixMilli int64) ([]string, error) {
return sqlutil.GetTemporaryTables(ctx, db.DB, Backend, unixMilli)
}
9 changes: 9 additions & 0 deletions internal/database/offline/snowflake/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,12 @@ func (db *DB) Push(ctx context.Context, opt offline.PushOpt) error {
}
return nil
}

func (db *DB) DropTemporaryTable(ctx context.Context, tableNames []string) error {
dbOpt := dbutil.DBOpt{Backend: Backend, SqlxDB: db.DB}
return sqlutil.DropTemporaryTables(ctx, dbOpt, tableNames)
}

func (db *DB) GetTemporaryTables(ctx context.Context, unixMilli int64) ([]string, error) {
return sqlutil.GetTemporaryTables(ctx, db.DB, Backend, unixMilli)
}
9 changes: 9 additions & 0 deletions internal/database/offline/sqlite/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,12 @@ func (db *DB) Push(ctx context.Context, opt offline.PushOpt) error {
}
return nil
}

func (db *DB) DropTemporaryTable(ctx context.Context, tableNames []string) error {
dbOpt := dbutil.DBOpt{Backend: Backend, SqlxDB: db.DB}
return sqlutil.DropTemporaryTables(ctx, dbOpt, tableNames)
}

func (db *DB) GetTemporaryTables(ctx context.Context, unixMilli int64) ([]string, error) {
return sqlutil.GetTemporaryTables(ctx, db.DB, Backend, unixMilli)
}
5 changes: 5 additions & 0 deletions internal/database/offline/sqlutil/export_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ func buildExportQuery(params exportQueryParams) (string, error) {
func prepareEntityTable(ctx context.Context, dbOpt dbutil.DBOpt, opt offline.ExportOpt, snapshotTables, cdcTables []string) (string, error) {
// Step 1: create table export_entity
tableName := dbutil.TempTable("export_entity")

// The logic of the temporary table should not affect the main process, so nil is returned here.
// TODO: Print log in the cloud service version of oomstore
_ = AddTemporaryTableRecord(ctx, dbOpt, buildTableName(dbOpt, tableName))

qtTableName, columnDefs, err := prepareTableSchema(dbOpt, prepareTableSchemaParams{
tableName: tableName,
entityName: opt.EntityName,
Expand Down
18 changes: 9 additions & 9 deletions internal/database/offline/sqlutil/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,12 @@ func readJoinedTable(ctx context.Context, dbOpt dbutil.DBOpt, opt readJoinedTabl

dropTableNames := []string{buildTableName(dbOpt, opt.EntityRowsTableName)}
for _, tableName := range opt.AllTableNames {
dropTableNames = append(dropTableNames, buildTableName(dbOpt, tableName))
dropTableName := buildTableName(dbOpt, tableName)
dropTableNames = append(dropTableNames, dropTableName)

// The logic of the temporary table should not affect the main process, so nil is returned here.
// TODO: Print log in the cloud service version of oomstore
_ = AddTemporaryTableRecord(ctx, dbOpt, dropTableName)
}

// Step 2: read joined results
Expand All @@ -308,14 +313,9 @@ func sqlxQueryResults(ctx context.Context, dbOpt dbutil.DBOpt, query string, hea
data := make(chan types.JoinRecord)
go func() {
defer func() {
if err := dropTemporaryTables(ctx, dbOpt.SqlxDB, dropTableNames); err != nil {
select {
case data <- types.JoinRecord{Error: err}:
// nothing to do
default:
}
}

// The logic of the temporary table should not affect the main process, so nil is returned here.
// TODO: Print log in the cloud service version of oomstore
_ = DropTemporaryTables(ctx, dbOpt, dropTableNames)
rows.Close()
close(data)
}()
Expand Down
22 changes: 5 additions & 17 deletions internal/database/offline/sqlutil/join_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"strings"
"text/template"

"github.com/jmoiron/sqlx"
"github.com/oom-ai/oomstore/pkg/errdefs"

"github.com/oom-ai/oomstore/internal/database/dbutil"
Expand Down Expand Up @@ -127,6 +126,11 @@ func prepareEntityRowsTable(ctx context.Context,
) (string, error) {
// Step 1: create table entity_rows
tableName := dbutil.TempTable("entity_rows")

// The logic of the temporary table should not affect the main process, so nil is returned here.
// TODO: Print log in the cloud service version of oomstore
_ = AddTemporaryTableRecord(ctx, dbOpt, buildTableName(dbOpt, tableName))

qtTableName, columnDefs, err := prepareTableSchema(dbOpt, prepareTableSchemaParams{
tableName: tableName,
entityName: "entity_key",
Expand Down Expand Up @@ -248,22 +252,6 @@ func insertEntityRows(ctx context.Context,
return dbutil.InsertRecordsToTable(ctx, dbOpt, tableName, records, columns)
}

func dropTemporaryTables(ctx context.Context, db *sqlx.DB, tableNames []string) error {
var err error
for _, tableName := range tableNames {
if tmpErr := dropTable(ctx, db, tableName); tmpErr != nil {
err = tmpErr
}
}
return err
}

func dropTable(ctx context.Context, db *sqlx.DB, tableName string) error {
query := fmt.Sprintf(`DROP TABLE IF EXISTS %s;`, tableName)
_, err := db.ExecContext(ctx, query)
return errdefs.WithStack(err)
}

func supportIndex(backendType types.BackendType) bool {
for _, b := range []types.BackendType{types.BackendSnowflake, types.BackendRedshift, types.BackendBigQuery} {
if b == backendType {
Expand Down
Loading