8 changes: 8 additions & 0 deletions .github/config/ydb_qa_db.ini
@@ -1,5 +1,13 @@
[QA_DB]
DATABASE_PATH = /ru-central1/b1ggceeul2pkher8vhb6/etnvsjbk7kh1jc6bbfi8
DATABASE_ENDPOINT = grpcs://lb.etnvsjbk7kh1jc6bbfi8.ydb.mdb.yandexcloud.net:2135
CONNECTION_TIMEOUT = 60

[STATISTICS_DB]
DATABASE_PATH = /ru-central1/b1ggceeul2pkher8vhb6/etnvsjbk7kh1jc6bbfi8
DATABASE_ENDPOINT = grpcs://lb.etnvsjbk7kh1jc6bbfi8.ydb.mdb.yandexcloud.net:2135
STATISTICS_TABLE = analytics/query_statistics
STATISTICS_SHUTDOWN_TIMEOUT = 60

[YDBD]
YDBD_PATH = ydb/apps/ydbd/ydbd
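For context, the new [STATISTICS_DB] section (and the CONNECTION_TIMEOUT key in [QA_DB]) would be read the same way the script below read [QA_DB] via configparser. A minimal sketch; the actual consumer of this section is not part of this diff:

```python
import configparser
import os

# Read the ini the same way data_mart_executor.py did before this change.
config = configparser.ConfigParser()
config.read(os.path.join(os.path.dirname(__file__), "../../config/ydb_qa_db.ini"))

stats_endpoint = config["STATISTICS_DB"]["DATABASE_ENDPOINT"]
stats_database = config["STATISTICS_DB"]["DATABASE_PATH"]
stats_table = config["STATISTICS_DB"]["STATISTICS_TABLE"]  # analytics/query_statistics
# configparser values are strings; the timeout presumably wants an int.
shutdown_timeout = int(config["STATISTICS_DB"]["STATISTICS_SHUTDOWN_TIMEOUT"])
```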
145 changes: 53 additions & 92 deletions .github/scripts/analytics/data_mart_executor.py
@@ -3,40 +3,18 @@
#--query_path .github/scripts/analytics/data_mart_queries/perfomance_olap_mart.sql --table_path perfomance/olap/fast_results --store_type column --partition_keys Run_start_timestamp --primary_keys Db Suite Test Branch Run_start_timestamp --ttl_min 43200 --ttl_key Run_start_timestamp
import argparse
import ydb
import configparser
import os
import time
from ydb_wrapper import YDBWrapper

# Load configuration
# Get repository path
dir = os.path.dirname(__file__)
config = configparser.ConfigParser()
config_file_path = f"{dir}/../../config/ydb_qa_db.ini"
config.read(config_file_path)
repo_path = os.path.abspath(f"{dir}/../../../")

DATABASE_ENDPOINT = config["QA_DB"]["DATABASE_ENDPOINT"]
DATABASE_PATH = config["QA_DB"]["DATABASE_PATH"]

def get_data_from_query_with_metadata(driver, query):
results = []
scan_query = ydb.ScanQuery(query, {})
it = driver.table_client.scan_query(scan_query)
print(f"Executing query")
start_time = time.time()
column_types = None
while True:
try:
result = next(it)
if column_types is None:
column_types = [(col.name, col.type) for col in result.result_set.columns]

results.extend(result.result_set.rows)

except StopIteration:
break

end_time = time.time()
print(f'Captured {len(results)} rows, duration: {end_time - start_time}s')
def get_data_from_query_with_metadata(ydb_wrapper, query):
"""Get data from query using ydb_wrapper and extract column metadata"""
# Use the new method that returns both data and column metadata in one call
results, column_types = ydb_wrapper.execute_scan_query_with_metadata(query)
return results, column_types
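YDBWrapper.execute_scan_query_with_metadata itself is not shown in this diff, but the removed inline code above spells out the logic it replaces. A minimal sketch of the equivalent standalone function against a plain ydb.Driver (the wrapper's actual internals are an assumption):

```python
import time
import ydb

def execute_scan_query_with_metadata(driver, query):
    """Run a scan query; return (rows, [(column_name, column_type), ...])."""
    results = []
    column_types = None
    it = driver.table_client.scan_query(ydb.ScanQuery(query, {}))
    start_time = time.time()
    for chunk in it:  # each chunk carries a partial result set
        if column_types is None:
            column_types = [(col.name, col.type) for col in chunk.result_set.columns]
        results.extend(chunk.result_set.rows)
    print(f'Captured {len(results)} rows, duration: {time.time() - start_time:.1f}s')
    return results, column_types
```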

def ydb_type_to_str(ydb_type, store_type = 'ROW'):
@@ -66,8 +44,8 @@ def ydb_type_to_str(ydb_type, store_type = 'ROW'):
name = 'Uint8'
return result_type, name

def create_table(session, table_path, column_types, store_type, partition_keys, primary_keys, ttl_min, ttl_key):
"""Create table based on the structure of the provided column types."""
def create_table(ydb_wrapper, table_path, column_types, store_type, partition_keys, primary_keys, ttl_min, ttl_key):
"""Create table using ydb_wrapper based on the structure of the provided column types."""
if not column_types:
raise ValueError("No column types to create table from.")

@@ -100,26 +78,12 @@ def create_table(session, table_path, column_types, store_type, partition_keys,
)
"""

print(f"Creating table with query: {create_table_sql}")
session.execute_scheme(create_table_sql)
def create_table_if_not_exists(session, table_path, column_types, store_type, partition_keys, primary_keys, ttl_min, ttl_key):
"""Create table if it does not already exist, based on column types."""
try:
session.describe_table(table_path)
print(f"Table '{table_path}' already exists.")
except ydb.Error:
print(f"Table '{table_path}' does not exist. Creating table...")
create_table(session, table_path, column_types, store_type, partition_keys, primary_keys, ttl_min, ttl_key)

def bulk_upsert(table_client, table_path, rows, column_types,store_type='ROW'):
print(f"> Bulk upsert into: {table_path}")

column_types_map = ydb.BulkUpsertColumns()
for column_name, column_ydb_type in column_types:
column_type_obj, column_type_str = ydb_type_to_str(column_ydb_type, store_type.upper())
column_types_map.add_column(column_name, column_type_obj)

table_client.bulk_upsert(table_path, rows, column_types_map)
ydb_wrapper.create_table(table_path, create_table_sql)
def create_table_if_not_exists(ydb_wrapper, table_path, column_types, store_type, partition_keys, primary_keys, ttl_min, ttl_key):
"""Create table if it does not already exist, using ydb_wrapper."""
# For now, we'll always try to create the table
# In a more sophisticated implementation, we could check if table exists first
create_table(ydb_wrapper, table_path, column_types, store_type, partition_keys, primary_keys, ttl_min, ttl_key)
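If the existence check is wanted back, the removed session-based code shows the pattern: describe_table succeeds for an existing table and raises ydb.Error otherwise. A sketch assuming access to a ydb.SessionPool (whether YDBWrapper exposes one is an assumption):

```python
import ydb

def table_exists(pool, table_path):
    """Return True if describe_table succeeds, False on ydb.Error."""
    def check(session):
        try:
            session.describe_table(table_path)
            return True
        except ydb.Error:
            return False
    return pool.retry_operation_sync(check)
```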

def parse_args():
parser = argparse.ArgumentParser(description="YDB Table Manager")
@@ -136,50 +100,47 @@ def parse_args():
def main():
args = parse_args()

if "CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS" not in os.environ:
print("Error: Env variable CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS is missing, skipping")
return 1
else:
os.environ["YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS"] = os.environ[
"CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS"
]

table_path = args.table_path
batch_size = 1000

# Read SQL query from file
sql_query_path = os.path.join(repo_path, args.query_path)
print(f'Query found: {sql_query_path}')
with open(sql_query_path, 'r') as file:
sql_query = file.read()

with ydb.Driver(
endpoint=DATABASE_ENDPOINT,
database=DATABASE_PATH,
credentials=ydb.credentials_from_env_variables()
) as driver:
driver.wait(timeout=10, fail_fast=True)
with ydb.SessionPool(driver) as pool:
# Run query to get sample data and column types
results, column_types = get_data_from_query_with_metadata(driver, sql_query)
if not results:
print("No data to create table from.")
return

# Create table if not exists based on sample column types
pool.retry_operation_sync(
lambda session: create_table_if_not_exists(
session, f'{DATABASE_PATH}/{table_path}', column_types, args.store_type,
args.partition_keys, args.primary_keys, args.ttl_min, args.ttl_key
)
)

print(f'Preparing to upsert: {len(results)} rows')
for start in range(0, len(results), batch_size):
batch_rows = results[start:start + batch_size]
print(f'Upserting: {start}-{start + len(batch_rows)}/{len(results)} rows')
bulk_upsert(driver.table_client, f'{DATABASE_PATH}/{table_path}', batch_rows, column_types, args.store_type)
print('Data uploaded')
script_name = os.path.basename(__file__)

# Initialize YDB wrapper with context manager for automatic cleanup
with YDBWrapper(script_name=script_name) as ydb_wrapper:
# Check credentials
if not ydb_wrapper.check_credentials():
return 1

# Read SQL query from file
sql_query_path = os.path.join(repo_path, args.query_path)
print(f'Query found: {sql_query_path}')
with open(sql_query_path, 'r') as file:
sql_query = file.read()

# Run query to get sample data and column types
results, column_types = get_data_from_query_with_metadata(ydb_wrapper, sql_query)
if not results:
print("No data to create table from.")
return

# Create table if not exists based on sample column types
full_table_path = f"{ydb_wrapper.database_path}/{table_path}"
create_table_if_not_exists(
ydb_wrapper, full_table_path, column_types, args.store_type,
args.partition_keys, args.primary_keys, args.ttl_min, args.ttl_key
)

print(f'Preparing to upsert: {len(results)} rows')

# Prepare column_types_map once
column_types_map = ydb.BulkUpsertColumns()
for column_name, column_ydb_type in column_types:
column_type_obj, column_type_str = ydb_type_to_str(column_ydb_type, args.store_type.upper())
column_types_map.add_column(column_name, column_type_obj)

# Use bulk_upsert_batches for aggregated statistics
ydb_wrapper.bulk_upsert_batches(full_table_path, results, column_types_map, batch_size)

print('Data uploaded')


if __name__ == "__main__":
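ydb_wrapper.bulk_upsert_batches is likewise outside this diff; judging from the removed bulk_upsert helper and the manual batching loop it replaces, it presumably amounts to something like this sketch against the plain SDK driver:

```python
def bulk_upsert_batches(driver, table_path, rows, column_types_map, batch_size=1000):
    """Upsert rows in fixed-size batches via the SDK's table_client.bulk_upsert."""
    for start in range(0, len(rows), batch_size):
        batch = rows[start:start + batch_size]
        print(f'Upserting: {start}-{start + len(batch)}/{len(rows)} rows')
        driver.table_client.bulk_upsert(table_path, batch, column_types_map)
```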