Skip to content

Commit

Permalink
fix: init catalog properly
Browse files Browse the repository at this point in the history
  • Loading branch information
NoyException committed Dec 24, 2024
1 parent 32e8806 commit 3997d3a
Showing 1 changed file with 41 additions and 32 deletions.
73 changes: 41 additions & 32 deletions catalog/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,6 @@ func NewDBProvider(defaultTimeZone, dataDir, defaultDB string) (prov *DatabasePr
prov.storage = stdsql.OpenDB(prov.connector)
prov.pool = NewConnectionPool(prov.catalogName, prov.connector, prov.storage)

err = prov.initCatalog()
if err != nil {
return nil, err
}

prov.ready = true
return prov, nil
}

func (prov *DatabaseProvider) initCatalog() error {
bootQueries := []string{
"INSTALL arrow",
"LOAD arrow",
Expand All @@ -97,10 +87,21 @@ func (prov *DatabaseProvider) initCatalog() error {
if _, err := prov.storage.ExecContext(context.Background(), q); err != nil {
prov.storage.Close()
prov.connector.Close()
return fmt.Errorf("failed to execute boot query %q: %w", q, err)
return nil, fmt.Errorf("failed to execute boot query %q: %w", q, err)
}
}

err = prov.initCatalog()
if err != nil {
return nil, err
}

prov.ready = true
return prov, nil
}

func (prov *DatabaseProvider) initCatalog() error {

for _, t := range internalSchemas {
if _, err := prov.storage.ExecContext(
context.Background(),
Expand Down Expand Up @@ -158,7 +159,7 @@ func (prov *DatabaseProvider) IsReady() bool {
return prov.ready
}

func (prov *DatabaseProvider) ExistCatalog(name string) bool {
func (prov *DatabaseProvider) ExistsCatalog(name string) bool {
name = strings.TrimSpace(name)
// in memory database does not need to be created
if name == "" || name == "memory" {
Expand All @@ -178,38 +179,46 @@ func (prov *DatabaseProvider) CreateCatalog(name string, ifNotExists bool) error
return nil
}
dsn := filepath.Join(prov.dataDir, name+".db")

_, err := os.Stat(dsn)
shouldInit := os.IsNotExist(err)

// attach
attachSQL := "ATTACH"
if ifNotExists {
attachSQL += " IF NOT EXISTS"
}
attachSQL += " '" + dsn + "' AS " + name
res, err := prov.storage.ExecContext(context.Background(), attachSQL)
if err != nil {
return err
}
rows, err := res.RowsAffected()
_, err = prov.storage.ExecContext(context.Background(), attachSQL)
if err != nil {
logrus.Errorf("Failed to get rows affected: %v", err)
return err
}
if rows <= 0 {
return nil
}

// if newly created, initialize the catalog
if _, err := prov.storage.ExecContext(context.Background(), "USE "+name); err != nil {
return err
}
defer func() {
if _, err := prov.storage.ExecContext(context.Background(), "USE "+prov.catalogName); err != nil {
logrus.WithError(err).Errorln("Failed to switch back to the old database")
if shouldInit {
res, err := prov.storage.QueryContext(context.Background(), "SELECT current_catalog")
if err != nil {
return fmt.Errorf("failed to init catalog: %w", err)
}
lastCatalog := ""
for res.Next() {
if err := res.Scan(&lastCatalog); err != nil {
return fmt.Errorf("failed to init catalog: %w", err)
}
}
}()

err = prov.initCatalog()
if err != nil {
return err
if _, err := prov.storage.ExecContext(context.Background(), "USE "+name); err != nil {
return fmt.Errorf("failed to switch to the new catalog: %w", err)
}

defer func() {
if _, err := prov.storage.ExecContext(context.Background(), "USE "+lastCatalog); err != nil {
logrus.WithError(err).Errorln("Failed to switch back to the old catalog")
}
}()
err = prov.initCatalog()
if err != nil {
return err
}
}
return nil
}
Expand Down

0 comments on commit 3997d3a

Please sign in to comment.