Skip to content

Commit

Permalink
[pipes] refactor Pipes tests interface
Browse files Browse the repository at this point in the history
  • Loading branch information
danielgafni committed Dec 28, 2024
1 parent 62ff166 commit fe9d62a
Show file tree
Hide file tree
Showing 8 changed files with 459 additions and 77 deletions.
25 changes: 19 additions & 6 deletions libraries/pipes/tests/dagster-pipes-tests/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,17 @@ See the [pipes_config.py](src/dagster_pipes_tests/pipes_config.py) class for mor

In order to run the tests, follow these steps:

1. Install `pytest` and `dagster-pipes-tests`:
1. Install `pytest` and `dagster-pipes-tests`. This can be done with [uv](https://docs.astral.sh/uv/):

```shell
uv pip install pytest
# TODO: publish the package to PyPI
uv pip install <path-to-pipes-tests>
# assuming the command is run in libraries/pipes/implementations/<language>
uv add --group dev pytest --editable ../../tests/dagster-pipes-tests
```

2. Import the test suite in your `pytest` code and configure it with the base arguments (usually containing the testing executable). The executable will be invoked with various arguments, and the test suite will assert certain side effects produced by the executable. Base arguments will be concatenated with additional arguments provided by the test suite.
> [!NOTE]
> To install `dagster-pipes-tests` in a repository other than this one, replace `--editable ../../tests/dagster-pipes-tests` with `git+https://github.com/dagster-io/communioty-integrations.git#subdirectory=libraries/pipes/tests/dagster-pipes-tests`
2. Import the test suite in your `pytest` code (for example, in `tests/test_pipes.py`) and configure it with the base arguments (usually containing the testing executable). The executable will be invoked with various arguments, and the test suite will assert certain side effects produced by the executable. Base arguments will be concatenated with additional arguments provided by the test suite.

For example, for Java:

Expand All @@ -45,8 +47,13 @@ class TestJavaPipes(PipesTestSuite):
]
```

3 [Optional]. When working with compiled languages, it's recommended to setup a `pytest` fixture that compiles the executable before running the tests. This way, the executable is only compiled once, and the tests can be run multiple times without recompiling.
> [!NOTE]
> Each test has it's own `--test-name` argument which can be used to identify the test being run.
> [!WARNING]
> This code must be placed in a file that is discovered by `pytest`, e.g. starts with `test_`.
When working with compiled languages, it's recommended to setup a `pytest` fixture that compiles the executable before running the tests. This way, the executable is only compiled once, and the tests can be run multiple times without recompiling.

For example, for Java, put the following code in `conftest.py`:

Expand All @@ -59,3 +66,9 @@ import subprocess
def built_jar():
subprocess.run(["./gradlew", "build"], check=True)
```

4. Run the tests with `pytest`:

```shell
uv run pytest
```
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@
METADATA = json.loads(METADATA_PATH.read_text())


def _get_current_test_name(request):
return request.node.name.split("[")[0]


def _resolve_metadata_value(
value: Any, metadata_type: PipesMetadataType
) -> MetadataValue:
Expand Down Expand Up @@ -108,14 +112,21 @@ class PipesTestSuite:
BASE_ARGS = ["change", "me"]

