Skip to content
This repository has been archived by the owner on Nov 6, 2023. It is now read-only.

Tableau/fix/connections #229

Merged
merged 3 commits into from
Oct 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 42 additions & 44 deletions odd_collector/adapters/tableau/adapter.py
Original file line number Diff line number Diff line change
@@ -1,69 +1,67 @@
from typing import Dict, List, Type
from typing import Type
from urllib.parse import urlparse

from odd_collector_sdk.domain.adapter import AbstractAdapter
from odd_models.models import DataEntity, DataEntityList
from oddrn_generator import TableauGenerator
from odd_collector_sdk.domain.adapter import BaseAdapter
from odd_models.models import DataEntityList
from oddrn_generator import Generator, TableauGenerator

from odd_collector.adapters.tableau.domain.table import EmbeddedTable
from odd_collector.domain.plugin import TableauPlugin

from .client import TableauBaseClient, TableauClient
from .domain.table import EmbeddedTable, Table
from .client import TableauClient
from .logger import logger
from .mappers.sheets import map_sheet
from .mappers.tables import map_table


class Adapter(AbstractAdapter):
class Adapter(BaseAdapter):
config: TableauPlugin
generator: TableauGenerator

def __init__(
self, config: TableauPlugin, client: Type[TableauBaseClient] = None
self, config: TableauPlugin, client: Type[TableauClient] = TableauClient
) -> None:
client = client or TableauClient
super().__init__(config)
self.client = client(config)

self.__oddrn_generator = TableauGenerator(
host_settings=self.client.get_server_host(), sites=config.site
)

def get_data_source_oddrn(self) -> str:
return self.__oddrn_generator.get_data_source_oddrn()
def create_generator(self) -> Generator:
site = self.config.site or "default"
host = urlparse(self.config.server).netloc
return TableauGenerator(host_settings=host, sites=site)

def get_data_entity_list(self) -> DataEntityList:
sheets = self._get_sheets()
tables = self._get_tables()
sheets = self.client.get_sheets()
tables = self.client.get_tables()

