From 700b0358062fe06e77638f393372b1d4a6e83fd7 Mon Sep 17 00:00:00 2001 From: David Gardner <96306125+dagardner-nv@users.noreply.github.com> Date: Tue, 18 Jun 2024 13:39:11 -0700 Subject: [PATCH] Fixes for C++ impl for `DeserializeStage` and add missing `get_info` overloads to `SlicedMessageMeta` (#1749) * Replace incorrect usage of `self.supports_cpp_node()` with `self._build_cpp_node()` in `DeserializeStage` to determine if a C++ impl should be used. * Choose either `_stages.DeserializeControlMessageStage` or `_stages.DeserializeMultiMessageStage` based on the message type (likely this was a merge error). * Add missing `get_info(const std::string&)` and `get_info(const std::vector<std::string>&)` to `SlicedMessageMeta`, which indirectly fixes the CUDA memory errors in #1747 Closes #1747 ## By Submitting this PR I confirm: - I am familiar with the [Contributing Guidelines](https://github.com/nv-morpheus/Morpheus/blob/main/docs/source/developer_guide/contributing.md). - When the PR is ready for review, new or existing tests cover these changes. - When the PR is ready for review, the documentation is up to date with these changes. 
Authors: - David Gardner (https://github.com/dagardner-nv) Approvers: - Michael Demoret (https://github.com/mdemoret-nv) URL: https://github.com/nv-morpheus/Morpheus/pull/1749 --- .../_lib/include/morpheus/messages/meta.hpp | 4 +++ morpheus/_lib/src/messages/meta.cpp | 14 +++++++++ morpheus/_lib/src/stages/preprocess_fil.cpp | 25 ++++++++-------- .../messages/test_sliced_message_meta.cpp | 30 +++++++++++++++++-- .../stages/preprocess/deserialize_stage.py | 29 ++++++++++++------ 5 files changed, 77 insertions(+), 25 deletions(-) diff --git a/morpheus/_lib/include/morpheus/messages/meta.hpp b/morpheus/_lib/include/morpheus/messages/meta.hpp index 750236df4a..3ee4863af3 100644 --- a/morpheus/_lib/include/morpheus/messages/meta.hpp +++ b/morpheus/_lib/include/morpheus/messages/meta.hpp @@ -189,6 +189,10 @@ class MORPHEUS_EXPORT SlicedMessageMeta : public MessageMeta TableInfo get_info() const override; + TableInfo get_info(const std::string& col_name) const override; + + TableInfo get_info(const std::vector& column_names) const override; + MutableTableInfo get_mutable_info() const override; std::optional ensure_sliceable_index() override; diff --git a/morpheus/_lib/src/messages/meta.cpp b/morpheus/_lib/src/messages/meta.cpp index b141a0e6f0..c9de4eda7f 100644 --- a/morpheus/_lib/src/messages/meta.cpp +++ b/morpheus/_lib/src/messages/meta.cpp @@ -535,6 +535,20 @@ TableInfo SlicedMessageMeta::get_info() const return this->m_data->get_info().get_slice(m_start, m_stop, m_column_names); } +TableInfo SlicedMessageMeta::get_info(const std::string& col_name) const +{ + auto full_info = this->m_data->get_info(); + + return full_info.get_slice(m_start, m_stop, {col_name}); +} + +TableInfo SlicedMessageMeta::get_info(const std::vector& column_names) const +{ + auto full_info = this->m_data->get_info(); + + return full_info.get_slice(m_start, m_stop, column_names); +} + MutableTableInfo SlicedMessageMeta::get_mutable_info() const { return 
this->m_data->get_mutable_info().get_slice(m_start, m_stop, m_column_names); diff --git a/morpheus/_lib/src/stages/preprocess_fil.cpp b/morpheus/_lib/src/stages/preprocess_fil.cpp index ad1e09c1b4..bcc30283c8 100644 --- a/morpheus/_lib/src/stages/preprocess_fil.cpp +++ b/morpheus/_lib/src/stages/preprocess_fil.cpp @@ -229,16 +229,18 @@ std::shared_ptr PreprocessFILStage std::shared_ptr PreprocessFILStage::on_control_message( std::shared_ptr x) + { - auto num_rows = x->payload()->get_info().num_rows(); + auto df_meta = this->fix_bad_columns(x); + const auto num_rows = df_meta.num_rows(); + auto packed_data = std::make_shared(m_fea_cols.size() * num_rows * sizeof(float), rmm::cuda_stream_per_thread); - auto df_meta = this->fix_bad_columns(x); + for (size_t i = 0; i < df_meta.num_columns(); ++i) { auto curr_col = df_meta.get_column(i); - - auto curr_ptr = static_cast(packed_data->data()) + i * df_meta.num_rows(); + auto curr_ptr = static_cast(packed_data->data()) + i * num_rows; // Check if we are something other than float if (curr_col.type().id() != cudf::type_id::FLOAT32) @@ -246,15 +248,13 @@ std::shared_ptr PreprocessFILStagerelease(); // Do the copy here before it goes out of scope - MRC_CHECK_CUDA(cudaMemcpy( - curr_ptr, float_data.data->data(), df_meta.num_rows() * sizeof(float), cudaMemcpyDeviceToDevice)); + MRC_CHECK_CUDA( + cudaMemcpy(curr_ptr, float_data.data->data(), num_rows * sizeof(float), cudaMemcpyDeviceToDevice)); } else { - MRC_CHECK_CUDA(cudaMemcpy(curr_ptr, - curr_col.template data(), - df_meta.num_rows() * sizeof(float), - cudaMemcpyDeviceToDevice)); + MRC_CHECK_CUDA(cudaMemcpy( + curr_ptr, curr_col.template data(), num_rows * sizeof(float), cudaMemcpyDeviceToDevice)); } } @@ -279,10 +279,9 @@ std::shared_ptr PreprocessFILStage(num_rows); memory->set_tensor("input__0", std::move(input__0)); memory->set_tensor("seq_ids", std::move(seq_ids)); - auto next = x; - next->tensors(memory); + x->tensors(memory); - return next; + return x; } template 
class PreprocessFILStage; diff --git a/morpheus/_lib/tests/messages/test_sliced_message_meta.cpp b/morpheus/_lib/tests/messages/test_sliced_message_meta.cpp index d7e18d3bd9..96c6f255c7 100644 --- a/morpheus/_lib/tests/messages/test_sliced_message_meta.cpp +++ b/morpheus/_lib/tests/messages/test_sliced_message_meta.cpp @@ -29,14 +29,14 @@ #include // for std::filesystem::path #include // for shared_ptr #include // for move +#include using namespace morpheus; using TestSlicedMessageMeta = morpheus::test::TestMessages; // NOLINT(readability-identifier-naming) -TEST_F(TestSlicedMessageMeta, TestCount) +std::shared_ptr create_test_meta() { - // Test for issue #970 auto test_data_dir = test::get_morpheus_root() / "tests/tests_data"; auto input_file{test_data_dir / "filter_probs.csv"}; @@ -44,7 +44,13 @@ TEST_F(TestSlicedMessageMeta, TestCount) auto table = load_table_from_file(input_file); auto index_col_count = prepare_df_index(table); - auto meta = MessageMeta::create_from_cpp(std::move(table), index_col_count); + return MessageMeta::create_from_cpp(std::move(table), index_col_count); +} + +TEST_F(TestSlicedMessageMeta, TestCount) +{ + // Test for issue #970 + auto meta = create_test_meta(); EXPECT_EQ(meta->count(), 20); SlicedMessageMeta sliced_meta(meta, 5, 15); @@ -60,3 +66,21 @@ TEST_F(TestSlicedMessageMeta, TestCount) EXPECT_EQ(p_meta->count(), 10); EXPECT_EQ(p_meta->get_info().num_rows(), p_meta->count()); } + +TEST_F(TestSlicedMessageMeta, TestGetInfo) +{ + // Test for bug #1747 where get_info() wasn't being overridden for column overloads + auto meta = create_test_meta(); + std::unique_ptr sliced_meta = std::make_unique(meta, 5, 15); + + const auto num_rows = sliced_meta->count(); + + pybind11::gil_scoped_release no_gil; + EXPECT_EQ(num_rows, sliced_meta->get_info().num_rows()); + + std::string column_name("v1"); + EXPECT_EQ(num_rows, sliced_meta->get_info(column_name).num_rows()); + + std::vector column_names{"v1", "v2"}; + EXPECT_EQ(num_rows, 
sliced_meta->get_info(column_names).num_rows()); +} diff --git a/morpheus/stages/preprocess/deserialize_stage.py b/morpheus/stages/preprocess/deserialize_stage.py index 10518f2887..c8861cb4bb 100644 --- a/morpheus/stages/preprocess/deserialize_stage.py +++ b/morpheus/stages/preprocess/deserialize_stage.py @@ -18,7 +18,6 @@ import mrc -import morpheus._lib.stages as _stages from morpheus.cli.register_stage import register_stage from morpheus.config import Config from morpheus.config import PipelineModes @@ -63,8 +62,7 @@ def __init__(self, c: Config, *, ensure_sliceable_index: bool = True, - message_type: typing.Union[typing.Literal[MultiMessage], - typing.Literal[ControlMessage]] = MultiMessage, + message_type: type[MultiMessage] | type[ControlMessage] = MultiMessage, task_type: str = None, task_payload: dict = None): super().__init__(c) @@ -81,10 +79,10 @@ def __init__(self, self._task_type = task_type self._task_payload = task_payload - if (self._message_type == ControlMessage): + if (self._message_type is ControlMessage): if ((self._task_type is None) != (self._task_payload is None)): raise ValueError("Both `task_type` and `task_payload` must be specified if either is specified.") - elif (self._message_type == MultiMessage): + elif (self._message_type is MultiMessage): if (self._task_type is not None or self._task_payload is not None): raise ValueError("Cannot specify `task_type` or `task_payload` for non-control messages.") else: @@ -92,7 +90,7 @@ def __init__(self, self._module_config = { "ensure_sliceable_index": self._ensure_sliceable_index, - "message_type": "MultiMessage" if self._message_type == MultiMessage else "ControlMessage", + "message_type": "MultiMessage" if self._message_type is MultiMessage else "ControlMessage", "task_type": self._task_type, "task_payload": self._task_payload, "batch_size": self._batch_size, @@ -113,14 +111,27 @@ def accepted_types(self) -> typing.Tuple: def supports_cpp_node(self): # Enable support by default - return False + 
return True def compute_schema(self, schema: StageSchema): schema.output_schema.set_type(self._message_type) def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) -> mrc.SegmentObject: - if (self.supports_cpp_node()): - out_node = _stages.DeserializeStage(builder, self.unique_name, self._batch_size) + if (self._build_cpp_node()): + import morpheus._lib.stages as _stages + if (self._message_type is ControlMessage): + out_node = _stages.DeserializeControlMessageStage(builder, + self.unique_name, + batch_size=self._batch_size, + ensure_sliceable_index=self._ensure_sliceable_index, + task_type=self._task_type, + task_payload=self._task_payload) + else: + out_node = _stages.DeserializeMultiMessageStage(builder, + self.unique_name, + batch_size=self._batch_size, + ensure_sliceable_index=self._ensure_sliceable_index) + builder.make_edge(input_node, out_node) else: module_loader = DeserializeLoaderFactory.get_instance(module_name=f"deserialize_{self.unique_name}",