@parametrize("metadata", METADATA_CASES)
def test_context_reconstruction(
def test_components(
self,
request,
metadata: Dict[str, Any],
context_injector: PipesContextInjector,
message_reader: PipesMessageReader,
tmpdir_factory,
capsys,
):
"""This test doesn't test anything in Dagster. Instead, it provides parameters to the external process, which should check if they are loaded correctly."""
"""This test checks if the external process can access the context and the message writer correctly.
It sets an additional `--job-name` argument which can be used to check if the context was loaded correctly.
It sets an additional `--extras` argument which points to a json file with Pipes extras which should be loaded by the context loader. The test can use this path to check if the extras were loaded correctly.
"""
work_dir = tmpdir_factory.mktemp("work_dir")

extras_path = work_dir / "extras.json"
Expand All @@ -128,43 +139,13 @@ def my_asset(
context: AssetExecutionContext,
pipes_subprocess_client: PipesSubprocessClient,
) -> MaterializeResult:
job_name = context.run.job_name

args = self.BASE_ARGS + [
"--env",
f"--extras={str(extras_path)}",
f"--job-name={job_name}",
]

return pipes_subprocess_client.run(
context=context,
command=args,
extras=metadata,
).get_materialize_result()

result = materialize(
[my_asset],
resources={"pipes_subprocess_client": PipesSubprocessClient()},
raise_on_error=False,
)

assert result.success

def test_components(
self,
context_injector: PipesContextInjector,
message_reader: PipesMessageReader,
tmpdir_factory,
capsys,
):
@asset
def my_asset(
context: AssetExecutionContext,
pipes_subprocess_client: PipesSubprocessClient,
) -> MaterializeResult:
args = self.BASE_ARGS + [
"--env",
"--full",
"--extras",
str(extras_path),
"--job-name",
context.run.job_name,
"--test-name",
_get_current_test_name(request),
]

if isinstance(context_injector, PipesS3ContextInjector):
Expand Down Expand Up @@ -203,13 +184,13 @@ def my_asset(
)
def test_extras(
self,
request,
context_injector: PipesContextInjector,
metadata: Dict[str, Any],
tmpdir_factory,
capsys,
):
"""This test doesn't test anything in Dagster. Instead, it provides extras to the external process, which should check if they are loaded correctly."""

work_dir = tmpdir_factory.mktemp("work_dir")

metadata_path = work_dir / "metadata.json"
Expand All @@ -222,13 +203,11 @@ def my_asset(
context: AssetExecutionContext,
pipes_subprocess_client: PipesSubprocessClient,
) -> MaterializeResult:
job_name = context.run.job_name

args = self.BASE_ARGS + [
"--full",
"--env",
f"--extras={metadata_path}",
f"--job-name={job_name}",
"--extras",
metadata_path,
"--test-name",
_get_current_test_name(request),
]

invocation_result = pipes_subprocess_client.run(
Expand Down Expand Up @@ -262,11 +241,11 @@ def my_asset(

def test_error_reporting(
self,
request,
tmpdir_factory,
capsys,
):
"""This test checks if the external process sends an exception message correctly."""

if not PIPES_CONFIG.general.error_reporting:
pytest.skip("general.error_reporting is not enabled in pipes.toml")

Expand All @@ -282,8 +261,8 @@ def my_asset(
pipes_subprocess_client: PipesSubprocessClient,
):
args = self.BASE_ARGS + [
"--full",
"--throw-error",
"--test-name",
_get_current_test_name(request),
]

invocation_result = pipes_subprocess_client.run(
Expand Down Expand Up @@ -328,6 +307,7 @@ def my_asset(

def test_message_log(
self,
request,
tmpdir_factory,
capsys,
):
Expand All @@ -346,8 +326,8 @@ def my_asset(
pipes_subprocess_client: PipesSubprocessClient,
):
args = self.BASE_ARGS + [
"--full",
"--logging",
"--test-name",
_get_current_test_name(request),
]

invocation_result = pipes_subprocess_client.run(
Expand Down Expand Up @@ -384,7 +364,7 @@ def my_asset(
if f"{level.lower().capitalize()} message" in line:
assert level in line
logged_levels.add(level)

assert logged_levels == expected_levels
assert (
"[pipes] did not receive any messages from external process"
Expand All @@ -394,6 +374,7 @@ def my_asset(
@parametrize("custom_message_payload", CUSTOM_MESSAGE_CASES)
def test_message_report_custom_message(
self,
request,
custom_message_payload: Any,
tmpdir_factory,
capsys,
Expand All @@ -415,14 +396,11 @@ def my_asset(
context: AssetExecutionContext,
pipes_subprocess_client: PipesSubprocessClient,
) -> MaterializeResult:
job_name = context.run.job_name

args = self.BASE_ARGS + [
"--full",
"--env",
f"--job-name={job_name}",
"--custom-payload-path",
"--custom-payload",
str(custom_payload_path),
"--test-name",
_get_current_test_name(request),
]

invocation_result = pipes_subprocess_client.run(
Expand Down Expand Up @@ -455,6 +433,7 @@ def my_asset(
@parametrize("asset_key", [None, ["my_asset"]])
def test_message_report_asset_materialization(
self,
request,
data_version: Optional[str],
asset_key: Optional[List[str]],
tmpdir_factory,
Expand Down Expand Up @@ -491,14 +470,11 @@ def my_asset(
context: AssetExecutionContext,
pipes_subprocess_client: PipesSubprocessClient,
) -> MaterializeResult:
job_name = context.run.job_name

args = self.BASE_ARGS + [
"--full",
"--env",
f"--job-name={job_name}",
"--report-asset-materialization",
str(asset_materialization_path),
"--test-name",
_get_current_test_name(request),
]

invocation_result = pipes_subprocess_client.run(
Expand Down Expand Up @@ -547,6 +523,7 @@ def my_asset(
@parametrize("asset_key", [None, ["my_asset"]])
def test_message_report_asset_check(
self,
request,
passed: bool,
asset_key: Optional[List[str]],
severity: PipesAssetCheckSeverity,
Expand All @@ -556,9 +533,7 @@ def test_message_report_asset_check(
"""This test checks if the external process sends asset checks correctly."""

if not PIPES_CONFIG.messages.report_asset_check:
pytest.skip(
"messages.report_asset_check is not enabled in pipes.toml"
)
pytest.skip("messages.report_asset_check is not enabled in pipes.toml")

work_dir = tmpdir_factory.mktemp("work_dir")

Expand Down Expand Up @@ -588,14 +563,11 @@ def my_asset(
context: AssetExecutionContext,
pipes_subprocess_client: PipesSubprocessClient,
):
job_name = context.run.job_name

args = self.BASE_ARGS + [
"--full",
"--env",
f"--job-name={job_name}",
"--report-asset-check",
str(report_asset_check_path),
"--test-name",
_get_current_test_name(request),
]

invocation_result = pipes_subprocess_client.run(
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# conftest.py
import pytest


def pytest_runtest_call(item):
# This hook is called immediately before the test function is actually run.
pytest.skip("Intentionally skipping test after setup.")
13 changes: 13 additions & 0 deletions libraries/pipes/tests/dagster-pipes-tests/tests/data/pipes.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[general]
error_reporting = true

[message_channel]
s3 = true
databricks = true

[messages]
log = true
report_custom_message = true
report_asset_materialization = true
report_asset_check = true
log_external_stream = true
5 changes: 5 additions & 0 deletions libraries/pipes/tests/dagster-pipes-tests/tests/data/suite.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from dagster_pipes_tests import PipesTestSuite


class TestingPipesTestSuite(PipesTestSuite):
BASE_ARGS = ["hello", "there"]
26 changes: 26 additions & 0 deletions libraries/pipes/tests/dagster-pipes-tests/tests/test_collection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import os
import shutil
import subprocess


def test_collection(tmpdir_factory):
current_dir = os.path.dirname(__file__)
workdir = tmpdir_factory.mktemp("test_collection")

# copy current_dir/data to workdir

shutil.copytree(
os.path.join(current_dir, "data"), os.path.join(workdir), dirs_exist_ok=True
)

# rename suite.py to test_suite.py so that pytest can find it

shutil.move(
os.path.join(workdir, "suite.py"), os.path.join(workdir, "test_suite.py")
)

# simply run pytest --collect-only from the current directory
# to check that all tests are loaded without errors

os.chdir(workdir)
subprocess.run(["pytest"], check=True)
Loading

0 comments on commit fe9d62a

Please sign in to comment.