Skip to content

Commit

Permalink
[pipes] refactor Pipes tests interface
Browse files Browse the repository at this point in the history
  • Loading branch information
danielgafni committed Dec 26, 2024
1 parent 62ff166 commit 9127a52
Showing 1 changed file with 42 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@
METADATA = json.loads(METADATA_PATH.read_text())



def _get_current_test_name(request):
return request.node.name.split("[")[0]

def _resolve_metadata_value(
value: Any, metadata_type: PipesMetadataType
) -> MetadataValue:
Expand Down Expand Up @@ -107,64 +111,40 @@ class PipesTestSuite:
# to run all the tests
BASE_ARGS = ["change", "me"]

@parametrize("metadata", METADATA_CASES)
def test_context_reconstruction(
def test_components(
self,
request,
metadata: Dict[str, Any],
context_injector: PipesContextInjector,
message_reader: PipesMessageReader,
tmpdir_factory,
capsys,
):
"""This test doesn't test anything in Dagster. Instead, it provides parameters to the external process, which should check if they are loaded correctly."""
"""This test checks if the external process can access the context and the message writer correctly.
It sets an additional `--job-name` argument which can be used to check if the context was loaded correctly.
It sets an additional `--extras` argument which points to a json file with Pipes extras which should be loaded by the context loader. The test can use this path to check if the extras were loaded correctly.
"""
work_dir = tmpdir_factory.mktemp("work_dir")

extras_path = work_dir / "extras.json"

with open(str(extras_path), "w") as f:
json.dump(metadata, f)

@asset
def my_asset(
context: AssetExecutionContext,
pipes_subprocess_client: PipesSubprocessClient,
) -> MaterializeResult:
job_name = context.run.job_name

args = self.BASE_ARGS + [
"--env",
f"--extras={str(extras_path)}",
f"--job-name={job_name}",
]

return pipes_subprocess_client.run(
context=context,
command=args,
extras=metadata,
).get_materialize_result()

result = materialize(
[my_asset],
resources={"pipes_subprocess_client": PipesSubprocessClient()},
raise_on_error=False,
)

assert result.success

def test_components(
self,
context_injector: PipesContextInjector,
message_reader: PipesMessageReader,
tmpdir_factory,
capsys,
):
@asset
def my_asset(
context: AssetExecutionContext,
pipes_subprocess_client: PipesSubprocessClient,
) -> MaterializeResult:
args = self.BASE_ARGS + [
"--env",
"--full",
"--extras",
str(extras_path),
"--job-name",
context.run.job_name,
"--test-name",
_get_current_test_name(request),
]

if isinstance(context_injector, PipesS3ContextInjector):
Expand Down Expand Up @@ -203,13 +183,13 @@ def my_asset(
)
def test_extras(
self,
request,
context_injector: PipesContextInjector,
metadata: Dict[str, Any],
tmpdir_factory,
capsys,
):
"""This test doesn't test anything in Dagster. Instead, it provides extras to the external process, which should check if they are loaded correctly."""

work_dir = tmpdir_factory.mktemp("work_dir")

metadata_path = work_dir / "metadata.json"
Expand All @@ -222,13 +202,11 @@ def my_asset(
context: AssetExecutionContext,
pipes_subprocess_client: PipesSubprocessClient,
) -> MaterializeResult:
job_name = context.run.job_name

args = self.BASE_ARGS + [
"--full",
"--env",
f"--extras={metadata_path}",
f"--job-name={job_name}",
"--extras",
metadata_path,
"--test-name",
_get_current_test_name(request),
]

invocation_result = pipes_subprocess_client.run(
Expand Down Expand Up @@ -262,11 +240,11 @@ def my_asset(

def test_error_reporting(
self,
request,
tmpdir_factory,
capsys,
):
"""This test checks if the external process sends an exception message correctly."""

if not PIPES_CONFIG.general.error_reporting:
pytest.skip("general.error_reporting is not enabled in pipes.toml")

Expand All @@ -282,8 +260,8 @@ def my_asset(
pipes_subprocess_client: PipesSubprocessClient,
):
args = self.BASE_ARGS + [
"--full",
"--throw-error",
"--test-name",
_get_current_test_name(request),
]

invocation_result = pipes_subprocess_client.run(
Expand Down Expand Up @@ -328,6 +306,7 @@ def my_asset(

def test_message_log(
self,
request,
tmpdir_factory,
capsys,
):
Expand All @@ -346,8 +325,8 @@ def my_asset(
pipes_subprocess_client: PipesSubprocessClient,
):
args = self.BASE_ARGS + [
"--full",
"--logging",
"--test-name",
_get_current_test_name(request),
]

invocation_result = pipes_subprocess_client.run(
Expand Down Expand Up @@ -384,7 +363,7 @@ def my_asset(
if f"{level.lower().capitalize()} message" in line:
assert level in line
logged_levels.add(level)

assert logged_levels == expected_levels
assert (
"[pipes] did not receive any messages from external process"
Expand All @@ -394,6 +373,7 @@ def my_asset(
@parametrize("custom_message_payload", CUSTOM_MESSAGE_CASES)
def test_message_report_custom_message(
self,
request,
custom_message_payload: Any,
tmpdir_factory,
capsys,
Expand All @@ -415,14 +395,11 @@ def my_asset(
context: AssetExecutionContext,
pipes_subprocess_client: PipesSubprocessClient,
) -> MaterializeResult:
job_name = context.run.job_name

args = self.BASE_ARGS + [
"--full",
"--env",
f"--job-name={job_name}",
"--custom-payload-path",
"--custom-payload",
str(custom_payload_path),
"--test-name",
_get_current_test_name(request),
]

invocation_result = pipes_subprocess_client.run(
Expand Down Expand Up @@ -455,6 +432,7 @@ def my_asset(
@parametrize("asset_key", [None, ["my_asset"]])
def test_message_report_asset_materialization(
self,
request,
data_version: Optional[str],
asset_key: Optional[List[str]],
tmpdir_factory,
Expand Down Expand Up @@ -491,14 +469,11 @@ def my_asset(
context: AssetExecutionContext,
pipes_subprocess_client: PipesSubprocessClient,
) -> MaterializeResult:
job_name = context.run.job_name

args = self.BASE_ARGS + [
"--full",
"--env",
f"--job-name={job_name}",
"--report-asset-materialization",
str(asset_materialization_path),
"--test-name",
_get_current_test_name(request),
]

invocation_result = pipes_subprocess_client.run(
Expand Down Expand Up @@ -547,6 +522,7 @@ def my_asset(
@parametrize("asset_key", [None, ["my_asset"]])
def test_message_report_asset_check(
self,
request,
passed: bool,
asset_key: Optional[List[str]],
severity: PipesAssetCheckSeverity,
Expand All @@ -556,9 +532,7 @@ def test_message_report_asset_check(
"""This test checks if the external process sends asset checks correctly."""

if not PIPES_CONFIG.messages.report_asset_check:
pytest.skip(
"messages.report_asset_check is not enabled in pipes.toml"
)
pytest.skip("messages.report_asset_check is not enabled in pipes.toml")

work_dir = tmpdir_factory.mktemp("work_dir")

Expand Down Expand Up @@ -588,14 +562,11 @@ def my_asset(
context: AssetExecutionContext,
pipes_subprocess_client: PipesSubprocessClient,
):
job_name = context.run.job_name

args = self.BASE_ARGS + [
"--full",
"--env",
f"--job-name={job_name}",
"--report-asset-check",
str(report_asset_check_path),
"--test-name",
_get_current_test_name(request),
]

invocation_result = pipes_subprocess_client.run(
Expand Down

0 comments on commit 9127a52

Please sign in to comment.