Skip to content

Commit

Permalink
Test for issue nv-morpheus#360
Browse files Browse the repository at this point in the history
  • Loading branch information
dagardner-nv committed Aug 2, 2023
1 parent ef06972 commit 79d144b
Showing 1 changed file with 50 additions and 0 deletions.
50 changes: 50 additions & 0 deletions cpp/mrc/tests/test_pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include <map>
#include <memory>
#include <ostream>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>
Expand Down Expand Up @@ -160,6 +161,55 @@ TEST_F(TestPipeline, TwoSegment)
LOG(INFO) << "Done" << std::endl;
}

TEST_F(TestPipeline, InconsistentPipeline)
{
// Test to reproduce issue #360
auto pipeline = mrc::make_pipeline();

auto seg_1 =
pipeline->make_segment("seg_1", segment::EgressPorts<float>({"float_port"}), [](segment::IBuilder& seg) {
auto rx_source = seg.make_source<float>("rx_source", [](rxcpp::subscriber<float> s) {
LOG(INFO) << "emit 1";
s.on_next(1.0F);
LOG(INFO) << "emit 2";
s.on_next(2.0F);
LOG(INFO) << "emit 3";
s.on_next(3.0F);
LOG(INFO) << "issuing complete";
s.on_completed();
});

auto my_float_egress = seg.get_egress<float>("float_port");

seg.make_edge(rx_source, my_float_egress);
});

auto seg_2 =
pipeline->make_segment("seg_2", segment::IngressPorts<float>({"float_port"}), [&](segment::IBuilder& seg) {
auto my_float_ingress = seg.get_ingress<float>("float_port");

auto rx_sink = seg.make_sink<float>("rx_sink",
rxcpp::make_observer_dynamic<float>(
[&](float x) {
DVLOG(1) << x << std::endl;
},
[&]() {
DVLOG(1) << "Completed" << std::endl;
}));

seg.make_edge(my_float_ingress, rx_sink);
throw std::runtime_error("Error in initializer");
});

Executor exec(std::move(m_options));

exec.register_pipeline(std::move(pipeline));

exec.start();

EXPECT_THROW(exec.join(), std::runtime_error);
}

/*
TEST_F(TestPipeline, TwoSegmentManualTag)
{
Expand Down

0 comments on commit 79d144b

Please sign in to comment.