From fe91c2654371fa992be47bd9e81d8d6cd1274ed1 Mon Sep 17 00:00:00 2001 From: DMITRIY PEDCHENKO Date: Mon, 24 Jun 2024 22:34:03 +0300 Subject: [PATCH] [DOP-17436] add hive type --- syncmaster/backend/api/v1/connections.py | 4 +- .../test_run_transfer/test_hve.py | 267 ------------------ 2 files changed, 2 insertions(+), 269 deletions(-) delete mode 100644 tests/test_integration/test_run_transfer/test_hve.py diff --git a/syncmaster/backend/api/v1/connections.py b/syncmaster/backend/api/v1/connections.py index 62022a97..3252ab89 100644 --- a/syncmaster/backend/api/v1/connections.py +++ b/syncmaster/backend/api/v1/connections.py @@ -17,7 +17,7 @@ ) from syncmaster.exceptions.credentials import AuthDataNotFoundError from syncmaster.exceptions.group import GroupNotFoundError -from syncmaster.schemas.v1.connection_types import ORACLE_TYPE, POSTGRES_TYPE +from syncmaster.schemas.v1.connection_types import HIVE_TYPE, ORACLE_TYPE, POSTGRES_TYPE from syncmaster.schemas.v1.connections.connection import ( ConnectionCopySchema, ConnectionPageSchema, @@ -30,7 +30,7 @@ router = APIRouter(tags=["Connections"]) -CONNECTION_TYPES = ORACLE_TYPE, POSTGRES_TYPE +CONNECTION_TYPES = ORACLE_TYPE, POSTGRES_TYPE, HIVE_TYPE @router.get("/connections") diff --git a/tests/test_integration/test_run_transfer/test_hve.py b/tests/test_integration/test_run_transfer/test_hve.py deleted file mode 100644 index 67757876..00000000 --- a/tests/test_integration/test_run_transfer/test_hve.py +++ /dev/null @@ -1,267 +0,0 @@ -import secrets - -import pytest -import pytest_asyncio -from httpx import AsyncClient -from onetl.db import DBReader -from pyspark.sql import DataFrame -from sqlalchemy.ext.asyncio import AsyncSession - -from syncmaster.db.models import Connection, Group, Queue, Status, Transfer -from tests.mocks import MockUser -from tests.test_unit.utils import create_transfer -from tests.utils import get_run_on_end - -pytestmark = [pytest.mark.asyncio, pytest.mark.worker] - - -@pytest_asyncio.fixture -async def postgres_to_hive( - session: AsyncSession, - group: Group, - queue: Queue, - hive_connection: Connection, - postgres_connection: Connection, -): - result = await create_transfer( - session=session, - group_id=group.id, - name=f"postgres2hive_{secrets.token_hex(5)}", - source_connection_id=postgres_connection.id, - target_connection_id=hive_connection.id, - source_params={ - "type": "postgres", - "table_name": "public.source_table", - }, - target_params={ - "type": "hive", - "table_name": "default.target_table", - }, - queue_id=queue.id, - ) - yield result - await session.delete(result) - await session.commit() - - -@pytest_asyncio.fixture -async def hive_to_postgres( - session: AsyncSession, - group: Group, - queue: Queue, - hive_connection: Connection, - postgres_connection: Connection, -): - result = await create_transfer( - session=session, - group_id=group.id, - name=f"hive2postgres_{secrets.token_hex(5)}", - source_connection_id=hive_connection.id, - target_connection_id=postgres_connection.id, - source_params={ - "type": "hive", - "table_name": "default.source_table", - }, - target_params={ - "type": "postgres", - "table_name": "public.target_table", - }, - queue_id=queue.id, - ) - yield result - await session.delete(result) - await session.commit() - - -async def test_run_transfer_postgres_to_hive( - client: AsyncClient, - group_owner: MockUser, - prepare_postgres, - prepare_hive, - init_df: DataFrame, - postgres_to_hive: Transfer, -): - # Arrange - _, fill_with_data = prepare_postgres - fill_with_data(init_df) - hive, _ = prepare_hive - - # Act - result = await client.post( - "v1/runs", - headers={"Authorization": f"Bearer {group_owner.token}"}, - json={"transfer_id": postgres_to_hive.id}, - ) - # Assert - assert result.status_code == 200 - - run_data = await get_run_on_end( - client=client, - run_id=result.json()["id"], - token=group_owner.token, - ) - source_auth_data = run_data["transfer_dump"]["source_connection"]["auth_data"] - target_auth_data = run_data["transfer_dump"]["target_connection"]["auth_data"] - - assert run_data["status"] == Status.FINISHED.value - assert source_auth_data["user"] - assert "password" not in source_auth_data - assert target_auth_data["user"] - assert "password" not in target_auth_data - - reader = DBReader( - connection=hive, - table="default.target_table", - ) - df = reader.run() - for field in init_df.schema: - df = df.withColumn(field.name, df[field.name].cast(field.dataType)) - - assert df.sort("ID").collect() == init_df.sort("ID").collect() - - -async def test_run_transfer_postgres_to_hive_mixed_naming( - client: AsyncClient, - group_owner: MockUser, - prepare_postgres, - prepare_hive, - init_df_with_mixed_column_naming: DataFrame, - postgres_to_hive: Transfer, -): - # Arrange - _, fill_with_data = prepare_postgres - fill_with_data(init_df_with_mixed_column_naming) - hive, _ = prepare_hive - - # Act - result = await client.post( - "v1/runs", - headers={"Authorization": f"Bearer {group_owner.token}"}, - json={"transfer_id": postgres_to_hive.id}, - ) - # Assert - assert result.status_code == 200 - - run_data = await get_run_on_end( - client=client, - run_id=result.json()["id"], - token=group_owner.token, - ) - source_auth_data = run_data["transfer_dump"]["source_connection"]["auth_data"] - target_auth_data = run_data["transfer_dump"]["target_connection"]["auth_data"] - - assert run_data["status"] == Status.FINISHED.value - assert source_auth_data["user"] - assert "password" not in source_auth_data - assert target_auth_data["user"] - assert "password" not in target_auth_data - reader = DBReader( - connection=hive, - table="default.target_table", - ) - df = reader.run() - - assert df.columns != init_df_with_mixed_column_naming.columns - assert df.columns == [column.lower() for column in init_df_with_mixed_column_naming.columns] - - for field in init_df_with_mixed_column_naming.schema: - df = df.withColumn(field.name, df[field.name].cast(field.dataType)) - - assert df.collect() == init_df_with_mixed_column_naming.collect() - - -async def test_run_transfer_hive_to_postgres( - client: AsyncClient, - group_owner: MockUser, - prepare_hive, - prepare_postgres, - init_df: DataFrame, - hive_to_postgres: Transfer, -): - # Arrange - _, fill_with_data = prepare_hive - fill_with_data(init_df) - postgres, _ = prepare_postgres - - # Act - result = await client.post( - "v1/runs", - headers={"Authorization": f"Bearer {group_owner.token}"}, - json={"transfer_id": hive_to_postgres.id}, - ) - # Assert - assert result.status_code == 200 - - run_data = await get_run_on_end( - client=client, - run_id=result.json()["id"], - token=group_owner.token, - ) - source_auth_data = run_data["transfer_dump"]["source_connection"]["auth_data"] - target_auth_data = run_data["transfer_dump"]["target_connection"]["auth_data"] - - assert run_data["status"] == Status.FINISHED.value - assert source_auth_data["user"] - assert "password" not in source_auth_data - assert target_auth_data["user"] - assert "password" not in target_auth_data - reader = DBReader( - connection=postgres, - table="public.target_table", - ) - df = reader.run() - for field in init_df.schema: - df = df.withColumn(field.name, df[field.name].cast(field.dataType)) - - assert df.sort("ID").collect() == init_df.sort("ID").collect() - - -async def test_run_transfer_hive_to_postgres_mixes_naming( - client: AsyncClient, - group_owner: MockUser, - prepare_hive, - prepare_postgres, - init_df_with_mixed_column_naming: DataFrame, - hive_to_postgres: Transfer, -): - # Arrange - _, fill_with_data = prepare_hive - fill_with_data(init_df_with_mixed_column_naming) - postgres, _ = prepare_postgres - - # Act - result = await client.post( - "v1/runs", - headers={"Authorization": f"Bearer {group_owner.token}"}, - json={"transfer_id": hive_to_postgres.id}, - ) - # Assert - assert result.status_code == 200 - - run_data = await get_run_on_end( - client=client, - run_id=result.json()["id"], - token=group_owner.token, - ) - source_auth_data = run_data["transfer_dump"]["source_connection"]["auth_data"] - target_auth_data = run_data["transfer_dump"]["target_connection"]["auth_data"] - - assert run_data["status"] == Status.FINISHED.value - assert source_auth_data["user"] - assert "password" not in source_auth_data - assert target_auth_data["user"] - assert "password" not in target_auth_data - - reader = DBReader( - connection=postgres, - table="public.target_table", - ) - df = reader.run() - - assert df.columns != init_df_with_mixed_column_naming.columns - assert df.columns == [column.lower() for column in init_df_with_mixed_column_naming.columns] - - for field in init_df_with_mixed_column_naming.schema: - df = df.withColumn(field.name, df[field.name].cast(field.dataType)) - - assert df.collect() == init_df_with_mixed_column_naming.collect()