Skip to content

Commit

Permalink
fix: resolve compatibility issues with Metabase (#340) (#352)
Browse files Browse the repository at this point in the history
* fix: solve issue of handling extended query (#342)

* fix: add internal view pg_stat_user_tables and add query rewriting for type casting

* create internal macros to mimic some pg system functions

* wip: add pg_catalog.pg_get_indexdef

* fix: use a better regex pattern to replace the sys function names

* fix: make 'SET SESSION CHARACTERISTICS TRANSACTION ...' work and use session level pgtypes.Map for each session to encode results

* fix: cast DuckDB HUGEINT to pgtype.Numeric

* fix: address code review feedback

* fix: add tests and resolve failed tests
  • Loading branch information
VWagen1989 authored Jan 9, 2025
1 parent 4145be1 commit 34b0213
Show file tree
Hide file tree
Showing 13 changed files with 935 additions and 161 deletions.
58 changes: 58 additions & 0 deletions catalog/internal_macro.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package catalog

import "strings"

// MacroDefinition describes one overload of a macro: the parameter names it
// accepts and the DuckDB SQL body it expands to.
type MacroDefinition struct {
	Params []string
	DDL    string
}

// InternalMacro is a DuckDB macro created at startup to mimic a PostgreSQL
// system function that DuckDB does not provide natively.
type InternalMacro struct {
	Schema       string
	Name         string
	IsTableMacro bool
	// A macro can be overloaded with multiple definitions, each with a different set of parameters.
	// https://duckdb.org/docs/sql/statements/create_macro.html#overloading
	Definitions []MacroDefinition
}

// QualifiedName returns the schema-qualified name under which the macro is
// created. Macros declared in "pg_catalog" are redirected to the internal
// "__sys__" schema; every other schema is used as-is.
func (m *InternalMacro) QualifiedName() string {
	schema := m.Schema
	if strings.ToLower(schema) == "pg_catalog" {
		// pg_catalog objects are emulated under the internal __sys__ schema.
		schema = "__sys__"
	}
	return schema + "." + m.Name
}

// InternalMacros lists the DuckDB macros installed at startup to emulate
// PostgreSQL system functions.
var InternalMacros = []InternalMacro{
	{
		// Emulates information_schema._pg_expandarray: expands array a into
		// rows of structs with x (the element, via unnest) and n (its 1-based
		// position, via generate_series).
		Schema:       "information_schema",
		Name:         "_pg_expandarray",
		IsTableMacro: true,
		Definitions: []MacroDefinition{
			{
				Params: []string{"a"},
				DDL: `SELECT STRUCT_PACK(
x := unnest(a),
n := generate_series(1, array_length(a))
) AS item`,
			},
		},
	},
	{
		// Stub of pg_catalog.pg_get_indexdef: both overloads currently return
		// an empty string instead of a real index definition.
		Schema:       "pg_catalog",
		Name:         "pg_get_indexdef",
		IsTableMacro: false,
		Definitions: []MacroDefinition{
			{
				Params: []string{"index_oid"},
				// Do nothing currently
				DDL: `''`,
			},
			{
				Params: []string{"index_oid", "column_no", "pretty_bool"},
				// Do nothing currently
				DDL: `''`,
			},
		},
	},
}
48 changes: 48 additions & 0 deletions catalog/internal_tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ var InternalTables = struct {
PGProc InternalTable
PGClass InternalTable
PGNamespace InternalTable
PGMatViews InternalTable
}{
PersistentVariable: InternalTable{
Schema: "__sys__",
Expand Down Expand Up @@ -608,6 +609,52 @@ var InternalTables = struct {
"nspacl TEXT",
InitialData: InitialDataTables.PGNamespace,
},
// View "pg_catalog.pg_matviews"
// postgres=# \d+ pg_catalog.pg_matviews
// View "pg_catalog.pg_matviews"
// Column | Type | Collation | Nullable | Default | Storage | Description
//--------------+---------+-----------+----------+---------+----------+-------------
// schemaname | name | | | | plain |
// matviewname | name | | | | plain |
// matviewowner | name | | | | plain |
// tablespace | name | | | | plain |
// hasindexes | boolean | | | | plain |
// ispopulated | boolean | | | | plain |
// definition | text | | | | extended |
//View definition:
// SELECT n.nspname AS schemaname,
// c.relname AS matviewname,
// pg_get_userbyid(c.relowner) AS matviewowner,
// t.spcname AS tablespace,
// c.relhasindex AS hasindexes,
// c.relispopulated AS ispopulated,
// pg_get_viewdef(c.oid) AS definition
// FROM pg_class c
// LEFT JOIN pg_namespace n ON n.oid = c.relnamespace
// LEFT JOIN pg_tablespace t ON t.oid = c.reltablespace
// WHERE c.relkind = 'm'::"char";
PGMatViews: InternalTable{
Schema: "__sys__",
Name: "pg_matviews",
KeyColumns: []string{
"schemaname",
"matviewname",
},
ValueColumns: []string{
"matviewowner",
"tablespace",
"hasindexes",
"ispopulated",
"definition",
},
DDL: "schemaname VARCHAR NOT NULL, " +
"matviewname VARCHAR NOT NULL, " +
"matviewowner VARCHAR, " +
"tablespace VARCHAR, " +
"hasindexes BOOLEAN, " +
"ispopulated BOOLEAN, " +
"definition TEXT",
},
}

var internalTables = []InternalTable{
Expand All @@ -621,6 +668,7 @@ var internalTables = []InternalTable{
InternalTables.PGProc,
InternalTables.PGClass,
InternalTables.PGNamespace,
InternalTables.PGMatViews,
}

func GetInternalTables() []InternalTable {
Expand Down
88 changes: 88 additions & 0 deletions catalog/internal_views.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package catalog

// InternalView describes a SQL view created inside DuckDB to emulate a
// PostgreSQL system view. DDL holds the view's SELECT body.
type InternalView struct {
	Schema string
	Name   string
	DDL    string
}

// QualifiedName returns the view's schema-qualified name, "schema.name".
func (view *InternalView) QualifiedName() string {
	qualified := view.Schema + "." + view.Name
	return qualified
}

// InternalViews lists the PostgreSQL-compatibility views installed in DuckDB
// at startup. The DDL bodies are DuckDB SQL; statistics PostgreSQL would
// track are filled with 0/NULL placeholders since DuckDB does not track them.
var InternalViews = []InternalView{
	{
		// Emulates pg_catalog.pg_stat_user_tables over
		// information_schema.tables; all activity counters are stubbed.
		Schema: "__sys__",
		Name:   "pg_stat_user_tables",
		DDL: `SELECT
t.table_schema || '.' || t.table_name AS relid, -- Create a unique ID for the table
t.table_schema AS schemaname, -- Schema name
t.table_name AS relname, -- Table name
0 AS seq_scan, -- Default to 0 (DuckDB doesn't track this)
NULL AS last_seq_scan, -- Placeholder (DuckDB doesn't track this)
0 AS seq_tup_read, -- Default to 0
0 AS idx_scan, -- Default to 0
NULL AS last_idx_scan, -- Placeholder
0 AS idx_tup_fetch, -- Default to 0
0 AS n_tup_ins, -- Default to 0 (inserted tuples not tracked)
0 AS n_tup_upd, -- Default to 0 (updated tuples not tracked)
0 AS n_tup_del, -- Default to 0 (deleted tuples not tracked)
0 AS n_tup_hot_upd, -- Default to 0 (HOT updates not tracked)
0 AS n_tup_newpage_upd, -- Default to 0 (new page updates not tracked)
0 AS n_live_tup, -- Default to 0 (live tuples not tracked)
0 AS n_dead_tup, -- Default to 0 (dead tuples not tracked)
0 AS n_mod_since_analyze, -- Default to 0
0 AS n_ins_since_vacuum, -- Default to 0
NULL AS last_vacuum, -- Placeholder
NULL AS last_autovacuum, -- Placeholder
NULL AS last_analyze, -- Placeholder
NULL AS last_autoanalyze, -- Placeholder
0 AS vacuum_count, -- Default to 0
0 AS autovacuum_count, -- Default to 0
0 AS analyze_count, -- Default to 0
0 AS autoanalyze_count -- Default to 0
FROM
information_schema.tables t
WHERE
t.table_type = 'BASE TABLE'; -- Include only base tables (not views)`,
	},
	{
		// Emulates pg_catalog.pg_index by deriving index-like rows from
		// PRIMARY KEY and UNIQUE constraints in information_schema, joined to
		// duckdb_tables() for the table OID. indexrelid is a synthetic
		// ROW_NUMBER, so it is stable only within a single query of the view.
		Schema: "__sys__",
		Name:   "pg_index",
		DDL: `SELECT
ROW_NUMBER() OVER () AS indexrelid, -- Simulated unique ID for the index
t.table_oid AS indrelid, -- OID of the table
COUNT(k.column_name) AS indnatts, -- Number of columns included in the index
COUNT(k.column_name) AS indnkeyatts, -- Number of key columns in the index (same as indnatts here)
CASE
WHEN c.constraint_type = 'UNIQUE' THEN TRUE
ELSE FALSE
END AS indisunique, -- Indicates if the index is unique
CASE
WHEN c.constraint_type = 'PRIMARY KEY' THEN TRUE
ELSE FALSE
END AS indisprimary, -- Indicates if the index is a primary key
ARRAY_AGG(k.ordinal_position ORDER BY k.ordinal_position) AS indkey, -- Array of column positions
ARRAY[]::BIGINT[] AS indcollation, -- DuckDB does not support collation, set to default
ARRAY[]::BIGINT[] AS indclass, -- DuckDB does not support index class, set to default
ARRAY[]::INTEGER[] AS indoption, -- DuckDB does not support index options, set to default
NULL AS indexprs, -- DuckDB does not support expression indexes, set to NULL
NULL AS indpred -- DuckDB does not support partial indexes, set to NULL
FROM
information_schema.key_column_usage k
JOIN
information_schema.table_constraints c
ON k.constraint_name = c.constraint_name
AND k.table_name = c.table_name
JOIN
duckdb_tables() t
ON k.table_name = t.table_name
AND k.table_schema = t.schema_name
WHERE
c.constraint_type IN ('PRIMARY KEY', 'UNIQUE') -- Only select primary key and unique constraints
GROUP BY
t.table_oid, c.constraint_type, c.constraint_name
ORDER BY
t.table_oid;`,
	},
}
52 changes: 44 additions & 8 deletions catalog/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ func NewDBProvider(defaultTimeZone, dataDir, defaultDB string) (prov *DatabasePr
dataDir: dataDir,
}

shouldInit := true
if defaultDB == "" || defaultDB == "memory" {
prov.defaultCatalogName = "memory"
prov.dbFile = ""
Expand All @@ -66,8 +65,6 @@ func NewDBProvider(defaultTimeZone, dataDir, defaultDB string) (prov *DatabasePr
prov.defaultCatalogName = defaultDB
prov.dbFile = defaultDB + ".db"
prov.dsn = filepath.Join(prov.dataDir, prov.dbFile)
_, err = os.Stat(prov.dsn)
shouldInit = os.IsNotExist(err)
}

prov.connector, err = duckdb.NewConnector(prov.dsn, nil)
Expand All @@ -94,11 +91,9 @@ func NewDBProvider(defaultTimeZone, dataDir, defaultDB string) (prov *DatabasePr
}
}

if shouldInit {
err = prov.initCatalog()
if err != nil {
return nil, err
}
err = prov.initCatalog()
if err != nil {
return nil, err
}

err = prov.attachCatalogs()
Expand Down Expand Up @@ -182,6 +177,47 @@ func (prov *DatabaseProvider) initCatalog() error {
}
}

for _, v := range InternalViews {
if _, err := prov.storage.ExecContext(
context.Background(),
"CREATE SCHEMA IF NOT EXISTS "+v.Schema,
); err != nil {
return fmt.Errorf("failed to create internal schema %q: %w", v.Schema, err)
}
if _, err := prov.storage.ExecContext(
context.Background(),
"CREATE VIEW IF NOT EXISTS "+v.QualifiedName()+" AS "+v.DDL,
); err != nil {
return fmt.Errorf("failed to create internal view %q: %w", v.Name, err)
}
}

for _, m := range InternalMacros {
if _, err := prov.storage.ExecContext(
context.Background(),
"CREATE SCHEMA IF NOT EXISTS "+m.Schema,
); err != nil {
return fmt.Errorf("failed to create internal schema %q: %w", m.Schema, err)
}
definitions := make([]string, 0, len(m.Definitions))
for _, d := range m.Definitions {
macroParams := strings.Join(d.Params, ", ")
var asType string
if m.IsTableMacro {
asType = "TABLE\n"
} else {
asType = "\n"
}
definitions = append(definitions, fmt.Sprintf("\n(%s) AS %s%s", macroParams, asType, d.DDL))
}
if _, err := prov.storage.ExecContext(
context.Background(),
"CREATE OR REPLACE MACRO "+m.QualifiedName()+strings.Join(definitions, ",")+";",
); err != nil {
return fmt.Errorf("failed to create internal macro %q: %w", m.Name, err)
}
}

if _, err := prov.pool.ExecContext(context.Background(), "PRAGMA enable_checkpoint_on_shutdown"); err != nil {
logrus.WithError(err).Fatalln("Failed to enable checkpoint on shutdown")
}
Expand Down
15 changes: 15 additions & 0 deletions pgserver/connection_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,26 @@ type ConvertedStatement struct {
Tag string
PgParsable bool
HasSentRowDesc bool
IsExtendedQuery bool
SubscriptionConfig *SubscriptionConfig
BackupConfig *BackupConfig
RestoreConfig *RestoreConfig
}

// WithQueryString returns a copy of this ConvertedStatement whose query text
// is replaced by queryString; every other field is carried over unchanged.
//
// The receiver is passed by value, so it is already a shallow copy of the
// caller's statement: mutating cs.String and returning cs copies all fields
// automatically. Unlike the previous hand-written field-by-field copy, this
// cannot silently zero out fields that are later added to ConvertedStatement.
func (cs ConvertedStatement) WithQueryString(queryString string) ConvertedStatement {
	cs.String = queryString
	return cs
}

// copyFromStdinState tracks the metadata for an import of data into a table using a COPY FROM STDIN statement. When
// this statement is processed, the server accepts COPY DATA messages from the client with chunks of data to load
// into a table.
Expand Down
Loading

0 comments on commit 34b0213

Please sign in to comment.