This repository has been archived by the owner on Nov 6, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 13
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fix(mysql): add schema as params to columns query (#232)
- Loading branch information
Showing
2 changed files
with
158 additions
and
88 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,82 +1,150 @@ | ||
import odd_models | ||
import pytest | ||
import sqlalchemy | ||
from odd_models import DataEntity | ||
from odd_models.models import DataEntityType | ||
from pydantic import SecretStr | ||
from testcontainers.mysql import MySqlContainer | ||
|
||
from odd_collector.adapters.mysql.adapter import Adapter | ||
from odd_collector.domain.plugin import MySQLPlugin | ||
from tests.integration.helpers import find_by_name, find_by_type | ||
|
||
create_tables = """ | ||
CREATE TABLE Persons ( | ||
PersonID int, | ||
LastName varchar(255), | ||
FirstName varchar(255), | ||
Address varchar(255), | ||
City varchar(255) | ||
);""" | ||
|
||
create_view = """ | ||
CREATE VIEW persons_names AS | ||
SELECT LastName, FirstName | ||
FROM Persons | ||
WHERE City = 'Sandnes'; | ||
""" | ||
|
||
create_view_from_view = """ | ||
CREATE VIEW persons_last_names AS | ||
SELECT LastName | ||
FROM persons_names; | ||
""" | ||
|
||
from odd_collector.adapters.mysql.adapter import Adapter | ||
from odd_collector.domain.plugin import MySQLPlugin | ||
def create_primary_schema(connection: sqlalchemy.engine.Connection): | ||
create_tables = """ | ||
CREATE TABLE Persons ( | ||
PersonID int, | ||
LastName varchar(255), | ||
FirstName varchar(255), | ||
Address varchar(255), | ||
City varchar(255) | ||
);""" | ||
|
||
create_view = """ | ||
CREATE VIEW persons_names AS | ||
SELECT LastName, FirstName | ||
FROM Persons; | ||
""" | ||
|
||
create_view_from_view = """ | ||
CREATE VIEW persons_last_names AS | ||
SELECT LastName | ||
FROM persons_names; | ||
""" | ||
|
||
connection.exec_driver_sql(create_tables) | ||
connection.exec_driver_sql(create_view) | ||
connection.exec_driver_sql(create_view_from_view) | ||
|
||
|
||
def create_other_schema(connection: sqlalchemy.engine.Connection): | ||
create_other_schema = """ | ||
CREATE DATABASE `other_schema`; | ||
""" | ||
|
||
create_tables = """ | ||
CREATE TABLE `other_schema`.`Persons` ( | ||
PersonID int, | ||
LastName varchar(255), | ||
FirstName varchar(255), | ||
Address varchar(255), | ||
City varchar(255) | ||
);""" | ||
|
||
create_view = """ | ||
CREATE VIEW `other_schema`.`persons_names` AS | ||
SELECT LastName, FirstName | ||
FROM `other_schema`.`Persons`; | ||
""" | ||
|
||
create_view_from_view = """ | ||
CREATE VIEW `other_schema`.`persons_last_names` AS | ||
SELECT LastName | ||
FROM `other_schema`.`persons_names`; | ||
""" | ||
|
||
connection.exec_driver_sql(create_other_schema) | ||
connection.exec_driver_sql(create_tables) | ||
connection.exec_driver_sql(create_view) | ||
connection.exec_driver_sql(create_view_from_view) | ||
|
||
|
||
def entities_are_unique(entities: list[odd_models.DataEntity]): | ||
return len(entities) == len({e.oddrn for e in entities}) | ||
|
||
|
||
@pytest.mark.integration | ||
def test_mysql(): | ||
with MySqlContainer() as mysql: | ||
@pytest.fixture(scope="module") | ||
def data_entities() -> odd_models.DataEntityList: | ||
with MySqlContainer(MYSQL_USER="root") as mysql: | ||
engine = sqlalchemy.create_engine(mysql.get_connection_url()) | ||
|
||
with engine.connect() as connection: | ||
connection.exec_driver_sql(create_tables) | ||
connection.exec_driver_sql(create_view) | ||
connection.exec_driver_sql(create_view_from_view) | ||
create_primary_schema(connection) | ||
create_other_schema(connection) | ||
|
||
config = MySQLPlugin( | ||
type="mysql", | ||
name="test_mysql", | ||
database="test", | ||
password=SecretStr("test"), | ||
user="test", | ||
user="root", | ||
host=mysql.get_container_host_ip(), | ||
port=mysql.get_exposed_port(3306), | ||
port=int(mysql.get_exposed_port(3306)), | ||
) | ||
|
||
data_entities = Adapter(config).get_data_entity_list() | ||
database_services: list[DataEntity] = find_by_type( | ||
data_entities, DataEntityType.DATABASE_SERVICE | ||
) | ||
assert len(database_services) == 1 | ||
database_service = database_services[0] | ||
assert len(database_service.data_entity_group.entities_list) == 3 | ||
return Adapter(config).get_data_entity_list() | ||
|
||
|
||
def test_entities_are_unique(data_entities: odd_models.DataEntityList): | ||
assert entities_are_unique(data_entities.items) | ||
|
||
|
||
def test_fetch_one_database_from_config(data_entities: odd_models.DataEntityList): | ||
databases: list[odd_models.DataEntity] = find_by_type( | ||
data_entities, odd_models.DataEntityType.DATABASE_SERVICE | ||
) | ||
assert len(databases) == 1 | ||
database = databases[0] | ||
assert database.data_entity_group is not None | ||
assert len(database.data_entity_group.entities_list) == 3 | ||
|
||
entities = database.data_entity_group.entities_list | ||
assert len(entities) == len(set(entities)) | ||
|
||
|
||
def test_fetch_only_one_table(data_entities: odd_models.DataEntityList): | ||
tables = find_by_type(data_entities, odd_models.DataEntityType.TABLE) | ||
|
||
assert entities_are_unique(tables) | ||
|
||
table = tables[0] | ||
|
||
assert len(tables) == 1 | ||
assert table.dataset is not None | ||
assert len(table.dataset.field_list) == 5 | ||
assert entities_are_unique(table.dataset.field_list) | ||
|
||
|
||
def test_fetch_two_views(data_entities: odd_models.DataEntityList): | ||
views = find_by_type(data_entities, odd_models.DataEntityType.VIEW) | ||
assert len(views) == 2 | ||
assert entities_are_unique(views) | ||
|
||
|
||
def test_view_depends_on_table(data_entities: odd_models.DataEntityList): | ||
table_entity = find_by_name(data_entities, "Persons") | ||
entity = find_by_name(data_entities, "persons_names") | ||
|
||
tables = find_by_type(data_entities, DataEntityType.TABLE) | ||
assert len(tables) == 1 | ||
table = tables[0] | ||
assert len(table.dataset.field_list) == 5 | ||
assert len(entity.dataset.field_list) == 2 | ||
assert len(entity.data_transformer.inputs) == 1 | ||
assert entity.data_transformer.inputs[0] == table_entity.oddrn | ||
|
||
views = find_by_type(data_entities, DataEntityType.VIEW) | ||
assert len(views) == 2 | ||
|
||
persons_view = find_by_name(data_entities, "persons_names") | ||
assert len(persons_view.dataset.field_list) == 2 | ||
assert len(persons_view.data_transformer.inputs) == 1 | ||
assert persons_view.data_transformer.inputs[0] == table.oddrn | ||
def test_view_depends_on_view(data_entities: odd_models.DataEntityList): | ||
view_entity = find_by_name(data_entities, "persons_names") | ||
entity = find_by_name(data_entities, "persons_last_names") | ||
assert len(entity.data_transformer.inputs) == 1 | ||
assert entity.data_transformer.inputs[0] == view_entity.oddrn | ||
|
||
last_names_view = find_by_name(data_entities, "persons_last_names") | ||
assert len(last_names_view.dataset.field_list) == 1 | ||
assert len(last_names_view.data_transformer.inputs) == 1 | ||
assert last_names_view.data_transformer.inputs[0] == persons_view.oddrn | ||
|
||
assert data_entities.json() | ||
def test_decoding_data_entities(data_entities: odd_models.DataEntityList): | ||
assert data_entities.json() |