Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pipes] refactor Pipes tests interface #76

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions .github/workflows/libraries-pipes-tests.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# CI workflow that runs the dagster-pipes-tests suite with pytest via uv.
name: dagster-pipes-tests

on:
  pull_request:
    types: [opened, synchronize, reopened, closed]
    # Only trigger when the test-suite package or this workflow file changes.
    paths:
      - "libraries/pipes/tests/dagster-pipes-tests/**"
      - ".github/workflows/libraries-pipes-tests.yml"

# Every `run` step executes from the test-suite package directory.
defaults:
  run:
    working-directory: ./libraries/pipes/tests/dagster-pipes-tests

jobs:
  test:
    name: Test
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: "Install uv"
        uses: astral-sh/setup-uv@v4

      - name: "Run Tests"
        run: uv run pytest
43 changes: 4 additions & 39 deletions libraries/pipes/implementations/rust/src/bin/pipes_tests.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use clap::ArgAction;
use clap::Parser;
use dagster_pipes_rust::{open_dagster_pipes, DagsterPipesError};
use dagster_pipes_rust::{DAGSTER_PIPES_CONTEXT_ENV_VAR, DAGSTER_PIPES_MESSAGES_ENV_VAR};
Expand All @@ -7,56 +6,22 @@ use std::fs::File;

#[derive(Parser)]
struct Cli {
#[arg(long)]
test_name: String,
#[arg(long)]
context: Option<String>,
#[arg(long)]
messages: Option<String>,
#[arg(
long,
action = ArgAction::Set,
default_value_t = false,
default_missing_value = "false",
num_args=0..=1,
require_equals = false,
)]
env: bool,
#[arg(long = "job-name")]
#[arg(long)]
job_name: Option<String>,
#[arg(long)]
extras: Option<String>,
#[arg(
long,
action = ArgAction::Set,
default_value_t = false,
default_missing_value = "false",
num_args=0..=1,
require_equals = false,
)]
full: bool,
#[arg(long)]
custom_payload_path: Option<String>,
custom_payload: Option<String>,
#[arg(long)]
report_asset_check: Option<String>,
#[arg(long)]
report_asset_materialization: Option<String>,
#[arg(
long,
action = ArgAction::Set,
default_value_t = false,
default_missing_value = "false",
num_args=0..=1,
require_equals = false,
)]
throw_error: bool,
#[arg(
long,
action = ArgAction::Set,
default_value_t = false,
default_missing_value = "false",
num_args=0..=1,
require_equals = false,
)]
logging: bool,
#[arg(long)]
message_writer: Option<String>,
#[arg(long)]
Expand Down
25 changes: 19 additions & 6 deletions libraries/pipes/tests/dagster-pipes-tests/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,17 @@ See the [pipes_config.py](src/dagster_pipes_tests/pipes_config.py) class for mor

In order to run the tests, follow these steps:

1. Install `pytest` and `dagster-pipes-tests`:
1. Install `pytest` and `dagster-pipes-tests`. This can be done with [uv](https://docs.astral.sh/uv/):

```shell
uv pip install pytest
# TODO: publish the package to PyPI
uv pip install <path-to-pipes-tests>
# assuming the command is run in libraries/pipes/implementations/<language>
uv add --group dev pytest --editable ../../tests/dagster-pipes-tests
```

2. Import the test suite in your `pytest` code and configure it with the base arguments (usually containing the testing executable). The executable will be invoked with various arguments, and the test suite will assert certain side effects produced by the executable. Base arguments will be concatenated with additional arguments provided by the test suite.
> [!NOTE]
> To install `dagster-pipes-tests` in a repository other than this one, replace `--editable ../../tests/dagster-pipes-tests` with `git+https://github.com/dagster-io/community-integrations.git#subdirectory=libraries/pipes/tests/dagster-pipes-tests`

2. Import the test suite in your `pytest` code (for example, in `tests/test_pipes.py`) and configure it with the base arguments (usually containing the testing executable). The executable will be invoked with various arguments, and the test suite will assert certain side effects produced by the executable. Base arguments will be concatenated with additional arguments provided by the test suite.

For example, for Java:

Expand All @@ -45,8 +47,13 @@ class TestJavaPipes(PipesTestSuite):
]
```

3 [Optional]. When working with compiled languages, it's recommended to set up a `pytest` fixture that compiles the executable before running the tests. This way, the executable is only compiled once, and the tests can be run multiple times without recompiling.
> [!NOTE]
> Each test has its own `--test-name` argument, which can be used to identify the test being run.

> [!WARNING]
> This code must be placed in a file that is discovered by `pytest`, e.g. one whose name starts with `test_`.

When working with compiled languages, it's recommended to set up a `pytest` fixture that compiles the executable before running the tests. This way, the executable is only compiled once, and the tests can be run multiple times without recompiling.

For example, for Java, put the following code in `conftest.py`:

Expand All @@ -59,3 +66,9 @@ import subprocess
def built_jar():
subprocess.run(["./gradlew", "build"], check=True)
```

