Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lib/dl_connector_ydb/dl_connector_ydb/core/base/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ def _convert_bytes(value: bytes) -> str:
return value.decode("utf-8", errors="replace")

@staticmethod
def _convert_ts(value: int) -> datetime.datetime:
return datetime.datetime.utcfromtimestamp(value / 1e6).replace(tzinfo=datetime.timezone.utc)
def _convert_ts(value: datetime.datetime) -> datetime.datetime:
return value.replace(tzinfo=datetime.timezone.utc)

def _get_row_converters(self, cursor_info: ExecutionStepCursorInfo) -> Tuple[Optional[Callable[[Any], Any]], ...]:
type_names_norm = [col[1].lower().strip("?") for col in cursor_info.raw_cursor_description]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
)

import sqlalchemy as sa
import ydb.sqlalchemy as ydb_sa
import ydb_sqlalchemy.sqlalchemy as ydb_sa

from dl_constants.enums import UserDataType
from dl_type_transformer.type_transformer import (
Expand Down
17 changes: 9 additions & 8 deletions lib/dl_connector_ydb/dl_connector_ydb/core/ydb/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@
import attr
import grpc
from ydb import DriverConfig
import ydb.dbapi as ydb_dbapi
from ydb.driver import credentials_impl
from ydb.driver import (
Driver,
credentials_impl,
)
import ydb.issues as ydb_cli_err
import ydb_dbapi

from dl_constants.enums import ConnectionType
from dl_core import exc
Expand Down Expand Up @@ -67,11 +70,9 @@ def _update_connect_args(self, args: dict) -> None:
def get_connect_args(self) -> dict:
target_dto = self._target_dto
args = dict(
endpoint="{}://{}:{}".format(
self.proto_schema,
target_dto.host,
target_dto.port,
),
host=target_dto.host,
port=target_dto.port,
protocol=self.proto_schema,
database=target_dto.db_name,
)
self._update_connect_args(args)
Expand All @@ -85,7 +86,7 @@ def _list_table_names_i(self, db_name: str, show_dot: bool = False) -> Iterable[
connection = db_engine.connect()
try:
# SA db_engine -> SA connection -> DBAPI connection -> YDB driver
driver = connection.connection.driver # type: ignore # 2024-01-24 # TODO: "DBAPIConnection" has no attribute "driver" [attr-defined]
driver: Driver = connection.connection._driver # type: ignore # 2024-01-24 # TODO: "DBAPIConnection" has no attribute "driver" [attr-defined]
assert driver

queue = [db_name]
Expand Down
6 changes: 0 additions & 6 deletions lib/dl_connector_ydb/dl_connector_ydb/core/ydb/connector.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from ydb.sqlalchemy import register_dialect as yql_register_dialect

from dl_core.connectors.base.connector import (
CoreBackendDefinition,
CoreConnectionDefinition,
Expand Down Expand Up @@ -72,7 +70,3 @@ class YDBCoreConnector(CoreConnector):
YDBCoreSubselectSourceDefinition,
)
rqe_adapter_classes = frozenset({YDBAdapter})

@classmethod
def registration_hook(cls) -> None:
yql_register_dialect()
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ class YQLEngineWrapper(EngineWrapperBase):

def get_conn_credentials(self, full: bool = False) -> dict:
return dict(
endpoint=self.engine.url.query["endpoint"],
db_name=self.engine.url.query["database"],
host=self.engine.url.host,
port=self.engine.url.port,
db_name=self.engine.url.database,
)

def get_version(self) -> Optional[str]:
Expand All @@ -63,12 +64,12 @@ def _generate_table_description(self, columns: Sequence[sa.Column]) -> ydb.Table
return table.with_primary_keys(*primary_keys)

def _get_table_path(self, table: sa.Table) -> str:
return os.path.join(self.engine.url.query["database"], table.name) # type: ignore # 2024-01-24 # TODO: Argument 1 to "join" has incompatible type "str | tuple[str, ...]"; expected "str" [arg-type]
return os.path.join(self.engine.url.database, table.name) # type: ignore # 2024-01-24 # TODO: Argument 1 to "join" has incompatible type "str | tuple[str, ...]"; expected "str" [arg-type]

def _get_connection_params(self) -> ydb.DriverConfig:
return ydb.DriverConfig(
endpoint=self.engine.url.query["endpoint"],
database=self.engine.url.query["database"],
endpoint=f"{self.engine.url.host}:{self.engine.url.port}",
database=self.engine.url.database,
)

def table_from_columns(
Expand All @@ -94,8 +95,8 @@ def create_table(self, table: sa.Table) -> None:

def insert_into_table(self, table: sa.Table, data: Sequence[dict]) -> None:
connection_params = ydb.DriverConfig(
endpoint=self.engine.url.query["endpoint"],
database=self.engine.url.query["database"],
endpoint=f"{self.engine.url.host}:{self.engine.url.port}",
database=self.engine.url.database,
)
driver = ydb.Driver(connection_params)
driver.wait(timeout=5)
Expand Down
2 changes: 1 addition & 1 deletion lib/dl_connector_ydb/dl_connector_ydb/formula/connector.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from ydb.sqlalchemy import YqlDialect as SAYqlDialect
from ydb_sqlalchemy.sqlalchemy import YqlDialect as SAYqlDialect

from dl_formula.connectors.base.connector import FormulaConnector
from dl_query_processing.compilation.query_mutator import RemoveConstFromGroupByFormulaAtomicQueryMutator
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import sqlalchemy as sa
import ydb.sqlalchemy as ydb_sa
import ydb_sqlalchemy.sqlalchemy as ydb_sa

from dl_formula.definitions.base import (
TranslationVariant,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import sqlalchemy as sa
from sqlalchemy.sql.elements import ClauseElement
import ydb_sqlalchemy as ydb_sa

from dl_formula.connectors.base.literal import Literal
from dl_formula.definitions.base import (
Expand Down Expand Up @@ -55,6 +56,8 @@ def _date_datetime_add_yql(
type_name = "day"
mult_expr = mult_expr * 7 # type: ignore # 2024-04-02 # TODO: Unsupported operand types for * ("ClauseElement" and "int") [operator]

mult_expr = sa.func.UNWRAP(sa.cast(mult_expr, ydb_sa.types.Int32))

func_name = YQL_INTERVAL_FUNCS.get(type_name)
if func_name is not None:
func = getattr(sa.func.DateTime, func_name)
Expand Down Expand Up @@ -98,7 +101,7 @@ def _datetrunc2_yql_impl(date_ctx: TranslationCtx, unit_ctx: TranslationCtx) ->
return sa.func.DateTime.MakeDatetime(
sa.func.DateTime.StartOf(
date_expr,
func(amount),
func(sa.cast(amount, ydb_sa.types.Int32)),
)
)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import sqlalchemy as sa
import ydb_sqlalchemy as ydb_sa

from dl_formula.definitions.base import (
TranslationVariant,
Expand Down Expand Up @@ -162,7 +163,7 @@
base.FuncRound2(
variants=[
# in YQL Math::Round takes power of 10 instead of precision, so we have to invert the `num` value
V(D.YQL, lambda x, num: sa.func.Math.Round(x, -num)),
V(D.YQL, lambda x, num: sa.func.Math.Round(x, -sa.cast(num, ydb_sa.types.Int32))),
]
),
# sign
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import sqlalchemy as sa
import ydb.sqlalchemy as ydb_sa
import ydb_sqlalchemy.sqlalchemy as ydb_sa

from dl_formula.definitions.base import TranslationVariant
from dl_formula.definitions.common import (
Expand All @@ -22,7 +22,11 @@
V(
D.YQL,
lambda x: sa.func.ListHead(
sa.func.Unicode.ToCodePointList(sa.func.Unicode.Substring(sa.cast(x, sa.TEXT), 0, 1))
sa.func.Unicode.ToCodePointList(
sa.func.Unicode.Substring(
sa.cast(x, sa.TEXT), sa.cast(0, ydb_sa.types.UInt64), sa.cast(1, ydb_sa.types.UInt64)
)
)
),
),
]
Expand All @@ -41,7 +45,7 @@
sa.func.Unicode.FromCodePointList(
sa.func.AsList(
# coalesce is needed to un-Nullable the type.
sa.func.COALESCE(sa.cast(value, ydb_sa.types.UInt32), 0),
sa.func.COALESCE(sa.cast(value, ydb_sa.types.UInt32), sa.cast(0, ydb_sa.types.UInt32)),
)
),
),
Expand Down Expand Up @@ -110,7 +114,10 @@
# In YQL indices start from 0, but we count them from 1, so have to do -1/+1 here
V(
D.YQL,
lambda text, piece, startpos: sa.func.COALESCE(sa.func.Unicode.Find(text, piece, startpos), -1) + 1,
lambda text, piece, startpos: sa.func.COALESCE(
sa.func.Unicode.Find(text, piece, sa.cast(startpos, ydb_sa.types.UInt64)), -1
)
+ 1,
),
]
),
Expand All @@ -126,7 +133,12 @@
# left
base.FuncLeft(
variants=[
V(D.YQL, lambda x, y: sa.func.Unicode.Substring(sa.cast(x, sa.TEXT), 0, y)),
V(
D.YQL,
lambda x, y: sa.func.Unicode.Substring(
sa.cast(x, sa.TEXT), sa.cast(0, ydb_sa.types.UInt64), sa.cast(y, ydb_sa.types.UInt64)
),
),
]
),
# len
Expand Down Expand Up @@ -168,7 +180,7 @@
D.YQL,
lambda x, y: sa.func.Unicode.Substring(
sa.cast(x, sa.TEXT),
sa.func.Unicode.GetLength(sa.cast(x, sa.TEXT)) - y,
sa.cast(sa.func.Unicode.GetLength(sa.cast(x, sa.TEXT)) - y, ydb_sa.types.UInt64),
),
),
]
Expand All @@ -190,7 +202,7 @@
lambda text, delim, ind: sa.func.ListHead(
sa.func.ListSkip(
sa.func.Unicode.SplitToList(sa.cast(text, sa.TEXT), delim), # must be non-nullable
ind - 1,
sa.func.UNWRAP(sa.cast(ind - 1, ydb_sa.types.UInt32)),
)
),
),
Expand All @@ -213,13 +225,25 @@
base.FuncSubstr2(
variants=[
# In YQL indices start from 0, but we count them from 1, so have to do -1 here
V(D.YQL, lambda val, start: sa.func.Unicode.Substring(sa.cast(val, sa.TEXT), start - 1)),
V(
D.YQL,
lambda val, start: sa.func.Unicode.Substring(
sa.cast(val, sa.TEXT), sa.func.UNWRAP(sa.cast(start - 1, ydb_sa.types.UInt64))
),
),
]
),
base.FuncSubstr3(
variants=[
# In YQL indices start from 0, but we count them from 1, so have to do -1 here
V(D.YQL, lambda val, start, length: sa.func.Unicode.Substring(sa.cast(val, sa.TEXT), start - 1, length)),
V(
D.YQL,
lambda val, start, length: sa.func.Unicode.Substring(
sa.cast(val, sa.TEXT),
sa.func.UNWRAP(sa.cast(start - 1, ydb_sa.types.UInt64)),
sa.func.UNWRAP(sa.cast(length, ydb_sa.types.UInt64)),
),
),
]
),
# upper
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import sqlalchemy as sa
import ydb_sqlalchemy as ydb_sa

from dl_formula.definitions.base import TranslationVariant
from dl_formula.definitions.common_datetime import DAY_USEC
Expand All @@ -24,12 +25,12 @@
base.BinaryPlusStrings.for_dialect(D.YQL),
base.BinaryPlusDateInt(
variants=[
V(D.YQL, lambda date, days: date + sa.func.DateTime.IntervalFromDays(days)),
V(D.YQL, lambda date, days: date + sa.func.DateTime.IntervalFromDays(sa.cast(days, ydb_sa.types.Int32))),
]
),
base.BinaryPlusDateFloat(
variants=[
V(D.YQL, lambda date, days: date + sa.func.DateTime.IntervalFromDays(sa.cast(days, sa.INTEGER))),
V(D.YQL, lambda date, days: date + sa.func.DateTime.IntervalFromDays(sa.cast(days, ydb_sa.types.Int32))),
]
),
base.BinaryPlusDatetimeNumber(
Expand Down Expand Up @@ -57,15 +58,15 @@
base.BinaryMinusNumbers.for_dialect(D.YQL),
base.BinaryMinusDateInt(
variants=[
V(D.YQL, lambda date, days: date - sa.func.DateTime.IntervalFromDays(days)),
V(D.YQL, lambda date, days: date - sa.func.DateTime.IntervalFromDays(sa.cast(days, ydb_sa.types.Int32))),
]
),
base.BinaryMinusDateFloat(
variants=[
V(
D.YQL,
lambda date, days: (
date - sa.func.DateTime.IntervalFromDays(sa.cast(sa.func.Math.Ceil(days), sa.INTEGER))
date - sa.func.DateTime.IntervalFromDays(sa.cast(sa.func.Math.Ceil(days), ydb_sa.types.Int32))
),
),
]
Expand Down
2 changes: 1 addition & 1 deletion lib/dl_connector_ydb/dl_connector_ydb_tests/db/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
core_connector_ep_names=["ydb"],
)

_DB_URL = f'yql:///?endpoint={get_test_container_hostport("db-ydb", fallback_port=51900).host}%3A{get_test_container_hostport("db-ydb", fallback_port=51900).port}&database=%2Flocal'
_DB_URL = f'yql://{get_test_container_hostport("db-ydb", fallback_port=51900).host}:{get_test_container_hostport("db-ydb", fallback_port=51900).port}//local'
DB_CORE_URL = _DB_URL
DB_CONFIGURATIONS = {
D.YDB: _DB_URL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,9 @@ class TestConditionalBlockYQL(YQLTestBase, DefaultLiteralFormulaConnectorTestSui
supports_microseconds = False
supports_utc = False
supports_custom_tz = False

def test_number123(self, dbe) -> None: # TODO remove debug
assert dbe.eval("1") == 1
assert type(dbe.eval("1")) is int
x = dbe.eval("1.2")
assert x == 1.2, x
2 changes: 1 addition & 1 deletion lib/dl_connector_ydb/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: '3.7'

services:
db-ydb:
image: "cr.yandex/yc/yandex-docker-local-ydb:latest"
image: "ydbplatform/local-ydb:trunk"
environment:
YDB_LOCAL_SURVIVE_RESTART: "true"
GRPC_PORT: "51900"
Expand Down
6 changes: 4 additions & 2 deletions lib/dl_connector_ydb/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ marshmallow = ">=3.19.0"
python = ">=3.10, <3.13"
sqlalchemy = ">=1.4.46, <2.0"
typing-extensions = ">=4.9.0"
ydb = ">=3.5.1"
ydb = ">=3.18.10"
ydb-sqlalchemy = ">=0.1.4"
ydb-dbapi = ">=0.1.5"
dl-api-commons = {path = "../dl_api_commons"}
dl-api-connector = {path = "../dl_api_connector"}
dl-configs = {path = "../dl_configs"}
Expand Down Expand Up @@ -85,7 +87,7 @@ check_untyped_defs = true
strict_optional = true

[[tool.mypy.overrides]]
module = ["ydb_proto_stubs_import.*", "ydb.*"]
module = ["ydb_proto_stubs_import.*", "ydb.*", "ydb_sqlalchemy.*", "ydb_dbapi.*"]
ignore_missing_imports = true

[datalens.i18n.domains]
Expand Down
1 change: 1 addition & 0 deletions lib/dl_formula_testing/dl_formula_testing/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ def convert(val): # type: ignore # 2024-01-29 # TODO: Function is missing a ty
return [convert(row[0]) for row in self.db.execute(query).fetchall()]
else:
result = convert(self.db.execute(query).scalar())
print("result:", result) # TODO remove debug
return result
except Exception as exc:
exc_str = str(exc)
Expand Down
Loading
Loading