Skip to content

Commit

Permalink
Add test to reproduce morpheus issue #1838
Browse files Browse the repository at this point in the history
  • Loading branch information
dagardner-nv committed Aug 15, 2024
1 parent 86c950a commit c39f5ee
Showing 1 changed file with 111 additions and 5 deletions.
116 changes: 111 additions & 5 deletions python/tests/test_executor.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# SPDX-FileCopyrightText: Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-FileCopyrightText: Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -14,6 +14,8 @@
# limitations under the License.

import asyncio
import os
import time
import typing

import pytest
Expand All @@ -30,6 +32,53 @@ def pairwise(t):
node_fn_type = typing.Callable[[mrc.Builder], mrc.SegmentObject]


@pytest.fixture
def source():

def build(builder: mrc.Builder):

def gen_data():
yield 1
yield 2
yield 3

return builder.make_source("source", gen_data)

return build


@pytest.fixture
def endless_source():

def build(builder: mrc.Builder):

def gen_data():
i = 0
while True:
yield i
i += 1
time.sleep(0.1)

return builder.make_source("endless_source", gen_data())

return build


@pytest.fixture
def blocking_source():

def build(builder: mrc.Builder):

def gen_data():
yield 1
while True:
time.sleep(0.1)

return builder.make_source("blocking_source", gen_data)

return build


@pytest.fixture
def source_pyexception():

Expand Down Expand Up @@ -64,13 +113,27 @@ def gen_data_and_raise():
return build


@pytest.fixture
def node_exception():

def build(builder: mrc.Builder):

def on_next(data):
print("Received value: {}".format(data), flush=True)
raise RuntimeError("unittest")

return builder.make_node("node", mrc.core.operators.map(on_next))

return build


@pytest.fixture
def sink():

def build(builder: mrc.Builder):

def sink_on_next(data):
print("Got value: {}".format(data))
print("Got value: {}".format(data), flush=True)

return builder.make_sink("sink", sink_on_next, None, None)

Expand Down Expand Up @@ -109,10 +172,11 @@ def init_segment(builder: mrc.Builder):
@pytest.fixture
def build_executor():

def inner(pipe: mrc.Pipeline):
def inner(pipe: mrc.Pipeline, callback_fn=None):
options = mrc.Options()

executor = mrc.Executor(options)
options.topology.user_cpuset = f"0-{os.cpu_count() - 1}"
options.engine_factories.default_engine_type = mrc.core.options.EngineType.Thread
executor = mrc.Executor(options, callback_fn)
executor.register_pipeline(pipe)

executor.start()
Expand Down Expand Up @@ -183,5 +247,47 @@ async def run_pipeline():
asyncio.run(run_pipeline())


@pytest.mark.parametrize("souce_name", ["source", "endless_source", "blocking_source"])
def test_pyexception_in_node(source: node_fn_type,
endless_source: node_fn_type,
blocking_source: node_fn_type,
node_exception: node_fn_type,
build_pipeline: build_pipeline_type,
build_executor: build_executor_type,
souce_name: str):
"""
Test to reproduce Morpheus issue #1838 where an exception raised in a node doesn't always shutdown the executor
when the source is intended to run indefinitely.
"""

if souce_name == "endless_source":
source_fn = endless_source
elif souce_name == "blocking_source":
source_fn = blocking_source
else:
source_fn = source

pipe = build_pipeline(source_fn, node_exception)

executor: mrc.Executor = None

def state_change_cb(state: mrc.State):
print(f"state_change_cb: {state}", flush=True)
nonlocal state_change_cb_called
state_change_cb_called = True
if state in (mrc.State.Stop, mrc.State.Kill):
print("Calling stop", flush=True)
executor.stop()

executor = build_executor(pipe, state_change_cb)

state_change_cb_called = False

with pytest.raises(RuntimeError):
executor.join()

assert state_change_cb_called


if (__name__ in ("__main__", )):
test_pyexception_in_source()

0 comments on commit c39f5ee

Please sign in to comment.