From 77e7fb3875cda5b34f4aa1e14f62d40298f25f51 Mon Sep 17 00:00:00 2001 From: Chris Lo <46541035+topher-lo@users.noreply.github.com> Date: Mon, 24 Jun 2024 13:56:48 -0700 Subject: [PATCH 01/33] Add pytest-benchmark workflow --- .github/workflows/benchmark.yml | 57 +++++++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) create mode 100644 .github/workflows/benchmark.yml diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml new file mode 100644 index 000000000..35d4d7d89 --- /dev/null +++ b/.github/workflows/benchmark.yml @@ -0,0 +1,57 @@ +name: Benchmark + +on: + push: + branches: ["main"] + paths: ["tracecat/**"] + pull_request: + branches: ["main"] + paths: [".github/workflows/stress-test.yml"] + +jobs: + stress-test: + runs-on: ubuntu-latest-8-cores + timeout-minutes: 30 + strategy: + matrix: + n_workers: ["1", "2", "4", "8"] + steps: + - uses: actions/checkout@v4 + + - name: Set up Python 3.12 + uses: actions/setup-python@v3 + with: + python-version: "3.12" + cache: "pip" + cache-dependency-path: pyproject.toml + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Run environment setup script + run: bash env.sh + + - name: Start Docker services + env: + N_WORKERS: ${{ matrix.n_workers }} + run: docker compose up --scale worker=$N_WORKERS -d + + - name: Verify Tracecat API is running + run: curl -s http://localhost:8000/health | jq -e '.status == "ok"' + + - name: pip install Tracecat + run: python -m pip install --upgrade pip && pip install ".[dev,cli]" + + - name: Verify Tracecat CLI installation + run: tracecat --help + + - name: Run tests (headless mode) + env: + TRACECAT__IMAGE_TAG: main + LOG_LEVEL: WARNING + run: | + pytest -k "test_stress_workflow" --temporal-no-restart --tracecat-no-restart + --benchmark-name=short \ + --benchmark-group-by=param \ + --benchmark-warmup=off \ + --benchmark-columns=min,max,mean,median,stddev,iterations \ From 36d2522f5657ecce84430b75515a0d61ebf11734 Mon Sep 17 00:00:00 2001 From: Chris Lo <46541035+topher-lo@users.noreply.github.com> Date: Mon, 24 Jun 2024 13:58:36 -0700 Subject: [PATCH 02/33] test: Parameterize stress test --- .github/workflows/benchmark.yml | 6 +-- tests/unit/test_workflows.py | 93 ++++++++++++++++++--------------- 2 files changed, 53 insertions(+), 46 deletions(-) diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml index 35d4d7d89..846c6564d 100644 --- a/.github/workflows/benchmark.yml +++ b/.github/workflows/benchmark.yml @@ -14,7 +14,7 @@ jobs: timeout-minutes: 30 strategy: matrix: - n_workers: ["1", "2", "4", "8"] + num_workers: ["1", "2", "4", "8"] steps: - uses: actions/checkout@v4 @@ -33,8 +33,8 @@ jobs: - name: Start Docker services env: - N_WORKERS: ${{ matrix.n_workers }} - run: docker compose up --scale worker=$N_WORKERS -d + NUM_WORKERS: ${{ matrix.num_workers }} + run: docker compose up --scale worker=$NUM_WORKERS -d - name: Verify Tracecat API is running run: curl -s http://localhost:8000/health | jq -e '.status == "ok"' diff --git a/tests/unit/test_workflows.py b/tests/unit/test_workflows.py index 72d6694a1..fd82e9b2d 100644 --- a/tests/unit/test_workflows.py +++ b/tests/unit/test_workflows.py @@ -225,27 +225,24 @@ def dsl_with_expected(request: pytest.FixtureRequest) -> DSLInput: return dsl, expected -correctness_test_cases = [ - "unit_conditional_adder_tree_skips", - "unit_conditional_adder_tree_continues", - "unit_conditional_adder_tree_skip_propagates", - "unit_conditional_adder_diamond_skip_with_join_weak_dep", - "unit_transform_forwarder_loop", - "unit_transform_forwarder_loop_chained", - "unit_transform_forwarder_arrange", - "unit_transform_forwarder_arrange_loop", - "unit_transform_forwarder_zip", - "unit_transform_forwarder_map_loop", - "unit_runtime_test_adder_tree", - "unit_runtime_test_chain", -] - - @pytest.mark.parametrize( "dsl_with_expected", - correctness_test_cases, + [ + "unit_conditional_adder_tree_skips", + "unit_conditional_adder_tree_continues", + "unit_conditional_adder_tree_skip_propagates", + "unit_conditional_adder_diamond_skip_with_join_weak_dep", + "unit_transform_forwarder_loop", + "unit_transform_forwarder_loop_chained", + "unit_transform_forwarder_arrange", + "unit_transform_forwarder_arrange_loop", + "unit_transform_forwarder_zip", + "unit_transform_forwarder_map_loop", + "unit_runtime_test_adder_tree", + "unit_runtime_test_chain", + ], indirect=True, - ids=correctness_test_cases, + ids=lambda x: x, ) @pytest.mark.asyncio async def test_workflow_completes_and_correct( @@ -277,40 +274,50 @@ async def test_workflow_completes_and_correct( @pytest.mark.parametrize( "dsl", [DATA_PATH / "stress_adder_tree.yml"], + ids=lambda x: x.split("/")[-1].split(".")[0], indirect=True, ) +@pytest.mark.parametrize( + "num_workflows", [10, 100, 1000], ids=lambda x: f"num_workflows={x}" +) +@pytest.mark.slow @pytest.mark.slow @pytest.mark.asyncio -async def test_stress_workflow(dsl, temporal_cluster, mock_registry, auth_sandbox): - """Test that we can have multiple executions of the same workflow running at the same time.""" +async def test_stress_workflow( + dsl, num_workflows, temporal_cluster, mock_registry, auth_sandbox, benchmark +): + """Multiple executions of the same workflow run at the same time.""" test_name = f"test_stress_workflow-{dsl.title}" client = await get_temporal_client() - tasks: list[asyncio.Task] = [] - async with ( - Worker( - client, - task_queue=os.environ["TEMPORAL__CLUSTER_QUEUE"], - activities=DSLActivities.load(), - workflows=[DSLWorkflow], - workflow_runner=new_sandbox_runner(), - ), - ): - async with asyncio.TaskGroup() as tg: - # We can have multiple executions of the same workflow running at the same time - for i in range(100): - wf_exec_id = generate_test_exec_id(test_name + f"-{i}") - task = tg.create_task( - client.execute_workflow( - DSLWorkflow.run, - DSLRunArgs(dsl=dsl, role=ctx_role.get(), wf_id=TEST_WF_ID), - id=wf_exec_id, - task_queue=os.environ["TEMPORAL__CLUSTER_QUEUE"], - retry_policy=RetryPolicy(maximum_attempts=1), + async def run_worklows(): + tasks: list[asyncio.Task] = [] + async with ( + Worker( + client, + task_queue=os.environ["TEMPORAL__CLUSTER_QUEUE"], + activities=DSLActivities.load(), + workflows=[DSLWorkflow], + workflow_runner=new_sandbox_runner(), + ), + ): + async with asyncio.TaskGroup() as tg: + # We can have multiple executions of the same workflow running at the same time + for i in range(num_workflows): + wf_exec_id = generate_test_exec_id(test_name + f"-{i}") + task = tg.create_task( + client.execute_workflow( + DSLWorkflow.run, + DSLRunArgs(dsl=dsl, role=ctx_role.get(), wf_id=TEST_WF_ID), + id=wf_exec_id, + task_queue=os.environ["TEMPORAL__CLUSTER_QUEUE"], + retry_policy=RetryPolicy(maximum_attempts=1), + ) ) - ) - tasks.append(task) + tasks.append(task) + return tasks + tasks = benchmark.pedantic(run_worklows, iterations=3, rounds=1) assert all(task.done() for task in tasks) From 2bc999fa88087d626b087289163b9560dd88bc4a Mon Sep 17 00:00:00 2001 From: Chris Lo <46541035+topher-lo@users.noreply.github.com> Date: Mon, 24 Jun 2024 14:31:15 -0700 Subject: [PATCH 03/33] fix paths Signed-off-by: Chris Lo <46541035+topher-lo@users.noreply.github.com> --- .github/workflows/benchmark.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml index 846c6564d..4d5ddf2c0 100644 --- a/.github/workflows/benchmark.yml +++ b/.github/workflows/benchmark.yml @@ -6,7 +6,7 @@ on: paths: ["tracecat/**"] pull_request: branches: ["main"] - paths: [".github/workflows/stress-test.yml"] + paths: [".github/workflows/benchmark.yml"] jobs: stress-test: From 48a88586ac3fd7db0e424b7c7475a920d9eb6d5c Mon Sep 17 00:00:00 2001 From: Chris Lo <46541035+topher-lo@users.noreply.github.com> Date: Mon, 24 Jun 2024 14:34:10 -0700 Subject: [PATCH 04/33] Ignore frontend in benchmark runs --- .github/workflows/benchmark.yml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml index 4d5ddf2c0..b2dc6e1c1 100644 --- a/.github/workflows/benchmark.yml +++ b/.github/workflows/benchmark.yml @@ -34,7 +34,9 @@ jobs: - name: Start Docker services env: NUM_WORKERS: ${{ matrix.num_workers }} - run: docker compose up --scale worker=$NUM_WORKERS -d + run: | + docker compose up --scale worker=$NUM_WORKERS \ + --no-deps api worker postgres_db temporal_elasticsearch temporal_ui temporal -d - name: Verify Tracecat API is running run: curl -s http://localhost:8000/health | jq -e '.status == "ok"' From 9455bfa3be19c65be4c63a8456ea8900708c0aad Mon Sep 17 00:00:00 2001 From: Chris Lo <46541035+topher-lo@users.noreply.github.com> Date: Tue, 2 Jul 2024 16:27:38 -0400 Subject: [PATCH 05/33] fix: posix path stem getter --- tests/unit/test_workflows.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/test_workflows.py b/tests/unit/test_workflows.py index fd82e9b2d..29a06c60a 100644 --- a/tests/unit/test_workflows.py +++ b/tests/unit/test_workflows.py @@ -274,7 +274,7 @@ async def test_workflow_completes_and_correct( @pytest.mark.parametrize( "dsl", [DATA_PATH / "stress_adder_tree.yml"], - ids=lambda x: x.split("/")[-1].split(".")[0], + ids=lambda path: path.stem, indirect=True, ) @pytest.mark.parametrize( From 74be767662245025283631e8131e7b07e8f83fef Mon Sep 17 00:00:00 2001 From: Chris Lo <46541035+topher-lo@users.noreply.github.com> Date: Tue, 2 Jul 2024 16:43:19 -0400 Subject: [PATCH 06/33] fix: drop elastic and temporal ui services --- .github/workflows/benchmark.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml index b2dc6e1c1..89e548aae 100644 --- a/.github/workflows/benchmark.yml +++ b/.github/workflows/benchmark.yml @@ -36,7 +36,7 @@ jobs: NUM_WORKERS: ${{ matrix.num_workers }} run: | docker compose up --scale worker=$NUM_WORKERS \ - --no-deps api worker postgres_db temporal_elasticsearch temporal_ui temporal -d + --no-deps api worker postgres_db temporal -d - name: Verify Tracecat API is running run: curl -s http://localhost:8000/health | jq -e '.status == "ok"' From 0cbe479d4685efde10553420b85870f3d80b30af Mon Sep 17 00:00:00 2001 From: Chris Lo <46541035+topher-lo@users.noreply.github.com> Date: Tue, 2 Jul 2024 17:10:42 -0400 Subject: [PATCH 07/33] build: Missing pytest-benchmark --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index edc88b5f7..31c97a100 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -70,6 +70,7 @@ Repository = "https://github.com/TracecatHQ/tracecat" dev = [ "respx", "pytest", + "pytest-benchmark", "python-dotenv", "pytest-asyncio", "pytest-mock==3.14.0", From 02b3419b734934247d57a18814684ad04c76f2ba Mon Sep 17 00:00:00 2001 From: Chris Lo <46541035+topher-lo@users.noreply.github.com> Date: Tue, 2 Jul 2024 17:38:23 -0400 Subject: [PATCH 08/33] Disable benchmarks in test --- .github/workflows/test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 8674afb11..f5f90224a 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -202,4 +202,4 @@ jobs: URLSCAN_API_KEY: ${{ secrets.INTEGRATION__URLSCAN_API_KEY }} VT_API_KEY: ${{ secrets.INTEGRATION__VT_API_KEY }} LOG_LEVEL: WARNING - run: pytest -k "test_playbooks" --temporal-no-restart --tracecat-no-restart + run: pytest -k "test_playbooks" --benchmark-disable --temporal-no-restart --tracecat-no-restart From 9cf3239e804c2d67f3fa1db58bddc98df7f98c25 Mon Sep 17 00:00:00 2001 From: Chris Lo <46541035+topher-lo@users.noreply.github.com> Date: Tue, 2 Jul 2024 17:41:56 -0400 Subject: [PATCH 09/33] asyncio run --- .github/workflows/benchmark.yml | 3 +-- tests/unit/test_workflows.py | 5 +++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml index 89e548aae..13faa3bcd 100644 --- a/.github/workflows/benchmark.yml +++ b/.github/workflows/benchmark.yml @@ -35,8 +35,7 @@ jobs: env: NUM_WORKERS: ${{ matrix.num_workers }} run: | - docker compose up --scale worker=$NUM_WORKERS \ - --no-deps api worker postgres_db temporal -d + docker compose up --scale worker=$NUM_WORKERS --no-deps api worker postgres_db temporal -d - name: Verify Tracecat API is running run: curl -s http://localhost:8000/health | jq -e '.status == "ok"' diff --git a/tests/unit/test_workflows.py b/tests/unit/test_workflows.py index 29a06c60a..4b0808125 100644 --- a/tests/unit/test_workflows.py +++ b/tests/unit/test_workflows.py @@ -281,7 +281,6 @@ async def test_workflow_completes_and_correct( "num_workflows", [10, 100, 1000], ids=lambda x: f"num_workflows={x}" ) @pytest.mark.slow -@pytest.mark.slow @pytest.mark.asyncio async def test_stress_workflow( dsl, num_workflows, temporal_cluster, mock_registry, auth_sandbox, benchmark @@ -317,7 +316,9 @@ async def run_worklows(): tasks.append(task) return tasks - tasks = benchmark.pedantic(run_worklows, iterations=3, rounds=1) + tasks = benchmark.pedantic( + lambda: asyncio.run(run_worklows), iterations=3, rounds=1 + ) assert all(task.done() for task in tasks) From 9ea1221e645e084aebfce6be9fc2b8e32ad7129a Mon Sep 17 00:00:00 2001 From: Chris Lo <46541035+topher-lo@users.noreply.github.com> Date: Tue, 2 Jul 2024 17:55:37 -0400 Subject: [PATCH 10/33] Missing image tag in benchmark --- .github/workflows/benchmark.yml | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml index 13faa3bcd..f30983609 100644 --- a/.github/workflows/benchmark.yml +++ b/.github/workflows/benchmark.yml @@ -31,6 +31,15 @@ jobs: - name: Run environment setup script run: bash env.sh + - name: Get image tag + id: set-image-tag + run: | + if [ "${{ github.event_name }}" == "push" ] && [ "${{ github.ref }}" == "refs/heads/main" ]; then + echo "TRACECAT__IMAGE_TAG=latest" >> $GITHUB_ENV + else + echo "TRACECAT__IMAGE_TAG=pr-${{ github.event.pull_request.number }}" >> $GITHUB_ENV + fi + - name: Start Docker services env: NUM_WORKERS: ${{ matrix.num_workers }} From 9eaf36c4bb2be37529b729f352e6e06d57872467 Mon Sep 17 00:00:00 2001 From: Chris Lo <46541035+topher-lo@users.noreply.github.com> Date: Tue, 2 Jul 2024 18:01:38 -0400 Subject: [PATCH 11/33] Use asyncio run --- tests/unit/test_workflows.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/tests/unit/test_workflows.py b/tests/unit/test_workflows.py index 4b0808125..5099c07f7 100644 --- a/tests/unit/test_workflows.py +++ b/tests/unit/test_workflows.py @@ -281,16 +281,15 @@ async def test_workflow_completes_and_correct( "num_workflows", [10, 100, 1000], ids=lambda x: f"num_workflows={x}" ) @pytest.mark.slow -@pytest.mark.asyncio -async def test_stress_workflow( +def test_stress_workflow( dsl, num_workflows, temporal_cluster, mock_registry, auth_sandbox, benchmark ): """Multiple executions of the same workflow run at the same time.""" test_name = f"test_stress_workflow-{dsl.title}" - client = await get_temporal_client() async def run_worklows(): tasks: list[asyncio.Task] = [] + client = await get_temporal_client() async with ( Worker( client, @@ -317,7 +316,7 @@ async def run_worklows(): return tasks tasks = benchmark.pedantic( - lambda: asyncio.run(run_worklows), iterations=3, rounds=1 + lambda: asyncio.run(run_worklows()), iterations=3, rounds=1 ) assert all(task.done() for task in tasks) From 2748b878cbfb4ce23192e6a8771e036eb251d509 Mon Sep 17 00:00:00 2001 From: Chris Lo <46541035+topher-lo@users.noreply.github.com> Date: Tue, 2 Jul 2024 18:06:34 -0400 Subject: [PATCH 12/33] test: Remove --scale worker --- .github/workflows/benchmark.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml index f30983609..bad7106fd 100644 --- a/.github/workflows/benchmark.yml +++ b/.github/workflows/benchmark.yml @@ -44,7 +44,7 @@ jobs: env: NUM_WORKERS: ${{ matrix.num_workers }} run: | - docker compose up --scale worker=$NUM_WORKERS --no-deps api worker postgres_db temporal -d + docker compose up --no-deps api worker postgres_db temporal -d - name: Verify Tracecat API is running run: curl -s http://localhost:8000/health | jq -e '.status == "ok"' From af279b14009107392b590a3258cffce24b259c04 Mon Sep 17 00:00:00 2001 From: Chris Lo <46541035+topher-lo@users.noreply.github.com> Date: Tue, 2 Jul 2024 18:48:37 -0400 Subject: [PATCH 13/33] fix: Use main image on push to main --- .github/workflows/benchmark.yml | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml index bad7106fd..497a1fbe9 100644 --- a/.github/workflows/benchmark.yml +++ b/.github/workflows/benchmark.yml @@ -35,7 +35,7 @@ jobs: id: set-image-tag run: | if [ "${{ github.event_name }}" == "push" ] && [ "${{ github.ref }}" == "refs/heads/main" ]; then - echo "TRACECAT__IMAGE_TAG=latest" >> $GITHUB_ENV + echo "TRACECAT__IMAGE_TAG=main" >> $GITHUB_ENV else echo "TRACECAT__IMAGE_TAG=pr-${{ github.event.pull_request.number }}" >> $GITHUB_ENV fi @@ -49,12 +49,6 @@ jobs: - name: Verify Tracecat API is running run: curl -s http://localhost:8000/health | jq -e '.status == "ok"' - - name: pip install Tracecat - run: python -m pip install --upgrade pip && pip install ".[dev,cli]" - - - name: Verify Tracecat CLI installation - run: tracecat --help - - name: Run tests (headless mode) env: TRACECAT__IMAGE_TAG: main From 772ed52d6075b5dd2b2042c7bb2fb124865478e1 Mon Sep 17 00:00:00 2001 From: Chris Lo <46541035+topher-lo@users.noreply.github.com> Date: Tue, 2 Jul 2024 18:49:43 -0400 Subject: [PATCH 14/33] ci: Remove build image on main Redundant as image is built in test --- .github/workflows/build-push-images.yml | 2 -- 1 file changed, 2 deletions(-) diff --git a/.github/workflows/build-push-images.yml b/.github/workflows/build-push-images.yml index 3448fc406..ac846b574 100644 --- a/.github/workflows/build-push-images.yml +++ b/.github/workflows/build-push-images.yml @@ -2,8 +2,6 @@ name: Publish Images on: push: - branches: - - "main" tags: - "*.*.*" From c87c69a0caf69ed079a9913a0d773c727a0fbac1 Mon Sep 17 00:00:00 2001 From: Chris Lo <46541035+topher-lo@users.noreply.github.com> Date: Tue, 2 Jul 2024 18:53:03 -0400 Subject: [PATCH 15/33] ci: Update paths --- .github/workflows/benchmark.yml | 8 +++++++- .github/workflows/test.yml | 22 ++++++++++++++-------- 2 files changed, 21 insertions(+), 9 deletions(-) diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml index 497a1fbe9..47715ed16 100644 --- a/.github/workflows/benchmark.yml +++ b/.github/workflows/benchmark.yml @@ -3,7 +3,13 @@ name: Benchmark on: push: branches: ["main"] - paths: ["tracecat/**"] + paths: + - "docker-compose.yml" + - "Dockerfile" + - "pyproject.toml" + - "tests/**" + - "tracecat-cli/**" + - "tracecat/**" pull_request: branches: ["main"] paths: [".github/workflows/benchmark.yml"] diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index f5f90224a..85d65ed9e 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -3,16 +3,22 @@ name: Tests on: push: branches: ["main"] - paths-ignore: - - "frontend/**" - - "docs/**" - - "README.md" + paths: + - "docker-compose.yml" + - "Dockerfile" + - "pyproject.toml" + - "tests/**" + - "tracecat-cli/**" + - "tracecat/**" pull_request: branches: ["main"] - paths-ignore: - - "frontend/**" - - "docs/**" - - "README.md" + paths: + - "docker-compose.yml" + - "Dockerfile" + - "pyproject.toml" + - "tests/**" + - "tracecat-cli/**" + - "tracecat/**" permissions: contents: read From a6afffa456a28471262fe370657f2b2694abe7d2 Mon Sep 17 00:00:00 2001 From: Chris Lo <46541035+topher-lo@users.noreply.github.com> Date: Tue, 2 Jul 2024 19:09:40 -0400 Subject: [PATCH 16/33] missing pip install tracecat --- .github/workflows/benchmark.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml index 47715ed16..6a35ac1d8 100644 --- a/.github/workflows/benchmark.yml +++ b/.github/workflows/benchmark.yml @@ -55,6 +55,9 @@ jobs: - name: Verify Tracecat API is running run: curl -s http://localhost:8000/health | jq -e '.status == "ok"' + - name: pip install Tracecat + run: python -m pip install --upgrade pip && pip install ".[dev,cli]" + - name: Run tests (headless mode) env: TRACECAT__IMAGE_TAG: main From 4b956b7bf7e5033f9e0fcb6bf1d38c8f1dce4d59 Mon Sep 17 00:00:00 2001 From: Chris Lo <46541035+topher-lo@users.noreply.github.com> Date: Wed, 3 Jul 2024 14:03:16 -0400 Subject: [PATCH 17/33] fix: Outdated cli command --- .github/workflows/benchmark.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml index 6a35ac1d8..0badbb0ad 100644 --- a/.github/workflows/benchmark.yml +++ b/.github/workflows/benchmark.yml @@ -56,7 +56,7 @@ jobs: run: curl -s http://localhost:8000/health | jq -e '.status == "ok"' - name: pip install Tracecat - run: python -m pip install --upgrade pip && pip install ".[dev,cli]" + run: python -m pip install --upgrade pip && pip install ".[dev]" && pip install ./cli - name: Run tests (headless mode) env: From 14ead74875c8996cc22253132907055b65eba26d Mon Sep 17 00:00:00 2001 From: Chris Lo <46541035+topher-lo@users.noreply.github.com> Date: Wed, 3 Jul 2024 14:31:07 -0400 Subject: [PATCH 18/33] fix: Change temporal cluster url in pytests to localhost --- .github/workflows/benchmark.yml | 4 +++- .github/workflows/test.yml | 2 ++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml index 0badbb0ad..a26dff6e2 100644 --- a/.github/workflows/benchmark.yml +++ b/.github/workflows/benchmark.yml @@ -60,7 +60,9 @@ jobs: - name: Run tests (headless mode) env: - TRACECAT__IMAGE_TAG: main + # To call temporal from outside docker service (otherwise defaults to http://temporal:7233) + TEMPORAL__CLUSTER_URL: http://localhost:7233 + TRACECAT__IMAGE_TAG: ${{ env.TRACECAT__IMAGE_TAG }} LOG_LEVEL: WARNING run: | pytest -k "test_stress_workflow" --temporal-no-restart --tracecat-no-restart diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 108da48ff..442568574 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -192,6 +192,8 @@ jobs: - name: Run playbooks env: + # To call temporal from outside docker service (otherwise defaults to http://temporal:7233) + TEMPORAL__CLUSTER_URL: http://localhost:7233 TRACECAT__IMAGE_TAG: ${{ env.TRACECAT__IMAGE_TAG }} ABUSECH_API_KEY: ${{ secrets.INTEGRATION__ABUSECH_API_KEY }} ABUSEIPDB_API_KEY: ${{ secrets.INTEGRATION__ABUSEIPDB_API_KEY }} From 74fc8179c6f0574cf294596b448fe76c247548b5 Mon Sep 17 00:00:00 2001 From: Chris Lo <46541035+topher-lo@users.noreply.github.com> Date: Wed, 3 Jul 2024 14:36:08 -0400 Subject: [PATCH 19/33] feat: Add temporal health endpoint to tracecat api --- tracecat/api/app.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/tracecat/api/app.py b/tracecat/api/app.py index 0728e3468..d251e193e 100644 --- a/tracecat/api/app.py +++ b/tracecat/api/app.py @@ -56,6 +56,7 @@ WorkflowRun, ) from tracecat.dsl import dispatcher, schedules +from tracecat.dsl.client import get_temporal_client from tracecat.dsl.common import DSLInput # TODO: Clean up API params / response "zoo" @@ -225,12 +226,24 @@ async def validation_exception_handler(request: Request, exc: RequestValidationE @app.get("/", include_in_schema=False) def root() -> dict[str, str]: - return {"message": "Hello world. I am the API."} + return {"message": "ok"} @app.get("/health") def check_health() -> dict[str, str]: - return {"message": "Hello world. I am the API. This is the health endpoint."} + return {"message": "ok"} + + +@app.get("/temporal-health") +async def check_temporal_health() -> dict[str, str]: + try: + await get_temporal_client() + except RuntimeError as e: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Error connecting to Temporal", + ) from e + return {"message": "ok"} # ----- Trigger handlers ----- # From ff074b93ae3a6871fc631d74a29cd6b1b39518d7 Mon Sep 17 00:00:00 2001 From: Chris Lo <46541035+topher-lo@users.noreply.github.com> Date: Wed, 3 Jul 2024 14:37:06 -0400 Subject: [PATCH 20/33] verify TC-temporal connection --- .github/workflows/benchmark.yml | 3 +++ .github/workflows/test.yml | 3 +++ 2 files changed, 6 insertions(+) diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml index a26dff6e2..b44c47374 100644 --- a/.github/workflows/benchmark.yml +++ b/.github/workflows/benchmark.yml @@ -55,6 +55,9 @@ jobs: - name: Verify Tracecat API is running run: curl -s http://localhost:8000/health | jq -e '.status == "ok"' + - name: Verify Tracecat-Temporal connection + run: curl -s http://localhost:8000/temporal-health | jq -e '.status == "ok"' + - name: pip install Tracecat run: python -m pip install --upgrade pip && pip install ".[dev]" && pip install ./cli diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 442568574..60e57658f 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -109,6 +109,9 @@ jobs: - name: Verify Tracecat API is running run: curl -s http://localhost:8000/health | jq -e '.status == "ok"' + - name: Verify Tracecat-Temporal connection + run: curl -s http://localhost:8000/temporal-health | jq -e '.status == "ok"' + - name: pip install Tracecat run: python -m pip install --upgrade pip && pip install ".[dev]" && pip install ./cli From ff66a7892ea9c3473c130e5d96d0eb8fc2644eb0 Mon Sep 17 00:00:00 2001 From: Chris Lo <46541035+topher-lo@users.noreply.github.com> Date: Wed, 3 Jul 2024 14:51:35 -0400 Subject: [PATCH 21/33] revert: Don't need temporal url in playbooks test --- .github/workflows/test.yml | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 60e57658f..16507620e 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -107,10 +107,10 @@ jobs: run: docker compose up --no-deps api worker postgres_db -d - name: Verify Tracecat API is running - run: curl -s http://localhost:8000/health | jq -e '.status == "ok"' + run: curl http://localhost:8000/health - name: Verify Tracecat-Temporal connection - run: curl -s http://localhost:8000/temporal-health | jq -e '.status == "ok"' + run: curl http://localhost:8000/temporal-health - name: pip install Tracecat run: python -m pip install --upgrade pip && pip install ".[dev]" && pip install ./cli @@ -195,8 +195,6 @@ jobs: - name: Run playbooks env: - # To call temporal from outside docker service (otherwise defaults to http://temporal:7233) - TEMPORAL__CLUSTER_URL: http://localhost:7233 TRACECAT__IMAGE_TAG: ${{ env.TRACECAT__IMAGE_TAG }} ABUSECH_API_KEY: ${{ secrets.INTEGRATION__ABUSECH_API_KEY }} ABUSEIPDB_API_KEY: ${{ secrets.INTEGRATION__ABUSEIPDB_API_KEY }} From 1a6afa5bc559217cd9ef6631f21265e72f53e762 Mon Sep 17 00:00:00 2001 From: Chris Lo <46541035+topher-lo@users.noreply.github.com> Date: Wed, 3 Jul 2024 15:13:14 -0400 Subject: [PATCH 22/33] Move stress tests into integration folder --- .github/workflows/benchmark.yml | 9 +++-- tests/integration/test_stress.py | 67 ++++++++++++++++++++++++++++++++ tests/unit/test_workflows.py | 50 ------------------------ 3 files changed, 73 insertions(+), 53 deletions(-) create mode 100644 tests/integration/test_stress.py diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml index b44c47374..656d163cc 100644 --- a/.github/workflows/benchmark.yml +++ b/.github/workflows/benchmark.yml @@ -53,10 +53,13 @@ jobs: docker compose up --no-deps api worker postgres_db temporal -d - name: Verify Tracecat API is running - run: curl -s http://localhost:8000/health | jq -e '.status == "ok"' + run: curl -s http://localhost:8000/health - name: Verify Tracecat-Temporal connection - run: curl -s http://localhost:8000/temporal-health | jq -e '.status == "ok"' + run: curl -s http://localhost:8000/temporal-health + + - name: Verify Temporal CLI commands + run: temporal --version - name: pip install Tracecat run: python -m pip install --upgrade pip && pip install ".[dev]" && pip install ./cli @@ -68,7 +71,7 @@ jobs: TRACECAT__IMAGE_TAG: ${{ env.TRACECAT__IMAGE_TAG }} LOG_LEVEL: WARNING run: | - pytest -k "test_stress_workflow" --temporal-no-restart --tracecat-no-restart + pytest -k "test_stress" --temporal-no-restart --tracecat-no-restart --benchmark-name=short \ --benchmark-group-by=param \ --benchmark-warmup=off \ diff --git a/tests/integration/test_stress.py b/tests/integration/test_stress.py new file mode 100644 index 000000000..20f225639 --- /dev/null +++ b/tests/integration/test_stress.py @@ -0,0 +1,67 @@ +import asyncio +import os +from pathlib import Path + +import pytest +from temporalio.common import RetryPolicy +from temporalio.worker import Worker + +from tracecat.contexts import ctx_role +from tracecat.dsl.client import get_temporal_client +from tracecat.dsl.worker import new_sandbox_runner +from tracecat.dsl.workflow import DSLActivities, DSLRunArgs, DSLWorkflow + +from ..unit.test_workflows import generate_test_exec_id + +DATA_PATH = Path(__file__).parent.parent.joinpath("data/workflows") +TEST_WF_ID = "wf-00000000000000000000000000000000" + + +@pytest.mark.parametrize( + "dsl", + [DATA_PATH / "stress_adder_tree.yml"], + ids=lambda path: path.stem, + indirect=True, +) +@pytest.mark.parametrize( + "num_workflows", [10, 100, 1000], ids=lambda x: f"num_workflows={x}" +) +@pytest.mark.slow +def test_concurrent_workflows( + dsl, num_workflows, temporal_cluster, mock_registry, auth_sandbox, benchmark +): + """Multiple executions of the same workflow run at the same time.""" + test_name = f"test_stress_workflow-{dsl.title}" + + async def run_worklows(): + tasks: list[asyncio.Task] = [] + client = await get_temporal_client() + async with ( + Worker( + client, + task_queue=os.environ["TEMPORAL__CLUSTER_QUEUE"], + activities=DSLActivities.load(), + workflows=[DSLWorkflow], + workflow_runner=new_sandbox_runner(), + ), + ): + async with asyncio.TaskGroup() as tg: + # We can have multiple executions of the same workflow running at the same time + for i in range(num_workflows): + wf_exec_id = generate_test_exec_id(test_name + f"-{i}") + task = tg.create_task( + client.execute_workflow( + DSLWorkflow.run, + DSLRunArgs(dsl=dsl, role=ctx_role.get(), wf_id=TEST_WF_ID), + id=wf_exec_id, + task_queue=os.environ["TEMPORAL__CLUSTER_QUEUE"], + retry_policy=RetryPolicy(maximum_attempts=1), + ) + ) + tasks.append(task) + return tasks + + tasks = benchmark.pedantic( + lambda: asyncio.run(run_worklows()), iterations=3, rounds=1 + ) + assert all(task.done() for task in tasks) diff --git a/tests/unit/test_workflows.py b/tests/unit/test_workflows.py index 5099c07f7..055ba66ff 100644 --- a/tests/unit/test_workflows.py +++ b/tests/unit/test_workflows.py @@ -271,56 +271,6 @@ async def test_workflow_completes_and_correct( assert result == expected -@pytest.mark.parametrize( - "dsl", - [DATA_PATH / "stress_adder_tree.yml"], - ids=lambda path: path.stem, - indirect=True, -) -@pytest.mark.parametrize( - "num_workflows", [10, 100, 1000], ids=lambda x: f"num_workflows={x}" -) -@pytest.mark.slow -def test_stress_workflow( - dsl, num_workflows, temporal_cluster, mock_registry, auth_sandbox, benchmark -): - """Multiple executions of the same workflow run at the same time.""" - test_name = f"test_stress_workflow-{dsl.title}" - - async def run_worklows(): - tasks: list[asyncio.Task] = [] - client = await get_temporal_client() - async with ( - Worker( - client, - task_queue=os.environ["TEMPORAL__CLUSTER_QUEUE"], - activities=DSLActivities.load(), - workflows=[DSLWorkflow], - workflow_runner=new_sandbox_runner(), - ), - ): - async with asyncio.TaskGroup() as tg: - # We can have multiple executions of the same workflow running at the same time - for i in range(num_workflows): - wf_exec_id = generate_test_exec_id(test_name + f"-{i}") - task = tg.create_task( - client.execute_workflow( - DSLWorkflow.run, - DSLRunArgs(dsl=dsl, role=ctx_role.get(), wf_id=TEST_WF_ID), - id=wf_exec_id, - task_queue=os.environ["TEMPORAL__CLUSTER_QUEUE"], - retry_policy=RetryPolicy(maximum_attempts=1), - ) - ) - tasks.append(task) - return tasks - - tasks = benchmark.pedantic( - lambda: asyncio.run(run_worklows()), iterations=3, rounds=1 - ) - assert all(task.done() for task in tasks) - - @pytest.mark.parametrize( "dsl", [DATA_PATH / "unit_conditional_adder_diamond_skip_with_join_strong_dep_fails.yml"], From ad4d4d3844040330bf802442a4936f5a3fa3f7ad Mon Sep 17 00:00:00 2001 From: Chris Lo <46541035+topher-lo@users.noreply.github.com> Date: Wed, 3 Jul 2024 15:19:02 -0400 Subject: [PATCH 23/33] fix: Stress test via calling tracecat API --- .github/workflows/benchmark.yml | 2 -- tests/integration/test_stress.py | 61 ++++++++++---------------------- 2 files changed, 18 insertions(+), 45 deletions(-) diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml index 656d163cc..99cc6684b 100644 --- a/.github/workflows/benchmark.yml +++ b/.github/workflows/benchmark.yml @@ -66,8 +66,6 @@ jobs: - name: Run tests (headless mode) env: - # To call temporal from outside docker service (otherwise defaults to http://temporal:7233) - TEMPORAL__CLUSTER_URL: http://localhost:7233 TRACECAT__IMAGE_TAG: ${{ env.TRACECAT__IMAGE_TAG }} LOG_LEVEL: WARNING run: | diff --git a/tests/integration/test_stress.py b/tests/integration/test_stress.py index 20f225639..e1ceb08b7 100644 --- a/tests/integration/test_stress.py +++ b/tests/integration/test_stress.py @@ -1,24 +1,18 @@ import asyncio -import os +import uuid from pathlib import Path import pytest -from temporalio.common import RetryPolicy -from temporalio.worker import Worker -from tracecat.contexts import ctx_role -from tracecat.dsl.client import get_temporal_client -from tracecat.dsl.worker import new_sandbox_runner -from tracecat.dsl.workflow import DSLActivities, DSLRunArgs, DSLWorkflow - -from ..unit.test_workflows import generate_test_exec_id +from tracecat.dsl.common import DSLInput +from tracecat.dsl.dispatcher import dispatch_workflow DATA_PATH = Path(__file__).parent.parent.joinpath("data/workflows") TEST_WF_ID = "wf-00000000000000000000000000000000" @pytest.mark.parametrize( - "dsl", + "path_to_dsl", [DATA_PATH / "stress_adder_tree.yml"], ids=lambda path: path.stem, indirect=True, @@ -28,40 +22,21 @@ ) @pytest.mark.slow def test_concurrent_workflows( - dsl, num_workflows, temporal_cluster, mock_registry, auth_sandbox, benchmark + path_to_dsl, num_workflows, temporal_cluster, mock_registry, auth_sandbox, benchmark ): """Multiple executions of the same workflow run at the same time.""" - test_name = f"test_stress_workflow-{dsl.title}" - - async def run_worklows(): - tasks: list[asyncio.Task] = [] - client = await get_temporal_client() - async with ( - Worker( - client, - task_queue=os.environ["TEMPORAL__CLUSTER_QUEUE"], - activities=DSLActivities.load(), - workflows=[DSLWorkflow], - workflow_runner=new_sandbox_runner(), - ), - ): - async with asyncio.TaskGroup() as tg: - # We can have multiple executions of the same workflow running at the same time - for i in range(num_workflows): - wf_exec_id = generate_test_exec_id(test_name + f"-{i}") - task = tg.create_task( - client.execute_workflow( - DSLWorkflow.run, - DSLRunArgs(dsl=dsl, role=ctx_role.get(), wf_id=TEST_WF_ID), - id=wf_exec_id, - task_queue=os.environ["TEMPORAL__CLUSTER_QUEUE"], - retry_policy=RetryPolicy(maximum_attempts=1), - ) - ) - tasks.append(task) - return tasks - tasks = benchmark.pedantic( - lambda: asyncio.run(run_worklows()), iterations=3, rounds=1 + def generate_wf_id(): + return f"wf-{uuid.uuid4()}" + + tasks = [ + dispatch_workflow( + dsl=DSLInput.from_yaml(path_to_dsl), + wf_id=generate_wf_id(), + ) + for _ in range(num_workflows) + ] + workflow_runs = benchmark.pedantic( + lambda: asyncio.gather(tasks()), iterations=3, rounds=1 ) - assert all(task.done() for task in tasks) + assert all(run.done() for run in workflow_runs) From 87256364cd4eb725fcba08db4dc80af0ea11df42 Mon Sep 17 00:00:00 2001 From: Chris Lo <46541035+topher-lo@users.noreply.github.com> Date: Wed, 3 Jul 2024 15:22:51 -0400 Subject: [PATCH 24/33] Remove health checks --- .github/workflows/benchmark.yml | 9 --------- .github/workflows/test.yml | 9 --------- 2 files changed, 18 deletions(-) diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml index 99cc6684b..b1781e011 100644 --- a/.github/workflows/benchmark.yml +++ b/.github/workflows/benchmark.yml @@ -52,15 +52,6 @@ jobs: run: | docker compose up --no-deps api worker postgres_db temporal -d - - name: Verify Tracecat API is running - run: curl -s http://localhost:8000/health - - - name: Verify Tracecat-Temporal connection - run: curl -s http://localhost:8000/temporal-health - - - name: Verify Temporal CLI commands - run: temporal --version - - name: pip install Tracecat run: python -m pip install --upgrade pip && pip install ".[dev]" && pip install ./cli diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 16507620e..ca7cf0af9 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -106,12 +106,6 @@ jobs: - name: Start Docker services run: docker compose up --no-deps api worker postgres_db -d - - name: Verify Tracecat API is running - run: curl http://localhost:8000/health - - - name: Verify Tracecat-Temporal connection - run: curl http://localhost:8000/temporal-health - - name: pip install Tracecat run: python -m pip install --upgrade pip && pip install ".[dev]" && pip install ./cli @@ -163,9 +157,6 @@ jobs: - name: Start Docker services run: docker compose up --no-deps api worker postgres_db temporal -d - - name: Verify Tracecat API is running - run: curl -s http://localhost:8000/health | jq -e '.status == "ok"' - - name: pip install Tracecat run: python -m pip install --upgrade pip && pip install ".[dev]" && pip install ./cli From 101a00790659e87c374d0feaca7ac45beb80763f Mon Sep 17 00:00:00 2001 From: Chris Lo <46541035+topher-lo@users.noreply.github.com> Date: Wed, 3 Jul 2024 15:29:47 -0400 Subject: [PATCH 25/33] fix: dsl --- tests/integration/test_stress.py | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/tests/integration/test_stress.py b/tests/integration/test_stress.py index e1ceb08b7..0f58c8390 100644 --- a/tests/integration/test_stress.py +++ b/tests/integration/test_stress.py @@ -4,7 +4,6 @@ import pytest -from tracecat.dsl.common import DSLInput from tracecat.dsl.dispatcher import dispatch_workflow DATA_PATH = Path(__file__).parent.parent.joinpath("data/workflows") @@ -12,7 +11,7 @@ @pytest.mark.parametrize( - "path_to_dsl", + "dsl", [DATA_PATH / "stress_adder_tree.yml"], ids=lambda path: path.stem, indirect=True, @@ -22,7 +21,7 @@ ) @pytest.mark.slow def test_concurrent_workflows( - path_to_dsl, num_workflows, temporal_cluster, mock_registry, auth_sandbox, benchmark + dsl, num_workflows, temporal_cluster, mock_registry, auth_sandbox, benchmark ): """Multiple executions of the same workflow run at the same time.""" @@ -30,11 +29,7 @@ def generate_wf_id(): return f"wf-{uuid.uuid4()}" tasks = [ - dispatch_workflow( - dsl=DSLInput.from_yaml(path_to_dsl), - wf_id=generate_wf_id(), - ) - for _ in range(num_workflows) + dispatch_workflow(dsl=dsl, wf_id=generate_wf_id()) for _ in range(num_workflows) ] workflow_runs = benchmark.pedantic( lambda: asyncio.gather(tasks()), iterations=3, rounds=1 From 0e6e000d6f9fc78351c1060a2c952cc3d66d0278 Mon Sep 17 00:00:00 2001 From: Chris Lo <46541035+topher-lo@users.noreply.github.com> Date: Wed, 3 Jul 2024 15:41:30 -0400 Subject: [PATCH 26/33] Move dsl fixture into conftest --- tests/conftest.py | 10 ++++++++++ tests/unit/test_workflows.py | 33 +++++---------------------------- 2 files changed, 15 insertions(+), 28 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 542c60c9e..f8a139d09 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,12 +1,15 @@ import os import subprocess import time +from pathlib import Path from uuid import uuid4 import pytest from cryptography.fernet import Fernet from loguru import logger +from tracecat.dsl.common import DSLInput + def pytest_addoption(parser: pytest.Parser): parser.addoption( @@ -171,3 +174,10 @@ def tracecat_worker(env_sandbox): ["docker", "compose", "down", "--remove-orphans", "worker"], check=True ) logger.info("Stopped Tracecat Temporal worker") + + +@pytest.fixture +def dsl(request: pytest.FixtureRequest) -> DSLInput: + path: list[Path] = request.param + dsl = DSLInput.from_yaml(path) + return dsl diff --git a/tests/unit/test_workflows.py b/tests/unit/test_workflows.py index 055ba66ff..246b4a128 100644 --- a/tests/unit/test_workflows.py +++ b/tests/unit/test_workflows.py @@ -11,7 +11,6 @@ import asyncio import os from pathlib import Path -from typing import Any import pytest import yaml @@ -94,24 +93,6 @@ async def passthrough(num: int) -> int: counter_gen = counter() # Reset the counter generator -# Fixture to load workflow DSLs from YAML files -@pytest.fixture -def dsl(request: pytest.FixtureRequest) -> DSLInput: - path: list[Path] = request.param - dsl = DSLInput.from_yaml(path) - return dsl - - -# Fixture to load yaml files from name -@pytest.fixture -def expected(request: pytest.FixtureRequest) -> dict[str, Any]: - path: Path = request.param - with path.open() as f: - yaml_data = f.read() - data = yaml.safe_load(yaml_data) - return {key: (value or {}) for key, value in data.items()} - - @pytest.mark.parametrize("dsl", SHARED_TEST_DEFNS, indirect=True) @pytest.mark.asyncio async def test_workflow_can_run_from_yaml( @@ -207,13 +188,6 @@ async def test_workflow_ordering_is_correct( assert_respectful_exec_order(dsl, result) -def _get_expected(path: Path) -> dict[str, Any]: - with path.open() as f: - yaml_data = f.read() - data = yaml.safe_load(yaml_data) - return {key: (value or {}) for key, value in data.items()} - - # Get the paths from the test name @pytest.fixture def dsl_with_expected(request: pytest.FixtureRequest) -> DSLInput: @@ -221,8 +195,11 @@ def dsl_with_expected(request: pytest.FixtureRequest) -> DSLInput: data_path = DATA_PATH / f"{test_name}.yml" expected_path = DATA_PATH / f"{test_name}_expected.yml" dsl = DSLInput.from_yaml(data_path) - expected = _get_expected(expected_path) - return dsl, expected + with expected_path.open() as f: + yaml_data = f.read() + data = yaml.safe_load(yaml_data) + expected_result = {key: (value or {}) for key, value in data.items()} + return dsl, expected_result @pytest.mark.parametrize( From efa60924f721800123b88235822603d496300d48 Mon Sep 17 00:00:00 2001 From: Chris Lo <46541035+topher-lo@users.noreply.github.com> Date: Wed, 3 Jul 2024 15:47:45 -0400 Subject: [PATCH 27/33] Move all dsl fixtures into conftest --- tests/conftest.py | 71 +++++++++++++++++++++++++++++++++++- tests/unit/test_workflows.py | 66 +-------------------------------- 2 files changed, 70 insertions(+), 67 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index f8a139d09..15c882df1 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,3 +1,4 @@ +import asyncio import os import subprocess import time @@ -5,11 +6,14 @@ from uuid import uuid4 import pytest +import yaml from cryptography.fernet import Fernet from loguru import logger from tracecat.dsl.common import DSLInput +DATA_PATH = Path(__file__).parent.parent.joinpath("data/workflows") + def pytest_addoption(parser: pytest.Parser): parser.addoption( @@ -176,8 +180,71 @@ def tracecat_worker(env_sandbox): logger.info("Stopped Tracecat Temporal worker") +@pytest.fixture +def mock_registry(): + """Mock registry for testing UDFs. + + Note + ---- + - This fixture is used to test the integration of UDFs with the workflow. + - It's unreachable by an external worker, as the worker will not have access + to these functions when it starts up. + """ + from tracecat.registry import registry + + # NOTE!!!!!!!: Didn't want to spend too much time figuring out how + # to grab the actual execution order using the client, so I'm using a + # hacky way to get the order of execution. TO FIX LATER + # The counter doesn't get reset properly so you should never use this outside + # of the 'ordering' tests + def counter(): + i = 0 + while True: + yield i + i += 1 + + counter_gen = counter() + if "integration_test.count" not in registry: + + @registry.register( + description="Counts up from 0", + namespace="integration_test", + ) + def count(arg: str | None = None) -> int: + order = next(counter_gen) + return order + + if "integration_test.passthrough" not in registry: + + @registry.register( + description="passes through", + namespace="integration_test", + ) + async def passthrough(num: int) -> int: + await asyncio.sleep(0.1) + return num + + registry.init() + yield registry + counter_gen = counter() # Reset the counter generator + + @pytest.fixture def dsl(request: pytest.FixtureRequest) -> DSLInput: - path: list[Path] = request.param - dsl = DSLInput.from_yaml(path) + test_name = request.param + data_path = DATA_PATH / f"{test_name}.yml" + dsl = DSLInput.from_yaml(data_path) return dsl + + +@pytest.fixture +def dsl_with_expected(request: pytest.FixtureRequest) -> DSLInput: + test_name = request.param + data_path = DATA_PATH / f"{test_name}.yml" + expected_path = DATA_PATH / f"{test_name}_expected.yml" + dsl = DSLInput.from_yaml(data_path) + with expected_path.open() as f: + yaml_data = f.read() + data = yaml.safe_load(yaml_data) + expected_result = {key: (value or {}) for key, value in data.items()} + return dsl, expected_result diff --git a/tests/unit/test_workflows.py b/tests/unit/test_workflows.py index 246b4a128..f4d54226e 100644 --- a/tests/unit/test_workflows.py +++ b/tests/unit/test_workflows.py @@ -13,7 +13,6 @@ from pathlib import Path import pytest -import yaml from loguru import logger from slugify import slugify from temporalio.common import RetryPolicy @@ -44,55 +43,6 @@ def generate_test_exec_id(name: str) -> str: ) -@pytest.fixture -def mock_registry(): - """Mock registry for testing UDFs. - - Note - ---- - - This fixture is used to test the integration of UDFs with the workflow. - - It's unreachable by an external worker, as the worker will not have access - to these functions when it starts up. - """ - from tracecat.registry import registry - - # NOTE!!!!!!!: Didn't want to spend too much time figuring out how - # to grab the actual execution order using the client, so I'm using a - # hacky way to get the order of execution. TO FIX LATER - # The counter doesn't get reset properly so you should never use this outside - # of the 'ordering' tests - def counter(): - i = 0 - while True: - yield i - i += 1 - - counter_gen = counter() - if "integration_test.count" not in registry: - - @registry.register( - description="Counts up from 0", - namespace="integration_test", - ) - def count(arg: str | None = None) -> int: - order = next(counter_gen) - return order - - if "integration_test.passthrough" not in registry: - - @registry.register( - description="passes through", - namespace="integration_test", - ) - async def passthrough(num: int) -> int: - await asyncio.sleep(0.1) - return num - - registry.init() - yield registry - counter_gen = counter() # Reset the counter generator - - @pytest.mark.parametrize("dsl", SHARED_TEST_DEFNS, indirect=True) @pytest.mark.asyncio async def test_workflow_can_run_from_yaml( @@ -188,20 +138,6 @@ async def test_workflow_ordering_is_correct( assert_respectful_exec_order(dsl, result) -# Get the paths from the test name -@pytest.fixture -def dsl_with_expected(request: pytest.FixtureRequest) -> DSLInput: - test_name = request.param - data_path = DATA_PATH / f"{test_name}.yml" - expected_path = DATA_PATH / f"{test_name}_expected.yml" - dsl = DSLInput.from_yaml(data_path) - with expected_path.open() as f: - yaml_data = f.read() - data = yaml.safe_load(yaml_data) - expected_result = {key: (value or {}) for key, value in data.items()} - return dsl, expected_result - - @pytest.mark.parametrize( "dsl_with_expected", [ @@ -250,7 +186,7 @@ async def test_workflow_completes_and_correct( @pytest.mark.parametrize( "dsl", - [DATA_PATH / "unit_conditional_adder_diamond_skip_with_join_strong_dep_fails.yml"], + ["unit_conditional_adder_diamond_skip_with_join_strong_dep_fails"], indirect=True, ) @pytest.mark.asyncio From 05b873a3ca0de1a5c60052b04d98a38b365b77ca Mon Sep 17 00:00:00 2001 From: Chris Lo <46541035+topher-lo@users.noreply.github.com> Date: Wed, 3 Jul 2024 15:53:00 -0400 Subject: [PATCH 28/33] fix: path to dsl yaml --- tests/integration/test_stress.py | 2 +- tests/integration/test_validation.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/integration/test_stress.py b/tests/integration/test_stress.py index 0f58c8390..2e34c1f94 100644 --- a/tests/integration/test_stress.py +++ b/tests/integration/test_stress.py @@ -12,7 +12,7 @@ @pytest.mark.parametrize( "dsl", - [DATA_PATH / "stress_adder_tree.yml"], + ["stress_adder_tree"], ids=lambda path: path.stem, indirect=True, ) diff --git a/tests/integration/test_validation.py b/tests/integration/test_validation.py index 96e0adb03..3ff57e4e5 100644 --- a/tests/integration/test_validation.py +++ b/tests/integration/test_validation.py @@ -20,7 +20,6 @@ def filename(request: pytest.FixtureRequest) -> Path: ) @pytest.mark.asyncio async def test_workflow_commit(filename, auth_sandbox): - print(filename) title = f"Test workflow: {filename}" workflow_result = await shared.create_workflow(title) await shared.commit_workflow(filename, workflow_result["id"]) From 8182ba6f9c7d3b6fe086c29ffc8e71ddbbb086ec Mon Sep 17 00:00:00 2001 From: Chris Lo <46541035+topher-lo@users.noreply.github.com> Date: Wed, 3 Jul 2024 15:56:08 -0400 Subject: [PATCH 29/33] remove ids --- tests/integration/test_stress.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/integration/test_stress.py b/tests/integration/test_stress.py index 2e34c1f94..e170d885a 100644 --- a/tests/integration/test_stress.py +++ b/tests/integration/test_stress.py @@ -13,7 +13,6 @@ @pytest.mark.parametrize( "dsl", ["stress_adder_tree"], - ids=lambda path: path.stem, indirect=True, ) @pytest.mark.parametrize( From 615f8884fe992c3812e7bfdaa79e41c6cd2b7502 Mon Sep 17 00:00:00 2001 From: Chris Lo <46541035+topher-lo@users.noreply.github.com> Date: Wed, 3 Jul 2024 16:00:44 -0400 Subject: [PATCH 30/33] Fix path to data --- tests/conftest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/conftest.py b/tests/conftest.py index 15c882df1..0eaf5f3f5 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -12,7 +12,7 @@ from tracecat.dsl.common import DSLInput -DATA_PATH = Path(__file__).parent.parent.joinpath("data/workflows") +DATA_PATH = Path(__file__).parent.joinpath("data/workflows") def pytest_addoption(parser: pytest.Parser): From e95f1dca3c4f8d1633bd8670f6ec97c5a4e82997 Mon Sep 17 00:00:00 2001 From: Chris Lo <46541035+topher-lo@users.noreply.github.com> Date: Wed, 3 Jul 2024 16:07:28 -0400 Subject: [PATCH 31/33] Only use stem --- tests/unit/test_workflows.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/unit/test_workflows.py b/tests/unit/test_workflows.py index f4d54226e..7777c0e8d 100644 --- a/tests/unit/test_workflows.py +++ b/tests/unit/test_workflows.py @@ -28,8 +28,8 @@ from tracecat.types.exceptions import TracecatExpressionError DATA_PATH = Path(__file__).parent.parent.joinpath("data/workflows") -SHARED_TEST_DEFNS = list(DATA_PATH.glob("shared_*.yml")) -ORDERING_TEST_DEFNS = list(DATA_PATH.glob("unit_ordering_*.yml")) +SHARED_TEST_DEFNS = [path.stem for path in DATA_PATH.glob("shared_*.yml")] +ORDERING_TEST_DEFNS = [path.stem for path in DATA_PATH.glob("unit_ordering_*.yml")] TEST_WF_ID = "wf-00000000000000000000000000000000" From 5ad3abb6a7a87e7f05fe8223b0d39c0c9bc0fc85 Mon Sep 17 00:00:00 2001 From: Chris Lo <46541035+topher-lo@users.noreply.github.com> Date: Wed, 3 Jul 2024 16:07:55 -0400 Subject: [PATCH 32/33] Fix cannot call list in asyncio gather --- tests/integration/test_stress.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_stress.py b/tests/integration/test_stress.py index e170d885a..6bee6ed5a 100644 --- a/tests/integration/test_stress.py +++ b/tests/integration/test_stress.py @@ -31,6 +31,6 @@ def generate_wf_id(): dispatch_workflow(dsl=dsl, wf_id=generate_wf_id()) for _ in range(num_workflows) ] workflow_runs = benchmark.pedantic( - lambda: asyncio.gather(tasks()), iterations=3, rounds=1 + lambda: asyncio.gather(tasks), iterations=3, rounds=1 ) assert all(run.done() for run in workflow_runs) From 4620cb6411c0cbdf2b2d620dd0ecd7068d4b199f Mon Sep 17 00:00:00 2001 From: Chris Lo <46541035+topher-lo@users.noreply.github.com> Date: Wed, 3 Jul 2024 16:33:48 -0400 Subject: [PATCH 33/33] fix: run gather --- tests/integration/test_stress.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/tests/integration/test_stress.py b/tests/integration/test_stress.py index 6bee6ed5a..214106b43 100644 --- a/tests/integration/test_stress.py +++ b/tests/integration/test_stress.py @@ -27,10 +27,11 @@ def test_concurrent_workflows( def generate_wf_id(): return f"wf-{uuid.uuid4()}" - tasks = [ - dispatch_workflow(dsl=dsl, wf_id=generate_wf_id()) for _ in range(num_workflows) - ] - workflow_runs = benchmark.pedantic( - lambda: asyncio.gather(tasks), iterations=3, rounds=1 - ) - assert all(run.done() for run in workflow_runs) + async def run_workflows(): + tasks = [ + dispatch_workflow(dsl=dsl, wf_id=generate_wf_id()) + for _ in range(num_workflows) + ] + return await asyncio.gather(*tasks) + + benchmark.pedantic(lambda: asyncio.run(run_workflows()), iterations=3, rounds=1)