Skip to content

Commit

Permalink
Merge pull request #217 from bmsuisse/updates
Browse files Browse the repository at this point in the history
  • Loading branch information
aersam authored Oct 29, 2024
2 parents 876a9a5 + f06145d commit aca8138
Show file tree
Hide file tree
Showing 10 changed files with 1,362 additions and 1,814 deletions.
8 changes: 4 additions & 4 deletions bmsdna/lakeapi/context/df_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from datetime import datetime, timezone

from sqlglot import Dialect
- from bmsdna.lakeapi.core.types import FileTypes
+ from bmsdna.lakeapi.core.types import FileTypes, OperatorType
from typing import Literal, Optional, List, Tuple, Any, TYPE_CHECKING, Union
import pyarrow as pa
from deltalake import DeltaTable
Expand Down Expand Up @@ -211,7 +211,7 @@ def get_pyarrow_dataset(
self,
uri: SourceUri,
file_type: FileTypes,
- partitions: Optional[List[Tuple[str, str, Any]]],
+ partitions: Optional[List[Tuple[str, OperatorType, Any]]],
) -> "Optional[pas.Dataset | pa.Table]":
spec_fs, spec_uri = uri.get_fs_spec()
match file_type:
Expand Down Expand Up @@ -260,7 +260,7 @@ def get_pyarrow_dataset(
"Delta table protocol version not supported, use DuckDB or Polars"
)
return dt.to_pyarrow_dataset(
- partitions=partitions,
+ partitions=partitions, # type: ignore
parquet_read_options={"coerce_int96_timestamp_unit": "us"},
)
case _:
Expand Down Expand Up @@ -387,7 +387,7 @@ def register_datasource(
source_table_name: Optional[str],
uri: SourceUri,
file_type: FileTypes,
- partitions: Optional[List[Tuple[str, str, Any]]],
+ partitions: Optional[List[Tuple[str, OperatorType, Any]]],
):
ds = self.get_pyarrow_dataset(uri, file_type, partitions)
self.modified_dates[target_name] = self.get_modified_date(uri, file_type)
Expand Down
6 changes: 3 additions & 3 deletions bmsdna/lakeapi/context/df_duckdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import pyarrow as pa
from typing import List, Optional, Tuple, Any, Union, cast
- from bmsdna.lakeapi.core.types import FileTypes
+ from bmsdna.lakeapi.core.types import FileTypes, OperatorType
from bmsdna.lakeapi.context.df_base import ExecutionContext, ResultData, get_sql
from deltalake2db import duckdb_create_view_for_delta, duckdb_apply_storage_options
import duckdb
Expand Down Expand Up @@ -48,7 +48,7 @@ def __init__(
def columns(self):
return self.arrow_schema().names

- def query_builder(self) -> ex.Query:
+ def query_builder(self) -> ex.Select:
if not isinstance(self.original_sql, str):
return from_(self.original_sql.subquery())
else:
Expand Down Expand Up @@ -315,7 +315,7 @@ def register_datasource(
source_table_name: Optional[str],
uri: SourceUri,
file_type: FileTypes,
- partitions: List[Tuple[str, str, Any]] | None,
+ partitions: List[Tuple[str, OperatorType, Any]] | None,
):
self.modified_dates[target_name] = self.get_modified_date(uri, file_type)

Expand Down
6 changes: 3 additions & 3 deletions bmsdna/lakeapi/context/df_odbc.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import pyarrow as pa
from typing import List, Optional, Tuple, Any, Union, cast
- from bmsdna.lakeapi.core.types import FileTypes
+ from bmsdna.lakeapi.core.types import FileTypes, OperatorType
from bmsdna.lakeapi.context.df_base import (
FLAVORS,
ExecutionContext,
Expand Down Expand Up @@ -70,7 +70,7 @@ def __init__(
def columns(self):
return self.arrow_schema().names

- def query_builder(self) -> ex.Query:
+ def query_builder(self) -> ex.Select:
if not isinstance(self.original_sql, str):
return from_(self.original_sql.subquery().as_("t"))
else:
Expand Down Expand Up @@ -182,7 +182,7 @@ def register_datasource(
source_table_name: Optional[str],
uri: SourceUri,
file_type: FileTypes,
- partitions: List[Tuple[str, str, Any]] | None,
+ partitions: List[Tuple[str, OperatorType, Any]] | None,
):
assert file_type == "odbc"
assert uri.account is None
Expand Down
2 changes: 1 addition & 1 deletion bmsdna/lakeapi/context/df_polars.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def columns(self):
return _df.columns
return self._df.columns

- def query_builder(self) -> ex.Query:
+ def query_builder(self) -> ex.Select:
if not isinstance(self.sql, str):
return from_(self.sql.subquery(alias="s1"))
else:
Expand Down
4 changes: 2 additions & 2 deletions bmsdna/lakeapi/core/datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
)
from bmsdna.lakeapi.core.log import get_logger
from bmsdna.lakeapi.core.model import get_param_def
- from bmsdna.lakeapi.core.types import DeltaOperatorTypes
+ from bmsdna.lakeapi.core.types import DeltaOperatorTypes, OperatorType

logger = get_logger(__name__)

Expand Down Expand Up @@ -176,7 +176,7 @@ def get_schema(self) -> pa.Schema:

def get_df(
self,
- partitions: Optional[List[Tuple[str, str, Any]]] = None,
+ partitions: Optional[List[Tuple[str, OperatorType, Any]]] = None,
endpoint: endpoints = "request",
) -> ResultData:
if self.df is None:
Expand Down
4 changes: 2 additions & 2 deletions bmsdna/lakeapi/core/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,9 @@ def __init__(
**kwargs,
):
if isinstance(content, typing.AsyncIterable):
- self.body_iterator = content
+ self.body_iterator = content # type: ignore
else:
- self.body_iterator = iterate_in_threadpool(content)
+ self.body_iterator = iterate_in_threadpool(content) # type: ignore

# taking over from FileResponse
self.status_code = status_code
Expand Down
Loading

0 comments on commit aca8138

Please sign in to comment.