Skip to content

Commit 8eefc90

Browse files
authored
Merge pull request #59 from LuckySting/tracing-and-request-timeout
Add request settings to execution options
2 parents 3ff29ac + 06f35f9 commit 8eefc90

File tree

5 files changed

+157
-55
lines changed

5 files changed

+157
-55
lines changed

ydb_sqlalchemy/dbapi/connection.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,18 +58,25 @@ def __init__(
5858
self.tx_mode: ydb.AbstractTransactionModeBuilder = ydb.SerializableReadWrite()
5959
self.tx_context: Optional[ydb.TxContext] = None
6060
self.use_scan_query: bool = False
61+
self.request_settings: ydb.BaseRequestSettings = ydb.BaseRequestSettings()
6162

6263
def cursor(self):
6364
return self._cursor_class(
64-
self.driver, self.session_pool, self.tx_mode, self.tx_context, self.use_scan_query, self.table_path_prefix
65+
driver=self.driver,
66+
session_pool=self.session_pool,
67+
tx_mode=self.tx_mode,
68+
tx_context=self.tx_context,
69+
request_settings=self.request_settings,
70+
use_scan_query=self.use_scan_query,
71+
table_path_prefix=self.table_path_prefix,
6572
)
6673

6774
def describe(self, table_path: str) -> ydb.TableDescription:
6875
abs_table_path = posixpath.join(self.database, self.table_path_prefix, table_path)
6976
cursor = self.cursor()
7077
return cursor.describe_table(abs_table_path)
7178

72-
def check_exists(self, table_path: str) -> ydb.SchemeEntry:
79+
def check_exists(self, table_path: str) -> bool:
7380
abs_table_path = posixpath.join(self.database, self.table_path_prefix, table_path)
7481
cursor = self.cursor()
7582
return cursor.check_exists(abs_table_path)
@@ -124,6 +131,12 @@ def set_ydb_scan_query(self, value: bool) -> None:
124131
def get_ydb_scan_query(self) -> bool:
125132
return self.use_scan_query
126133

134+
def set_ydb_request_settings(self, value: ydb.BaseRequestSettings) -> None:
135+
self.request_settings = value
136+
137+
def get_ydb_request_settings(self) -> ydb.BaseRequestSettings:
138+
return self.request_settings
139+
127140
def begin(self):
128141
self.tx_context = None
129142
if self.interactive_transaction and not self.use_scan_query:

ydb_sqlalchemy/dbapi/cursor.py

Lines changed: 95 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,7 @@
55
import itertools
66
import posixpath
77
from collections.abc import AsyncIterator
8-
from typing import (
9-
Any,
10-
Dict,
11-
Generator,
12-
List,
13-
Mapping,
14-
Optional,
15-
Sequence,
16-
Union,
17-
)
8+
from typing import Any, Dict, Generator, List, Mapping, Optional, Sequence, Union
189

1910
import ydb
2011
import ydb.aio
@@ -29,6 +20,7 @@
2920
OperationalError,
3021
ProgrammingError,
3122
)
23+
from .tracing import maybe_get_current_trace_id
3224

3325

3426
def get_column_type(type_obj: Any) -> str:
@@ -87,35 +79,40 @@ def __init__(
8779
driver: Union[ydb.Driver, ydb.aio.Driver],
8880
session_pool: Union[ydb.SessionPool, ydb.aio.SessionPool],
8981
tx_mode: ydb.AbstractTransactionModeBuilder,
82+
request_settings: ydb.BaseRequestSettings,
9083
tx_context: Optional[ydb.BaseTxContext] = None,
9184
use_scan_query: bool = False,
9285
table_path_prefix: str = "",
9386
):
9487
self.driver = driver
9588
self.session_pool = session_pool
9689
self.tx_mode = tx_mode
90+
self.request_settings = request_settings
9791
self.tx_context = tx_context
9892
self.use_scan_query = use_scan_query
93+
self.root_directory = table_path_prefix
9994
self.description = None
10095
self.arraysize = 1
10196
self.rows = None
10297
self._rows_prefetched = None
103-
self.root_directory = table_path_prefix
10498

