Skip to content

Commit

Permalink
enable filesystem sql client to work on streamlit
Browse files Browse the repository at this point in the history
  • Loading branch information
sh-rp committed Aug 13, 2024
1 parent 779bca6 commit 584ab47
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 4 deletions.
2 changes: 1 addition & 1 deletion dlt/destinations/impl/filesystem/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ def create_followup_jobs(self, final_state: TLoadJobState) -> List[FollowupJob]:


class FilesystemClient(
FSClientBase, JobClientBase, WithStagingDataset, WithStateSync, WithSqlClient
FSClientBase, WithSqlClient, JobClientBase, WithStagingDataset, WithStateSync
):
"""filesystem client storing jobs in memory"""

Expand Down
6 changes: 6 additions & 0 deletions dlt/destinations/impl/filesystem/sql_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,9 @@ def execute_query(self, query: AnyStr, *args: Any, **kwargs: Any) -> Iterator[DB
yield DuckDBDBApiCursorImpl(self._conn) # type: ignore
except duckdb.Error as outer:
raise outer

def open_connection(self) -> None:
pass

def close_connection(self) -> None:
pass
11 changes: 10 additions & 1 deletion dlt/destinations/sql_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from dlt.common.typing import TFun
from dlt.common.destination import DestinationCapabilitiesContext
from dlt.common.utils import concat_strings_with_limit
from dlt.common.destination.reference import JobClientBase

from dlt.destinations.exceptions import (
DestinationConnectionError,
Expand Down Expand Up @@ -286,11 +287,19 @@ def _truncate_table_sql(self, qualified_table_name: str) -> str:
return f"DELETE FROM {qualified_table_name} WHERE 1=1;"


class WithSqlClient:
class WithSqlClient(JobClientBase):
@property
@abstractmethod
def sql_client(self) -> SqlClientBase[TNativeConn]: ...

def __enter__(self) -> "WithSqlClient":
return self

def __exit__(
self, exc_type: Type[BaseException], exc_val: BaseException, exc_tb: TracebackType
) -> None:
pass


class DBApiCursorImpl(DBApiCursor):
"""A DBApi Cursor wrapper with dataframes reading functionality"""
Expand Down
9 changes: 7 additions & 2 deletions dlt/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@
from dlt.extract.extract import Extract, data_to_sources
from dlt.normalize import Normalize
from dlt.normalize.configuration import NormalizeConfiguration
from dlt.destinations.sql_client import SqlClientBase
from dlt.destinations.sql_client import SqlClientBase, WithSqlClient
from dlt.destinations.fs_client import FSClientBase
from dlt.destinations.job_client_impl import SqlJobClientBase
from dlt.load.configuration import LoaderConfiguration
Expand Down Expand Up @@ -1001,7 +1001,12 @@ def sql_client(self, schema_name: str = None) -> SqlClientBase[Any]:
# "Sql Client is not available in a pipeline without a default schema. Extract some data first or restore the pipeline from the destination using 'restore_from_destination' flag. There's also `_inject_schema` method for advanced users."
# )
schema = self._get_schema_or_create(schema_name)
return self._sql_job_client(schema).sql_client
client_config = self._get_destination_client_initial_config()
client = self._get_destination_clients(schema, client_config)[0]
if isinstance(client, WithSqlClient):
return client.sql_client
else:
raise SqlClientNotAvailable(self.pipeline_name, self.destination.destination_name)

def _fs_client(self, schema_name: str = None) -> FSClientBase:
"""Returns a filesystem client configured to point to the right folder / bucket for each table.
Expand Down

0 comments on commit 584ab47

Please sign in to comment.