4. Run the tests with `pytest`:

```shell
uv run pytest
```
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@
METADATA = json.loads(METADATA_PATH.read_text())


def _get_current_test_name(request) -> str:
    """Return the running test's name without its parametrization suffix.

    Args:
        request: Object exposing ``node.name``; the test methods in this
            file pass the pytest ``request`` fixture.

    Returns:
        The portion of ``request.node.name`` before the first ``[``, or the
        whole name when it contains no ``[``.
    """
    # Only the text before the first "[" is needed, so cap the split at one.
    return request.node.name.split("[", 1)[0]


def _resolve_metadata_value(
value: Any, metadata_type: PipesMetadataType
) -> MetadataValue:
Expand Down Expand Up @@ -108,14 +112,21 @@ class PipesTestSuite:
BASE_ARGS = ["change", "me"]

@parametrize("metadata", METADATA_CASES)
def test_context_reconstruction(
def test_components(
self,
request,
metadata: Dict[str, Any],
context_injector: PipesContextInjector,
message_reader: PipesMessageReader,
tmpdir_factory,
capsys,
):
"""This test doesn't test anything in Dagster. Instead, it provides parameters to the external process, which should check if they are loaded correctly."""
"""This test checks if the external process can access the context and the message writer correctly.

It sets an additional `--job-name` argument which can be used to check if the context was loaded correctly.

It sets an additional `--extras` argument which points to a json file with Pipes extras which should be loaded by the context loader. The test can use this path to check if the extras were loaded correctly.
"""
work_dir = tmpdir_factory.mktemp("work_dir")

extras_path = work_dir / "extras.json"
Expand All @@ -128,43 +139,13 @@ def my_asset(
context: AssetExecutionContext,
pipes_subprocess_client: PipesSubprocessClient,
) -> MaterializeResult:
job_name = context.run.job_name

args = self.BASE_ARGS + [
"--env",
f"--extras={str(extras_path)}",
f"--job-name={job_name}",
]

return pipes_subprocess_client.run(
context=context,
command=args,
extras=metadata,
).get_materialize_result()

result = materialize(
[my_asset],
resources={"pipes_subprocess_client": PipesSubprocessClient()},
raise_on_error=False,
)

assert result.success

def test_components(
self,
context_injector: PipesContextInjector,
message_reader: PipesMessageReader,
tmpdir_factory,
capsys,
):
@asset
def my_asset(
context: AssetExecutionContext,
pipes_subprocess_client: PipesSubprocessClient,
) -> MaterializeResult:
args = self.BASE_ARGS + [
"--env",
"--full",
"--extras",
str(extras_path),
"--job-name",
context.run.job_name,
"--test-name",
_get_current_test_name(request),
]

if isinstance(context_injector, PipesS3ContextInjector):
Expand Down Expand Up @@ -203,13 +184,13 @@ def my_asset(
)
def test_extras(
self,
request,
context_injector: PipesContextInjector,
metadata: Dict[str, Any],
tmpdir_factory,
capsys,
):
"""This test doesn't test anything in Dagster. Instead, it provides extras to the external process, which should check if they are loaded correctly."""

work_dir = tmpdir_factory.mktemp("work_dir")

metadata_path = work_dir / "metadata.json"
Expand All @@ -222,13 +203,11 @@ def my_asset(
context: AssetExecutionContext,
pipes_subprocess_client: PipesSubprocessClient,
) -> MaterializeResult:
job_name = context.run.job_name

args = self.BASE_ARGS + [
"--full",
"--env",
f"--extras={metadata_path}",
f"--job-name={job_name}",
"--extras",
metadata_path,
"--test-name",
_get_current_test_name(request),
]

invocation_result = pipes_subprocess_client.run(
Expand Down Expand Up @@ -262,11 +241,11 @@ def my_asset(

def test_error_reporting(
self,
request,
tmpdir_factory,
capsys,
):
"""This test checks if the external process sends an exception message correctly."""

if not PIPES_CONFIG.general.error_reporting:
pytest.skip("general.error_reporting is not enabled in pipes.toml")

Expand All @@ -282,8 +261,8 @@ def my_asset(
pipes_subprocess_client: PipesSubprocessClient,
):
args = self.BASE_ARGS + [
"--full",
"--throw-error",
"--test-name",
_get_current_test_name(request),
]

invocation_result = pipes_subprocess_client.run(
Expand Down Expand Up @@ -328,6 +307,7 @@ def my_asset(

def test_message_log(
self,
request,
tmpdir_factory,
capsys,
):
Expand All @@ -346,8 +326,8 @@ def my_asset(
pipes_subprocess_client: PipesSubprocessClient,
):
args = self.BASE_ARGS + [
"--full",
"--logging",
"--test-name",
_get_current_test_name(request),
]

invocation_result = pipes_subprocess_client.run(
Expand Down Expand Up @@ -384,7 +364,7 @@ def my_asset(
if f"{level.lower().capitalize()} message" in line:
assert level in line
logged_levels.add(level)

assert logged_levels == expected_levels
assert (
"[pipes] did not receive any messages from external process"
Expand All @@ -394,6 +374,7 @@ def my_asset(
@parametrize("custom_message_payload", CUSTOM_MESSAGE_CASES)
def test_message_report_custom_message(
self,
request,
custom_message_payload: Any,
tmpdir_factory,
capsys,
Expand All @@ -415,14 +396,11 @@ def my_asset(
context: AssetExecutionContext,
pipes_subprocess_client: PipesSubprocessClient,
) -> MaterializeResult:
job_name = context.run.job_name

args = self.BASE_ARGS + [
"--full",
"--env",
f"--job-name={job_name}",
"--custom-payload-path",
"--custom-payload",
str(custom_payload_path),
"--test-name",
_get_current_test_name(request),
]

invocation_result = pipes_subprocess_client.run(
Expand Down Expand Up @@ -455,6 +433,7 @@ def my_asset(
@parametrize("asset_key", [None, ["my_asset"]])
def test_message_report_asset_materialization(
self,
request,
data_version: Optional[str],
asset_key: Optional[List[str]],
tmpdir_factory,
Expand Down Expand Up @@ -491,14 +470,11 @@ def my_asset(
context: AssetExecutionContext,
pipes_subprocess_client: PipesSubprocessClient,
) -> MaterializeResult:
job_name = context.run.job_name

args = self.BASE_ARGS + [
"--full",
"--env",
f"--job-name={job_name}",
"--report-asset-materialization",
str(asset_materialization_path),
"--test-name",
_get_current_test_name(request),
]

invocation_result = pipes_subprocess_client.run(
Expand Down Expand Up @@ -547,6 +523,7 @@ def my_asset(
@parametrize("asset_key", [None, ["my_asset"]])
def test_message_report_asset_check(
self,
request,
passed: bool,
asset_key: Optional[List[str]],
severity: PipesAssetCheckSeverity,
Expand All @@ -556,9 +533,7 @@ def test_message_report_asset_check(
"""This test checks if the external process sends asset checks correctly."""

if not PIPES_CONFIG.messages.report_asset_check:
pytest.skip(
"messages.report_asset_check is not enabled in pipes.toml"
)
pytest.skip("messages.report_asset_check is not enabled in pipes.toml")

work_dir = tmpdir_factory.mktemp("work_dir")

Expand Down Expand Up @@ -588,14 +563,11 @@ def my_asset(
context: AssetExecutionContext,
pipes_subprocess_client: PipesSubprocessClient,
):
job_name = context.run.job_name

args = self.BASE_ARGS + [
"--full",
"--env",
f"--job-name={job_name}",
"--report-asset-check",
str(report_asset_check_path),
"--test-name",
_get_current_test_name(request),
]

invocation_result = pipes_subprocess_client.run(
Expand Down
Empty file.
Loading
Loading