Skip to content

Commit 5da6d2b

Browse files
feat: add support for catalogs (apache#28416)
1 parent b1f85dc commit 5da6d2b

14 files changed

+504
-62
lines changed

Diff for: superset/db_engine_specs/README.md

+2-20
Original file line numberDiff line numberDiff line change
@@ -706,29 +706,11 @@ Hive and Trino:
706706
4. Table
707707
5. Column
708708

709-
If the database supports catalogs, then the DB engine spec should have the `supports_catalog` class attribute set to true.
709+
If the database supports catalogs, then the DB engine spec should have the `supports_catalog` class attribute set to true. It should also implement the `get_default_catalog` method, so that the proper permissions can be created when datasets are added.
710710

711711
### Dynamic catalog
712712

713-
Superset has no support for multiple catalogs. A given SQLAlchemy URI connects to a single catalog, and it's impossible to browse other catalogs, or change the catalog. This means that datasets can only be added for the main catalog of the database. For example, with this Postgres SQLAlchemy URI:
714-
715-
```
716-
postgresql://admin:[email protected]:5432/db
717-
```
718-
719-
Here, datasets can only be added to the `db` catalog (which Postgres calls a "database").
720-
721-
One confusing problem is that many databases allow querying across catalogs in SQL Lab. For example, with BigQuery one can write:
722-
723-
```sql
724-
SELECT * FROM project.schema.table
725-
```
726-
727-
This means that **even though the database is configured for a given catalog (project), users can query other projects**. This is a common workaround for creating datasets in catalogs other than the catalog configured in the database: just create a virtual dataset.
728-
729-
Ideally we would want users to be able to choose the catalog when using SQL Lab and when creating datasets. In order to do that, DB engine specs need to implement a method that rewrites the SQLAlchemy URI depending on the desired catalog. This method already exists, and is the same method used for dynamic schemas, `adjust_engine_params`, but currently there are no UI affordances for choosing a catalog.
730-
731-
Before the UI is implemented Superset still needs to implement support for catalogs in its security manager. But in the meantime, it's possible for DB engine spec developers to support dynamic catalogs, by setting `supports_dynamic_catalog` to true and implementing `adjust_engine_params` to handle a catalog.
713+
Superset support for multiple catalogs. Since, in general, a given SQLAlchemy URI connects only to a single catalog, it requires DB engine specs to implement the `adjust_engine_params` method to rewrite the URL to connect to a different catalog, similar to how dynamic schemas work. Additionally, DB engine specs should also implement the `get_catalog_names` method, so that users can browse the available catalogs.
732714

733715
### SSH tunneling
734716

Diff for: superset/db_engine_specs/bigquery.py

+33-1
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
from sqlalchemy import column, types
3636
from sqlalchemy.engine.base import Engine
3737
from sqlalchemy.engine.reflection import Inspector
38+
from sqlalchemy.engine.url import URL
3839
from sqlalchemy.sql import sqltypes
3940

4041
from superset import sql_parse
@@ -127,7 +128,7 @@ class BigQueryEngineSpec(BaseEngineSpec): # pylint: disable=too-many-public-met
127128

128129
allows_hidden_cc_in_orderby = True
129130

130-
supports_catalog = False
131+
supports_catalog = supports_dynamic_catalog = True
131132

132133
"""
133134
https://www.python.org/dev/peps/pep-0249/#arraysize
@@ -459,6 +460,24 @@ def estimate_query_cost( # pylint: disable=too-many-arguments
459460
for statement in statements
460461
]
461462

463+
@classmethod
464+
def get_default_catalog(cls, database: Database) -> str | None:
465+
"""
466+
Get the default catalog.
467+
"""
468+
url = database.url_object
469+
470+
# The SQLAlchemy driver accepts both `bigquery://project` (where the project is
471+
# technically a host) and `bigquery:///project` (where it's a database). But
472+
# both can be missing, and the project is inferred from the authentication
473+
# credentials.
474+
if project := url.host or url.database:
475+
return project
476+
477+
with database.get_sqla_engine() as engine:
478+
client = cls._get_client(engine)
479+
return client.project
480+
462481
@classmethod
463482
def get_catalog_names(
464483
cls,
@@ -477,6 +496,19 @@ def get_catalog_names(
477496

478497
return {project.project_id for project in projects}
479498

499+
@classmethod
500+
def adjust_engine_params(
501+
cls,
502+
uri: URL,
503+
connect_args: dict[str, Any],
504+
catalog: str | None = None,
505+
schema: str | None = None,
506+
) -> tuple[URL, dict[str, Any]]:
507+
if catalog:
508+
uri = uri.set(host=catalog, database="")
509+
510+
return uri, connect_args
511+
480512
@classmethod
481513
def get_allow_cost_estimate(cls, extra: dict[str, Any]) -> bool:
482514
return True

Diff for: superset/db_engine_specs/presto.py

+36-21
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@
1414
# KIND, either express or implied. See the License for the
1515
# specific language governing permissions and limitations
1616
# under the License.
17+
1718
# pylint: disable=too-many-lines
19+
1820
from __future__ import annotations
1921

2022
import contextlib
@@ -165,6 +167,7 @@ class PrestoBaseEngineSpec(BaseEngineSpec, metaclass=ABCMeta):
165167
"""
166168

167169
supports_dynamic_schema = True
170+
supports_catalog = supports_dynamic_catalog = True
168171

169172
column_type_mappings = (
170173
(
@@ -295,6 +298,24 @@ def convert_dttm(
295298
def epoch_to_dttm(cls) -> str:
296299
return "from_unixtime({col})"
297300

301+
@classmethod
302+
def get_default_catalog(cls, database: "Database") -> str | None:
303+
"""
304+
Return the default catalog.
305+
"""
306+
return database.url_object.database.split("/")[0]
307+
308+
@classmethod
309+
def get_catalog_names(
310+
cls,
311+
database: Database,
312+
inspector: Inspector,
313+
) -> set[str]:
314+
"""
315+
Get all catalogs.
316+
"""
317+
return {catalog for (catalog,) in inspector.bind.execute("SHOW CATALOGS")}
318+
298319
@classmethod
299320
def adjust_engine_params(
300321
cls,
@@ -303,14 +324,22 @@ def adjust_engine_params(
303324
catalog: str | None = None,
304325
schema: str | None = None,
305326
) -> tuple[URL, dict[str, Any]]:
306-
database = uri.database
307-
if schema and database:
327+
if uri.database and "/" in uri.database:
328+
current_catalog, current_schema = uri.database.split("/", 1)
329+
else:
330+
current_catalog, current_schema = uri.database, None
331+
332+
if schema:
308333
schema = parse.quote(schema, safe="")
309-
if "/" in database:
310-
database = database.split("/")[0] + "/" + schema
311-
else:
312-
database += "/" + schema
313-
uri = uri.set(database=database)
334+
335+
adjusted_database = "/".join(
336+
[
337+
catalog or current_catalog or "",
338+
schema or current_schema or "",
339+
]
340+
).rstrip("/")
341+
342+
uri = uri.set(database=adjusted_database)
314343

315344
return uri, connect_args
316345

@@ -651,8 +680,6 @@ class PrestoEngineSpec(PrestoBaseEngineSpec):
651680
engine_name = "Presto"
652681
allows_alias_to_source_column = False
653682

654-
supports_catalog = False
655-
656683
custom_errors: dict[Pattern[str], tuple[str, SupersetErrorType, dict[str, Any]]] = {
657684
COLUMN_DOES_NOT_EXIST_REGEX: (
658685
__(
@@ -815,17 +842,6 @@ def get_view_names(
815842
results = cursor.fetchall()
816843
return {row[0] for row in results}
817844

818-
@classmethod
819-
def get_catalog_names(
820-
cls,
821-
database: Database,
822-
inspector: Inspector,
823-
) -> set[str]:
824-
"""
825-
Get all catalogs.
826-
"""
827-
return {catalog for (catalog,) in inspector.bind.execute("SHOW CATALOGS")}
828-
829845
@classmethod
830846
def _create_column_info(
831847
cls, name: str, data_type: types.TypeEngine
@@ -1251,7 +1267,6 @@ def get_extra_table_metadata(
12511267
),
12521268
}
12531269

1254-
# flake8 is not matching `Optional[str]` to `Any` for some reason...
12551270
metadata["view"] = cast(
12561271
Any,
12571272
cls.get_create_view(database, table.schema, table.table),

Diff for: superset/db_engine_specs/snowflake.py

+21-7
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ class SnowflakeEngineSpec(PostgresBaseEngineSpec):
8585
sqlalchemy_uri_placeholder = "snowflake://"
8686

8787
supports_dynamic_schema = True
88-
supports_catalog = False
88+
supports_catalog = supports_dynamic_catalog = True
8989

9090
_time_grain_expressions = {
9191
None: "{col}",
@@ -144,12 +144,19 @@ def adjust_engine_params(
144144
catalog: Optional[str] = None,
145145
schema: Optional[str] = None,
146146
) -> tuple[URL, dict[str, Any]]:
147-
database = uri.database
148-
if "/" in database:
149-
database = database.split("/")[0]
150-
if schema:
151-
schema = parse.quote(schema, safe="")
152-
uri = uri.set(database=f"{database}/{schema}")
147+
if "/" in uri.database:
148+
current_catalog, current_schema = uri.database.split("/", 1)
149+
else:
150+
current_catalog, current_schema = uri.database, None
151+
152+
adjusted_database = "/".join(
153+
[
154+
catalog or current_catalog,
155+
schema or current_schema or "",
156+
]
157+
).rstrip("/")
158+
159+
uri = uri.set(database=adjusted_database)
153160

154161
return uri, connect_args
155162

@@ -169,6 +176,13 @@ def get_schema_from_engine_params(
169176

170177
return parse.unquote(database.split("/")[1])
171178

179+
@classmethod
180+
def get_default_catalog(cls, database: "Database") -> Optional[str]:
181+
"""
182+
Return the default catalog.
183+
"""
184+
return database.url_object.database.split("/")[0]
185+
172186
@classmethod
173187
def get_catalog_names(
174188
cls,

Diff for: superset/migrations/shared/catalogs.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ class Slice(Base):
8686
schema_perm = sa.Column(sa.String(1000))
8787

8888

89-
def upgrade_catalog_perms(engine: str | None = None) -> None:
89+
def upgrade_catalog_perms(engines: set[str] | None = None) -> None:
9090
"""
9191
Update models when catalogs are introduced in a DB engine spec.
9292
@@ -102,7 +102,7 @@ def upgrade_catalog_perms(engine: str | None = None) -> None:
102102
for database in session.query(Database).all():
103103
db_engine_spec = database.db_engine_spec
104104
if (
105-
engine and db_engine_spec.engine != engine
105+
engines and db_engine_spec.engine not in engines
106106
) or not db_engine_spec.supports_catalog:
107107
continue
108108

@@ -166,7 +166,7 @@ def upgrade_catalog_perms(engine: str | None = None) -> None:
166166
session.commit()
167167

168168

169-
def downgrade_catalog_perms(engine: str | None = None) -> None:
169+
def downgrade_catalog_perms(engines: set[str] | None = None) -> None:
170170
"""
171171
Reverse the process of `upgrade_catalog_perms`.
172172
"""
@@ -175,7 +175,7 @@ def downgrade_catalog_perms(engine: str | None = None) -> None:
175175
for database in session.query(Database).all():
176176
db_engine_spec = database.db_engine_spec
177177
if (
178-
engine and db_engine_spec.engine != engine
178+
engines and db_engine_spec.engine not in engines
179179
) or not db_engine_spec.supports_catalog:
180180
continue
181181

Diff for: superset/migrations/versions/2024-05-01_10-52_58d051681a3b_add_catalog_perm_to_tables.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,10 @@ def upgrade():
4444
"slices",
4545
sa.Column("catalog_perm", sa.String(length=1000), nullable=True),
4646
)
47-
upgrade_catalog_perms(engine="postgresql")
47+
upgrade_catalog_perms(engines={"postgresql"})
4848

4949

5050
def downgrade():
5151
op.drop_column("slices", "catalog_perm")
5252
op.drop_column("tables", "catalog_perm")
53-
downgrade_catalog_perms(engine="postgresql")
53+
downgrade_catalog_perms(engines={"postgresql"})

Diff for: superset/migrations/versions/2024-05-08_19-33_4081be5b6b74_enable_catalog_in_databricks.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@
3333

3434

3535
def upgrade():
36-
upgrade_catalog_perms(engine="databricks")
36+
upgrade_catalog_perms(engines={"databricks"})
3737

3838

3939
def downgrade():
40-
downgrade_catalog_perms(engine="databricks")
40+
downgrade_catalog_perms(engines={"databricks"})
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
"""Enable catalog in BigQuery/Presto/Trino/Snowflake
18+
19+
Revision ID: 87ffc36f9842
20+
Revises: 4081be5b6b74
21+
Create Date: 2024-05-09 18:44:43.289445
22+
23+
"""
24+
25+
from superset.migrations.shared.catalogs import (
26+
downgrade_catalog_perms,
27+
upgrade_catalog_perms,
28+
)
29+
30+
# revision identifiers, used by Alembic.
31+
revision = "87ffc36f9842"
32+
down_revision = "4081be5b6b74"
33+
34+
35+
def upgrade():
36+
upgrade_catalog_perms(engines={"trino", "presto", "bigquery", "snowflake"})
37+
38+
39+
def downgrade():
40+
downgrade_catalog_perms(engines={"trino", "presto", "bigquery", "snowflake"})

Diff for: tests/integration_tests/databases/api_tests.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -3281,7 +3281,7 @@ def test_available(self, app, get_available_engine_specs):
32813281
"sqlalchemy_uri_placeholder": "bigquery://{project_id}",
32823282
"engine_information": {
32833283
"supports_file_upload": True,
3284-
"supports_dynamic_catalog": False,
3284+
"supports_dynamic_catalog": True,
32853285
"disable_ssh_tunneling": True,
32863286
},
32873287
},

Diff for: tests/integration_tests/model_tests.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ def test_impersonate_user_presto(self, mocked_create_engine):
167167
model._get_sqla_engine()
168168
call_args = mocked_create_engine.call_args
169169

170-
assert str(call_args[0][0]) == "presto://gamma@localhost"
170+
assert str(call_args[0][0]) == "presto://gamma@localhost/"
171171

172172
assert call_args[1]["connect_args"] == {
173173
"protocol": "https",
@@ -180,7 +180,7 @@ def test_impersonate_user_presto(self, mocked_create_engine):
180180
model._get_sqla_engine()
181181
call_args = mocked_create_engine.call_args
182182

183-
assert str(call_args[0][0]) == "presto://localhost"
183+
assert str(call_args[0][0]) == "presto://localhost/"
184184

185185
assert call_args[1]["connect_args"] == {
186186
"protocol": "https",
@@ -225,7 +225,7 @@ def test_impersonate_user_trino(self, mocked_create_engine):
225225
model._get_sqla_engine()
226226
call_args = mocked_create_engine.call_args
227227

228-
assert str(call_args[0][0]) == "trino://localhost"
228+
assert str(call_args[0][0]) == "trino://localhost/"
229229
assert call_args[1]["connect_args"]["user"] == "gamma"
230230

231231
model = Database(
@@ -239,7 +239,7 @@ def test_impersonate_user_trino(self, mocked_create_engine):
239239

240240
assert (
241241
str(call_args[0][0])
242-
== "trino://original_user:original_user_password@localhost"
242+
== "trino://original_user:original_user_password@localhost/"
243243
)
244244
assert call_args[1]["connect_args"]["user"] == "gamma"
245245

0 commit comments

Comments
 (0)