tables_data_entities_by_id: Dict[str, DataEntity] = {
table_id: map_table(self.__oddrn_generator, table)
for table_id, table in tables.items()
embedded_tables: list[EmbeddedTable] = [
t for t in tables.values() if isinstance(t, EmbeddedTable)
]

tbl_entities = {
table.id: map_table(self.generator, table) for table in embedded_tables
}
tables_data_entities = tables_data_entities_by_id.values()

sheets_data_entities = []
for sheet in sheets:
sheet_tables = [
tables_data_entities_by_id[table_id] for table_id in sheet.tables_id
]
data_entity = map_sheet(self.__oddrn_generator, sheet, sheet_tables)
sheets_data_entities.append(data_entity)

return DataEntityList(
data_source_oddrn=self.get_data_source_oddrn(),
items=[*tables_data_entities, *sheets_data_entities],
)

def _get_tables(self) -> Dict[str, Table]:
tables: List[Table] = self.client.get_tables()
tables_by_id: Dict[str, Table] = {table.id: table for table in tables}
sheet_entity = map_sheet(self.generator, sheet)

ids = tables_ids_to_load(tables)
tables_columns = self.client.get_tables_columns(ids)
for table_id in sheet.tables_id:
table = tables.get(table_id)

for table_id, columns in tables_columns.items():
tables_by_id[table_id].columns = columns
if not table:
logger.warning(f"Table {table_id} not found in tables, skipping it")
continue

return tables_by_id
if table.is_embedded:
oddrn = tbl_entities[table_id].oddrn
else:
oddrn = tables.get(table_id).get_oddrn()

def _get_sheets(self):
return self.client.get_sheets()
sheet_entity.data_consumer.inputs.append(oddrn)

sheets_data_entities.append(sheet_entity)

def tables_ids_to_load(tables: List[Table]):
return [table.id for table in tables if isinstance(table, EmbeddedTable)]
return DataEntityList(
data_source_oddrn=self.get_data_source_oddrn(),
items=[*tbl_entities.values(), *sheets_data_entities],
)
127 changes: 69 additions & 58 deletions odd_collector/adapters/tableau/client.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Union
from urllib.parse import urlparse
from typing import Any, Union

import tableauserverclient as TSC
from funcy import lmap
from odd_collector_sdk.errors import DataSourceAuthorizationError, DataSourceError
from tableauserverclient import PersonalAccessTokenAuth, TableauAuth

from odd_collector.adapters.tableau.domain.table import Table
from odd_collector.domain.plugin import TableauPlugin

from .domain.column import Column
from .domain.database import ConnectionParams, EmbeddedDatabase, ExternalDatabase
from .domain.sheet import Sheet
from .domain.table import Table, databases_to_tables
from .logger import logger

sheets_query = """
query GetSheets($count: Int, $after: String) {
Expand Down Expand Up @@ -49,6 +47,7 @@
nodes {
id
name
isEmbedded
connectionType
downstreamOwners {
name
Expand All @@ -58,6 +57,13 @@
schema
name
description
columns {
id
name
remoteType
isNullable
description
}
}
}
pageInfo {
Expand All @@ -67,18 +73,18 @@
}
}
"""
tables_columns_query = """
query GetTablesColumns($ids: [ID], $count: Int, $after: String){
tablesConnection(filter: {idWithin: $ids}, first: $count, after: $after, orderBy: {field: NAME, direction: ASC}) {

database_servers_query = """
query DatabaseServersConnection($count: Int, $after: String) {
databaseServersConnection(first: $count, after: $after, orderBy: {field: NAME, direction: ASC}) {
nodes {
id
columns {
id
name
remoteType
isNullable
description
}
name
isEmbedded
connectionType
hostName
port
service
}
pageInfo {
hasNextPage
Expand All @@ -89,64 +95,69 @@
"""


class TableauBaseClient(ABC):
@abstractmethod
def get_server_host(self):
raise NotImplementedError

@abstractmethod
def get_sheets(self) -> List[Sheet]:
raise NotImplementedError
class TableauClient:
def __init__(self, config: TableauPlugin) -> None:
self.config = config
self.__auth = self._get_auth(config)
self.server = TSC.Server(config.server, use_server_version=True)

@abstractmethod
def get_tables(self) -> List[Table]:
raise NotImplementedError
def get_sheets(self) -> list[Sheet]:
sheets_response = self._query(query=sheets_query, root_key="sheetsConnection")

@abstractmethod
def get_tables_columns(self, tables_ids: List[str]) -> Dict[str, Table]:
raise NotImplementedError
return [Sheet.from_response(response) for response in sheets_response]

def get_databases(self) -> dict[str, Union[EmbeddedDatabase, ExternalDatabase]]:
logger.debug("Getting databases")
databases = self._query(query=databases_query, root_key="databasesConnection")

class TableauClient(TableauBaseClient):
def __init__(self, config: TableauPlugin) -> None:
self.__config = config
self.__auth = self.__get_auth(config)
self.server = TSC.Server(config.server, use_server_version=True)
connection_params = self.get_servers()

def get_server_host(self):
return urlparse(self.__config.server).netloc
result = {}
for db in databases:
if db.get("isEmbedded"):
result[db.get("id")] = EmbeddedDatabase.from_dict(**db)
else:
try:
database = ExternalDatabase(
id=db.get("id"),
name=db.get("name"),
connection_type=db.get("connectionType"),
connection_params=connection_params[db.get("id")],
tables=db.get("tables"),
)
result[database.id] = database
except Exception as e:
logger.warning(f"Couldn't get database: {db.get('name')} {e}")
continue

def get_sheets(self) -> List[Sheet]:
sheets_response = self.__query(query=sheets_query, root_key="sheetsConnection")
logger.debug(f"Got {len(result)} databases")
return result

return [Sheet.from_response(response) for response in sheets_response]
def get_tables(self) -> dict[str, Table]:
databases = self.get_databases()

def get_tables(self) -> List[Table]:
databases_response = self.__query(
query=databases_query, root_key="databasesConnection"
)
return databases_to_tables(databases_response)
return {
table.id: table
for database in databases.values()
for table in database.tables
}

def get_tables_columns(self, table_ids: List[str]) -> Dict[str, List[Column]]:
response: List = self.__query(
query=tables_columns_query,
variables={"ids": table_ids},
root_key="tablesConnection",
def get_servers(self) -> dict[str, ConnectionParams]:
servers = self._query(
query=database_servers_query, root_key="databaseServersConnection"
)

return {
table.get("id"): lmap(Column.from_response, table.get("columns"))
for table in response
server.get("id"): ConnectionParams.from_dict(**server) for server in servers
}

def __query(
def _query(
self,
query: str,
root_key: str,
variables: object = None,
) -> Any:
if variables is None:
variables = {"count": self.__config.pagination_size}
variables = {"count": self.config.pagination_size}

with self.server.auth.sign_in(self.__auth):
try:
Expand All @@ -173,18 +184,18 @@ def __query(
) from e

@staticmethod
def __get_auth(
def _get_auth(
config: TableauPlugin,
) -> Union[PersonalAccessTokenAuth, TableauAuth]:
try:
if config.token_value and config.token_name:
return TSC.PersonalAccessTokenAuth(
return PersonalAccessTokenAuth(
config.token_name,
config.token_value.get_secret_value(),
config.site,
)
else:
return TSC.TableauAuth(
return TableauAuth(
config.user,
config.password.get_secret_value(),
config.site,
Expand Down
39 changes: 18 additions & 21 deletions odd_collector/adapters/tableau/domain/column.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,21 @@
from dataclasses import dataclass
from typing import Optional


@dataclass
class Column:
def __init__(
self,
id: str,
name: str,
is_nullable: bool,
remote_type: str = None,
description: str = None,
):
self.id = id
self.name = name
self.remote_type = remote_type
self.is_nullable = is_nullable
self.description = description or None
id: str
name: str
is_nullable: bool
remote_type: Optional[str] = None
description: Optional[str] = None

@staticmethod
def from_response(response):
return Column(
response.get("id"),
response.get("name"),
response.get("isNullable"),
response.get("remoteType"),
response.get("description"),
@classmethod
def from_dict(cls, **data) -> "Column":
return cls(
data["id"],
data["name"],
data["isNullable"],
data.get("remoteType"),
data.get("description"),
)
23 changes: 23 additions & 0 deletions odd_collector/adapters/tableau/domain/connection_params.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from dataclasses import dataclass
from typing import Optional


@dataclass
class ConnectionParams:
id: str
name: str
connection_type: str
host: str
port: int
service: Optional[str]

@classmethod
def from_dict(cls, **kwargs):
return cls(
id=kwargs["id"],
name=kwargs["name"],
connection_type=kwargs["connectionType"],
host=kwargs["hostName"],
port=kwargs["port"],
service=kwargs["service"],
)
Loading