10599
@_handle_ydb_errors
106100
def describe_table(self, abs_table_path: str) -> ydb.TableDescription:
107-
return self._retry_operation_in_pool(self._describe_table, abs_table_path)
101+
settings = self._get_request_settings()
102+
return self._retry_operation_in_pool(self._describe_table, abs_table_path, settings)
108103

109104
def check_exists(self, abs_table_path: str) -> bool:
105+
settings = self._get_request_settings()
110106
try:
111-
self._retry_operation_in_pool(self._describe_path, abs_table_path)
107+
self._retry_operation_in_pool(self._describe_path, abs_table_path, settings)
112108
return True
113109
except ydb.SchemeError:
114110
return False
115111

116112
@_handle_ydb_errors
117113
def get_table_names(self, abs_dir_path: str) -> List[str]:
118-
directory: ydb.Directory = self._retry_operation_in_pool(self._list_directory, abs_dir_path)
114+
settings = self._get_request_settings()
115+
directory: ydb.Directory = self._retry_operation_in_pool(self._list_directory, abs_dir_path, settings)
119116
result = []
120117
for child in directory.children:
121118
child_abs_path = posixpath.join(abs_dir_path, child.name)
@@ -180,79 +177,95 @@ def _make_data_query(
180177
def _execute_scan_query(
181178
self, query: Union[ydb.DataQuery, str], parameters: Optional[Mapping[str, Any]] = None
182179
) -> Generator[ydb.convert.ResultSet, None, None]:
180+
settings = self._get_request_settings()
183181
prepared_query = query
184182
if isinstance(query, str) and parameters:
185-
prepared_query: ydb.DataQuery = self._retry_operation_in_pool(self._prepare, query)
183+
prepared_query: ydb.DataQuery = self._retry_operation_in_pool(self._prepare, query, settings)
186184

187185
if isinstance(query, str):
188186
scan_query = ydb.ScanQuery(query, None)
189187
else:
190188
scan_query = ydb.ScanQuery(prepared_query.yql_text, prepared_query.parameters_types)
191189

192-
return self._execute_scan_query_in_driver(scan_query, parameters)
190+
return self._execute_scan_query_in_driver(scan_query, parameters, settings)
193191

194192
@_handle_ydb_errors
195193
def _execute_dml(
196194
self, query: Union[ydb.DataQuery, str], parameters: Optional[Mapping[str, Any]] = None
197195
) -> ydb.convert.ResultSets:
196+
settings = self._get_request_settings()
198197
prepared_query = query
199198
if isinstance(query, str) and parameters:
200199
if self.tx_context:
201-
prepared_query = self._run_operation_in_session(self._prepare, query)
200+
prepared_query = self._run_operation_in_session(self._prepare, query, settings)
202201
else:
203-
prepared_query = self._retry_operation_in_pool(self._prepare, query)
202+
prepared_query = self._retry_operation_in_pool(self._prepare, query, settings)
204203

205204
if self.tx_context:
206-
return self._run_operation_in_tx(self._execute_in_tx, prepared_query, parameters)
205+
return self._run_operation_in_tx(self._execute_in_tx, prepared_query, parameters, settings)
207206

208-
return self._retry_operation_in_pool(self._execute_in_session, self.tx_mode, prepared_query, parameters)
207+
return self._retry_operation_in_pool(
208+
self._execute_in_session, self.tx_mode, prepared_query, parameters, settings
209+
)
209210

210211
@_handle_ydb_errors
211212
def _execute_ddl(self, query: str) -> ydb.convert.ResultSets:
212-
return self._retry_operation_in_pool(self._execute_scheme, query)
213+
settings = self._get_request_settings()
214+
return self._retry_operation_in_pool(self._execute_scheme, query, settings)
213215

214216
@staticmethod
215-
def _execute_scheme(session: ydb.Session, query: str) -> ydb.convert.ResultSets:
216-
return session.execute_scheme(query)
217+
def _execute_scheme(
218+
session: ydb.Session,
219+
query: str,
220+
settings: ydb.BaseRequestSettings,
221+
) -> ydb.convert.ResultSets:
222+
return session.execute_scheme(query, settings)
217223

218224
@staticmethod
219-
def _describe_table(session: ydb.Session, abs_table_path: str) -> ydb.TableDescription:
220-
return session.describe_table(abs_table_path)
225+
def _describe_table(
226+
session: ydb.Session, abs_table_path: str, settings: ydb.BaseRequestSettings
227+
) -> ydb.TableDescription:
228+
return session.describe_table(abs_table_path, settings)
221229

222230
@staticmethod
223-
def _describe_path(session: ydb.Session, table_path: str) -> ydb.SchemeEntry:
224-
return session._driver.scheme_client.describe_path(table_path)
231+
def _describe_path(session: ydb.Session, table_path: str, settings: ydb.BaseRequestSettings) -> ydb.SchemeEntry:
232+
return session._driver.scheme_client.describe_path(table_path, settings)
225233

226234
@staticmethod
227-
def _list_directory(session: ydb.Session, abs_dir_path: str) -> ydb.Directory:
228-
return session._driver.scheme_client.list_directory(abs_dir_path)
235+
def _list_directory(session: ydb.Session, abs_dir_path: str, settings: ydb.BaseRequestSettings) -> ydb.Directory:
236+
return session._driver.scheme_client.list_directory(abs_dir_path, settings)
229237

230238
@staticmethod
231-
def _prepare(session: ydb.Session, query: str) -> ydb.DataQuery:
232-
return session.prepare(query)
239+
def _prepare(session: ydb.Session, query: str, settings: ydb.BaseRequestSettings) -> ydb.DataQuery:
240+
return session.prepare(query, settings)
233241

234242
@staticmethod
235243
def _execute_in_tx(
236-
tx_context: ydb.TxContext, prepared_query: ydb.DataQuery, parameters: Optional[Mapping[str, Any]]
244+
tx_context: ydb.TxContext,
245+
prepared_query: ydb.DataQuery,
246+
parameters: Optional[Mapping[str, Any]],
247+
settings: ydb.BaseRequestSettings,
237248
) -> ydb.convert.ResultSets:
238-
return tx_context.execute(prepared_query, parameters, commit_tx=False)
249+
return tx_context.execute(prepared_query, parameters, commit_tx=False, settings=settings)
239250

240251
@staticmethod
241252
def _execute_in_session(
242253
session: ydb.Session,
243254
tx_mode: ydb.AbstractTransactionModeBuilder,
244255
prepared_query: ydb.DataQuery,
245256
parameters: Optional[Mapping[str, Any]],
257+
settings: ydb.BaseRequestSettings,
246258
) -> ydb.convert.ResultSets:
247-
return session.transaction(tx_mode).execute(prepared_query, parameters, commit_tx=True)
259+
return session.transaction(tx_mode).execute(prepared_query, parameters, commit_tx=True, settings=settings)
248260

249261
def _execute_scan_query_in_driver(
250262
self,
251263
scan_query: ydb.ScanQuery,
252264
parameters: Optional[Mapping[str, Any]],
265+
settings: ydb.BaseRequestSettings,
253266
) -> Generator[ydb.convert.ResultSet, None, None]:
254267
chunk: ydb.ScanQueryResult
255-
for chunk in self.driver.table_client.scan_query(scan_query, parameters):
268+
for chunk in self.driver.table_client.scan_query(scan_query, parameters, settings):
256269
yield chunk.result_set
257270

258271
def _run_operation_in_tx(self, callee: collections.abc.Callable, *args, **kwargs):
@@ -325,52 +338,85 @@ def close(self):
325338
def rowcount(self):
326339
return len(self._ensure_prefetched())
327340

341+
def _get_request_settings(self) -> ydb.BaseRequestSettings:
342+
settings = self.request_settings.make_copy()
343+
344+
if self.request_settings.trace_id is None:
345+
settings = settings.with_trace_id(maybe_get_current_trace_id())
346+
347+
return settings
348+
328349

329350
class AsyncCursor(Cursor):
330351
_await = staticmethod(util.await_only)
331352

332353
@staticmethod
333-
async def _describe_table(session: ydb.aio.table.Session, abs_table_path: str) -> ydb.TableDescription:
334-
return await session.describe_table(abs_table_path)
354+
async def _describe_table(
355+
session: ydb.aio.table.Session,
356+
abs_table_path: str,
357+
settings: ydb.BaseRequestSettings,
358+
) -> ydb.TableDescription:
359+
return await session.describe_table(abs_table_path, settings)
335360

336361
@staticmethod
337-
async def _describe_path(session: ydb.aio.table.Session, abs_table_path: str) -> ydb.SchemeEntry:
338-
return await session._driver.scheme_client.describe_path(abs_table_path)
362+
async def _describe_path(
363+
session: ydb.aio.table.Session,
364+
abs_table_path: str,
365+
settings: ydb.BaseRequestSettings,
366+
) -> ydb.SchemeEntry:
367+
return await session._driver.scheme_client.describe_path(abs_table_path, settings)
339368

340369
@staticmethod
341-
async def _list_directory(session: ydb.aio.table.Session, abs_dir_path: str) -> ydb.Directory:
342-
return await session._driver.scheme_client.list_directory(abs_dir_path)
370+
async def _list_directory(
371+
session: ydb.aio.table.Session,
372+
abs_dir_path: str,
373+
settings: ydb.BaseRequestSettings,
374+
) -> ydb.Directory:
375+
return await session._driver.scheme_client.list_directory(abs_dir_path, settings)
343376

344377
@staticmethod
345-
async def _execute_scheme(session: ydb.aio.table.Session, query: str) -> ydb.convert.ResultSets:
346-
return await session.execute_scheme(query)
378+
async def _execute_scheme(
379+
session: ydb.aio.table.Session,
380+
query: str,
381+
settings: ydb.BaseRequestSettings,
382+
) -> ydb.convert.ResultSets:
383+
return await session.execute_scheme(query, settings)
347384

348385
@staticmethod
349-
async def _prepare(session: ydb.aio.table.Session, query: str) -> ydb.DataQuery:
350-
return await session.prepare(query)
386+
async def _prepare(
387+
session: ydb.aio.table.Session,
388+
query: str,
389+
settings: ydb.BaseRequestSettings,
390+
) -> ydb.DataQuery:
391+
return await session.prepare(query, settings)
351392

352393
@staticmethod
353394
async def _execute_in_tx(
354-
tx_context: ydb.aio.table.TxContext, prepared_query: ydb.DataQuery, parameters: Optional[Mapping[str, Any]]
395+
tx_context: ydb.aio.table.TxContext,
396+
prepared_query: ydb.DataQuery,
397+
parameters: Optional[Mapping[str, Any]],
398+
settings: ydb.BaseRequestSettings,
355399
) -> ydb.convert.ResultSets:
356-
return await tx_context.execute(prepared_query, parameters, commit_tx=False)
400+
return await tx_context.execute(prepared_query, parameters, commit_tx=False, settings=settings)
357401

358402
@staticmethod
359403
async def _execute_in_session(
360404
session: ydb.aio.table.Session,
361405
tx_mode: ydb.AbstractTransactionModeBuilder,
362406
prepared_query: ydb.DataQuery,
363407
parameters: Optional[Mapping[str, Any]],
408+
settings: ydb.BaseRequestSettings,
364409
) -> ydb.convert.ResultSets:
365-
return await session.transaction(tx_mode).execute(prepared_query, parameters, commit_tx=True)
410+
return await session.transaction(tx_mode).execute(prepared_query, parameters, commit_tx=True, settings=settings)
366411

367412
def _execute_scan_query_in_driver(
368413
self,
369414
scan_query: ydb.ScanQuery,
370415
parameters: Optional[Mapping[str, Any]],
416+
settings: ydb.BaseRequestSettings,
371417
) -> Generator[ydb.convert.ResultSet, None, None]:
372418
iterator: AsyncIterator[ydb.ScanQueryResult] = self._await(
373-
self.driver.table_client.scan_query(scan_query, parameters)
419+
self.driver.table_client.scan_query(scan_query, parameters, settings)
374420
)
375421
while True:
376422
try:

ydb_sqlalchemy/dbapi/tracing.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
import importlib.util
2+
from typing import Optional
3+
4+
5+
def maybe_get_current_trace_id() -> Optional[str]:
6+
# Check if OpenTelemetry is available
7+
if importlib.util.find_spec("opentelemetry"):
8+
from opentelemetry import trace
9+
10+
current_span = trace.get_current_span()
11+
12+
if current_span.get_span_context().is_valid:
13+
return format(current_span.get_span_context().trace_id, "032x")
14+
15+
# Return None if OpenTelemetry is not available or trace ID is invalid
16+
return None

0 commit comments

Comments
 (0)