Skip to content

Commit

Permalink
Merge branch 'branch-23.07' of github.com:nv-morpheus/MRC into david-…
Browse files Browse the repository at this point in the history
…pybind11-961
  • Loading branch information
dagardner-nv committed Jun 21, 2023
2 parents 2eb217b + 0d0ab70 commit 1ae67e2
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 11 deletions.
2 changes: 1 addition & 1 deletion cpp/mrc/src/internal/control_plane/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ void Server::do_service_await_join()
/**
* @brief Stream Acceptor
*
* The while loop of this method says active as long as the grpc server is still accepting connections.
* The while loop of this method stays active as long as the grpc server is still accepting connections.
* There are multiple way this can be implemented depending the service requirements, one might choose
* to preallocate N number of streams and issues them all to the CQ. This is an alternative method which
* creates a single stream and waits for it to get initialized, then creates another. The current implementation is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class Role;
class ClientInstance;

/**
* @brief A specialize TaggedManager to synchronize tag and instance_id information across between a collection of
* @brief A specialized TaggedIssuer to synchronize tag and instance_id information across between a collection of
* client-side objects with common linkages, e.g. the Publisher/Subscriber services which form the building blocks for
* Ingress/EgressPorts use instances of SubscriptionService for Publishers to get control plane updates to the list of
* Subscribers.
Expand Down
4 changes: 2 additions & 2 deletions cpp/mrc/src/internal/control_plane/server/tagged_issuer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ class Tagged
* This is the primary base class for a control plane server-side stateful service which can be updated by the client
* and state updates driven independently via the issue_update() method.
*
* TaggedManager is not thread-safe or protected in anyway. The global state mutex should protect all TaggedManagers.
* TaggedIssuer is not thread-safe or protected in anyway. The global state mutex should protect all TaggedIssuers.
*
* In most scenarios, the service side will have a batched updated which will periodically visit each TaggedManager and
* In most scenarios, the service side will have a batched updated which will periodically visit each TaggedIssuer and
* call issue_update(); however, depending on the service request/update message, the call may also require an immediate
* update.
*/
Expand Down
4 changes: 2 additions & 2 deletions cpp/mrc/src/public/segment/builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,14 @@ void Builder::init_module(std::shared_ptr<mrc::modules::SegmentModule> smodule)
VLOG(2) << "Initializing module: " << m_namespace_prefix;
smodule->m_module_instance_registered_namespace = m_namespace_prefix;
smodule->initialize(*this);
ns_pop();

// TODO(Devin): Maybe a better way to do this with compile time type ledger.
if (std::dynamic_pointer_cast<modules::PersistentModule>(smodule) != nullptr)
{
VLOG(2) << "Registering persistent module -> '" << smodule->component_prefix() << "'";
VLOG(2) << "Registering persistent module -> '" << m_namespace_prefix << "'";
m_backend.add_module(m_namespace_prefix, smodule);
}
ns_pop();
}

std::shared_ptr<ObjectProperties> Builder::get_ingress(std::string name, std::type_index type_index)
Expand Down
49 changes: 44 additions & 5 deletions python/tests/test_segment_modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ def on_complete():


def test_py_constructor():

config = {"config_key_1": True}

registry = mrc.ModuleRegistry
Expand Down Expand Up @@ -312,11 +311,41 @@ def on_complete():

def test_py_module_nesting():

def gen_data():
for i in range(0, 43):
yield True
def init_wrapper(builder: mrc.Builder):
global packet_count
packet_count = 0

def on_next(input):
global packet_count
packet_count += 1
logging.info("Sinking {}".format(input))

def on_error():
pass

def on_complete():
pass

nested_mod = builder.load_module("NestedModule", "mrc_unittest", "ModuleNestingTest_mod1", {})
nested_sink = builder.make_sink("nested_sink", on_next, on_error, on_complete)

builder.make_edge(nested_mod.output_port("nested_module_output"), nested_sink)

pipeline = mrc.Pipeline()
pipeline.make_segment("ModuleNesting_Segment", init_wrapper)

options = mrc.Options()
options.topology.user_cpuset = "0-1"

executor = mrc.Executor(options)
executor.register_pipeline(pipeline)
executor.start()
executor.join()

assert packet_count == 4


def test_py_modules_dont_overwrite():

def init_wrapper(builder: mrc.Builder):
global packet_count
Expand All @@ -334,9 +363,18 @@ def on_complete():
pass

nested_mod = builder.load_module("NestedModule", "mrc_unittest", "ModuleNestingTest_mod1", {})

# Make sure we can't re-register the same name
with pytest.raises(RuntimeError):
this_should_fail = builder.load_module( # noqa
"NestedModule", "mrc_unittest", "ModuleNestingTest_mod1", {})

nested_mod2 = builder.load_module("NestedModule", "mrc_unittest", "ModuleNestingTest_mod2", {})
nested_sink = builder.make_sink("nested_sink", on_next, on_error, on_complete)
nested_sink2 = builder.make_sink("nested_sink2", on_next, on_error, on_complete)

builder.make_edge(nested_mod.output_port("nested_module_output"), nested_sink)
builder.make_edge(nested_mod2.output_port("nested_module_output"), nested_sink2)

pipeline = mrc.Pipeline()
pipeline.make_segment("ModuleNesting_Segment", init_wrapper)
Expand All @@ -349,7 +387,7 @@ def on_complete():
executor.start()
executor.join()

assert packet_count == 4
assert packet_count == 8


if (__name__ in ("__main__", )):
Expand All @@ -358,5 +396,6 @@ def on_complete():
test_py_module_as_sink()
test_py_module_chaining()
test_py_module_nesting()
test_py_modules_dont_overwrite()
test_py_constructor()
test_py_module_initialization()

0 comments on commit 1ae67e2

Please sign in to comment.