Skip to content

Commit

Permalink
chore(athena): reuse existing connection when setting up data fixtures (
Browse files Browse the repository at this point in the history
  • Loading branch information
cpcloud authored Jan 2, 2025
1 parent 7f0a94b commit f5df9b1
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 50 deletions.
64 changes: 30 additions & 34 deletions ibis/backends/athena/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,43 +6,50 @@
from os import environ as env
from typing import TYPE_CHECKING, Any

import pytest
import sqlglot as sg
import sqlglot.expressions as sge
from sqlglot.dialects import Athena

import ibis
from ibis.backends.conftest import TEST_TABLES
from ibis.backends.tests.base import BackendTest

if TYPE_CHECKING:
from pathlib import Path

import s3fs

from ibis.backends import BaseBackend


pq = pytest.importorskip("pyarrow.parquet")


IBIS_ATHENA_S3_STAGING_DIR = env.get(
"IBIS_ATHENA_S3_STAGING_DIR", "s3://aws-athena-query-results-ibis-testing/"
"IBIS_ATHENA_S3_STAGING_DIR", "s3://aws-athena-query-results-ibis-testing"
)
AWS_REGION = env.get("AWS_REGION", "us-east-2")
AWS_PROFILE = env.get("AWS_PROFILE")
CONNECT_ARGS = dict(
s3_staging_dir=IBIS_ATHENA_S3_STAGING_DIR,
s3_staging_dir=f"{IBIS_ATHENA_S3_STAGING_DIR}/",
region_name=AWS_REGION,
profile_name=AWS_PROFILE,
)


def create_table(con, *, fs: s3fs.S3FileSystem, file: str, folder: str) -> None:
import pyarrow.parquet as pq

def create_table(con, *, fs: s3fs.S3FileSystem, file: Path, folder: str) -> None:
from ibis.formats.pyarrow import PyArrowSchema

arrow_schema = pq.read_metadata(file).schema.to_arrow_schema()
schema = PyArrowSchema.to_ibis(arrow_schema).to_sqlglot("athena")
ibis_schema = PyArrowSchema.to_ibis(arrow_schema)
sg_schema = ibis_schema.to_sqlglot(Athena)
name = file.with_suffix("").name

ddl = sge.Create(
kind="TABLE",
this=sge.Schema(this=sg.table(name), expressions=schema),
exists=True,
this=sge.Schema(this=sg.table(name), expressions=sg_schema),
properties=sge.Properties(
expressions=[
sge.ExternalProperty(),
Expand All @@ -54,51 +61,40 @@ def create_table(con, *, fs: s3fs.S3FileSystem, file: str, folder: str) -> None:

fs.put(str(file), f"{folder.removeprefix('s3://')}/{name}/{file.name}")

drop_query = sge.Drop(kind="TABLE", this=sg.table(name), exists=True).sql("athena")
create_query = ddl.sql("athena")
create_query = ddl.sql(Athena)

with con.cursor() as cur:
cur.execute(drop_query)
cur.execute(create_query)


class TestConf(BackendTest):
supports_map = False
supports_json = False
supports_structs = False

driver_supports_multiple_statements = False
deps = ("pyathena", "s3fs")

deps = ("pyathena", "fsspec")

def _load_data(self, **_: Any) -> None:
import pyathena
import s3fs
import fsspec

files = list(self.data_dir.joinpath("parquet").glob("*.parquet"))
files = self.data_dir.joinpath("parquet").glob("*.parquet")

user = getpass.getuser()
python_version = "".join(map(str, sys.version_info[:3]))
folder = f"{user}_{python_version}"

fs = s3fs.S3FileSystem()

futures = []

with (
pyathena.connect(**CONNECT_ARGS) as con,
concurrent.futures.ThreadPoolExecutor() as executor,
):
for file in files:
futures.append(
executor.submit(
create_table,
con,
fs=fs,
file=file,
folder=f"{IBIS_ATHENA_S3_STAGING_DIR}{folder}",
)
)

for future in concurrent.futures.as_completed(futures):
fs = fsspec.filesystem("s3")

con = self.connection.con
folder = f"{IBIS_ATHENA_S3_STAGING_DIR}/{folder}"

with concurrent.futures.ThreadPoolExecutor() as executor:
for future in concurrent.futures.as_completed(
executor.submit(create_table, con, fs=fs, file=file, folder=folder)
for file in files
):
future.result()

@staticmethod
Expand Down
3 changes: 1 addition & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ dev = [
tests = [
"cloudpickle",
"filelock>=3.7.0,<4",
"fsspec<2024.12.1",
"fsspec[s3]<2024.12.1",
"hypothesis>=6.58.0,<7",
"packaging>=21.3,<25",
"pytest>=8.2.0,<9",
Expand All @@ -256,7 +256,6 @@ tests = [
"pytest-xdist>=2.3.0,<4",
"requests>=2,<3",
"tomli>=2.0.1,<3",
"s3fs>=2024.10.0",
]
docs = [
"altair>=5.0.1,<6",
Expand Down
4 changes: 2 additions & 2 deletions requirements-dev.txt

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 14 additions & 12 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit f5df9b1

Please sign in to comment.