From c285380dde4384d88a1430a079c1eee2f5140dbb Mon Sep 17 00:00:00 2001
From: Christopher Harris
Date: Mon, 21 Aug 2023 22:35:49 -0500
Subject: [PATCH] Add `parser_kwargs` to `FileSourceStage` to support json files (#1137)

Closes #1132

Authors:
  - Christopher Harris (https://github.com/cwharris)

Approvers:
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: https://github.com/nv-morpheus/Morpheus/pull/1137
---
 .../include/morpheus/io/deserializers.hpp     |  5 ++-
 .../include/morpheus/stages/file_source.hpp   | 10 ++++-
 morpheus/_lib/src/io/deserializers.cpp        |  7 +++-
 morpheus/_lib/src/stages/file_source.cpp      | 25 ++++++++++---
 morpheus/_lib/stages/__init__.pyi             |  2 +-
 morpheus/_lib/stages/module.cpp               |  3 +-
 morpheus/_lib/tests/test_file_in_out.cpp      |  2 +-
 morpheus/stages/input/file_source_stage.py    | 13 ++++++-
 tests/test_file_in_out.py                     | 37 ++++++++++++++-----
 tests/tests_data/simple.json                  |  3 ++
 10 files changed, 83 insertions(+), 24 deletions(-)
 create mode 100644 tests/tests_data/simple.json

diff --git a/morpheus/_lib/include/morpheus/io/deserializers.hpp b/morpheus/_lib/include/morpheus/io/deserializers.hpp
index 2a845a9b5b..05910fa857 100644
--- a/morpheus/_lib/include/morpheus/io/deserializers.hpp
+++ b/morpheus/_lib/include/morpheus/io/deserializers.hpp
@@ -22,6 +22,7 @@
 #include <cudf/io/types.hpp>
 #include <pybind11/pybind11.h>  // for pybind11::object
 
+#include <optional>
 #include <string>
 #include <vector>
 
@@ -48,7 +49,9 @@ std::vector<std::string> get_column_names_from_table(const cudf::io::table_with_metadata& table)
  * @param filename : Name of the file that should be loaded into a table
  * @return cudf::io::table_with_metadata
  */
-cudf::io::table_with_metadata load_table_from_file(const std::string& filename, FileTypes file_type = FileTypes::Auto);
+cudf::io::table_with_metadata load_table_from_file(const std::string& filename,
+                                                   FileTypes file_type            = FileTypes::Auto,
+                                                   std::optional<bool> json_lines = std::nullopt);
 
 /**
  * @brief Returns the number of index columns in `data_table`, in practice this will be a `0` or `1`
diff --git a/morpheus/_lib/include/morpheus/stages/file_source.hpp b/morpheus/_lib/include/morpheus/stages/file_source.hpp
index 248d2112f2..4e6fb2f541 100644
--- a/morpheus/_lib/include/morpheus/stages/file_source.hpp
+++ b/morpheus/_lib/include/morpheus/stages/file_source.hpp
@@ -27,11 +27,13 @@
 #include <boost/fiber/context.hpp>
 #include <cudf/io/types.hpp>
 #include <mrc/segment/builder.hpp>
+#include <pybind11/pytypes.h>
 #include <pymrc/node.hpp>
 #include <rxcpp/rx.hpp>  // for apply, make_subscriber, observable_member, is_on_error<>::not_void, is_on_next_of<>::not_void, trace_activity
 
 #include <memory>
+#include <optional>
 #include <string>
 #include <vector>  // for vector
@@ -63,14 +65,16 @@ class FileSourceStage : public mrc::pymrc::PythonSource<std::shared_ptr<MessageMeta>>
-    FileSourceStage(std::string filename, int repeat = 1);
+    FileSourceStage(std::string filename,
+                    int repeat                     = 1,
+                    std::optional<bool> json_lines = std::nullopt);
 
   private:
     subscriber_fn_t build();
 
     std::string m_filename;
     int m_repeat{1};
+    std::optional<bool> m_json_lines;
 };
 
 /****** FileSourceStageInterfaceProxy***********************/
@@ -86,12 +90,14 @@ struct FileSourceStageInterfaceProxy
      * @param name : Name of a stage reference
      * @param filename : Name of the file from which the messages will be read.
      * @param repeat : Repeats the input dataset multiple times. Useful to extend small datasets for debugging.
+     * @param parser_kwargs : Optional arguments to pass to the file parser.
      * @return std::shared_ptr<mrc::segment::Object<FileSourceStage>>
      */
     static std::shared_ptr<mrc::segment::Object<FileSourceStage>> init(mrc::segment::Builder& builder,
                                                                        const std::string& name,
                                                                        std::string filename,
-                                                                       int repeat = 1);
+                                                                       int repeat                   = 1,
+                                                                       pybind11::dict parser_kwargs = pybind11::dict());
 };
 #pragma GCC visibility pop
 /** @} */  // end of group
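
The new `json_lines` option distinguishes line-delimited JSON ("JSON lines", which the stage previously hard-coded) from a standard JSON document holding a single top-level array of records. A minimal sketch of the two layouts read directly with cudf; the file names here are placeholders, not part of this patch:

    # Illustrative sketch only; file names are placeholders.
    import cudf

    # JSON lines: one record per line. Matches the old hard-coded .lines(true)
    # behavior, and remains the default when no "lines" option is supplied.
    df_lines = cudf.read_json("filter_probs.jsonlines", lines=True)

    # Standard JSON: a single array of records, now reachable through
    # parser_kwargs={"lines": False}.
    df_records = cudf.read_json("simple.json", lines=False)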
diff --git a/morpheus/_lib/src/io/deserializers.cpp b/morpheus/_lib/src/io/deserializers.cpp
index cda1a25235..0cd129f191 100644
--- a/morpheus/_lib/src/io/deserializers.cpp
+++ b/morpheus/_lib/src/io/deserializers.cpp
@@ -53,7 +53,9 @@ std::vector<std::string> get_column_names_from_table(const cudf::io::table_with_metadata& table)
     return foreach_map(table.metadata.schema_info, [](auto schema) { return schema.name; });
 }
 
-cudf::io::table_with_metadata load_table_from_file(const std::string& filename, FileTypes file_type)
+cudf::io::table_with_metadata load_table_from_file(const std::string& filename,
+                                                   FileTypes file_type,
+                                                   std::optional<bool> json_lines)
 {
     if (file_type == FileTypes::Auto)
     {
@@ -65,7 +67,8 @@ cudf::io::table_with_metadata load_table_from_file(const std::string& filename,
     switch (file_type)
     {
     case FileTypes::JSON: {
-        auto options = cudf::io::json_reader_options::builder(cudf::io::source_info{filename}).lines(true);
+        auto options =
+            cudf::io::json_reader_options::builder(cudf::io::source_info{filename}).lines(json_lines.value_or(true));
         table = cudf::io::read_json(options.build());
         break;
     }
diff --git a/morpheus/_lib/src/stages/file_source.cpp b/morpheus/_lib/src/stages/file_source.cpp
index 0bceb33623..0bb9870fea 100644
--- a/morpheus/_lib/src/stages/file_source.cpp
+++ b/morpheus/_lib/src/stages/file_source.cpp
@@ -25,18 +25,21 @@
 #include "pymrc/node.hpp"
 
 #include "morpheus/io/deserializers.hpp"
+#include "morpheus/objects/file_types.hpp"
 #include "morpheus/objects/table_info.hpp"
 #include "morpheus/utilities/cudf_util.hpp"
 
 #include <boost/fiber/context.hpp>
 #include <cudf/types.hpp>
 #include <glog/logging.h>
+#include <pybind11/cast.h>
 #include <pybind11/gil.h>
 #include <pybind11/pybind11.h>  // for str_attr_accessor
 #include <pybind11/pytypes.h>   // for pybind11::int_
 #include <rxcpp/rx.hpp>
 
 #include <memory>
+#include <optional>
 #include <sstream>
 #include <utility>
 // IWYU thinks we need __alloc_traits<>::value_type for vector assignments
@@ -45,16 +48,17 @@ namespace morpheus {
 // Component public implementations
 // ************ FileSourceStage ************* //
-FileSourceStage::FileSourceStage(std::string filename, int repeat) :
+FileSourceStage::FileSourceStage(std::string filename, int repeat, std::optional<bool> json_lines) :
   PythonSource(build()),
   m_filename(std::move(filename)),
-  m_repeat(repeat)
+  m_repeat(repeat),
+  m_json_lines(json_lines)
 {}
 
 FileSourceStage::subscriber_fn_t FileSourceStage::build()
 {
     return [this](rxcpp::subscriber<source_type_t> output) {
-        auto data_table     = load_table_from_file(m_filename);
+        auto data_table     = load_table_from_file(m_filename, FileTypes::Auto, m_json_lines);
         int index_col_count = prepare_df_index(data_table);
 
         // Next, create the message metadata. This gets reused for repeats
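
On the C++ side, `load_table_from_file` applies `json_lines.value_or(true)`, so a stage constructed without a "lines" entry keeps the old line-delimited behavior. A rough Python equivalent of that defaulting rule; `resolve_json_lines` is a hypothetical helper written for illustration, not part of the patch:

    # Hypothetical helper mirroring json_lines.value_or(true) in deserializers.cpp.
    from typing import Optional

    def resolve_json_lines(json_lines: Optional[bool]) -> bool:
        # std::nullopt (None here) falls back to True, i.e. line-delimited
        # JSON, the stage's historical default.
        return True if json_lines is None else json_lines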
@@ -112,9 +116,20 @@ FileSourceStage::subscriber_fn_t FileSourceStage::build()
 
 // ************ FileSourceStageInterfaceProxy ************ //
 std::shared_ptr<mrc::segment::Object<FileSourceStage>> FileSourceStageInterfaceProxy::init(
-    mrc::segment::Builder& builder, const std::string& name, std::string filename, int repeat)
+    mrc::segment::Builder& builder,
+    const std::string& name,
+    std::string filename,
+    int repeat,
+    pybind11::dict parser_kwargs)
 {
-    auto stage = builder.construct_object<FileSourceStage>(name, filename, repeat);
+    std::optional<bool> json_lines = std::nullopt;
+
+    if (parser_kwargs.contains("lines"))
+    {
+        json_lines = parser_kwargs["lines"].cast<bool>();
+    }
+
+    auto stage = builder.construct_object<FileSourceStage>(name, filename, repeat, json_lines);
 
     return stage;
 }
diff --git a/morpheus/_lib/stages/__init__.pyi b/morpheus/_lib/stages/__init__.pyi
index b62fba68e4..6c0679dede 100644
--- a/morpheus/_lib/stages/__init__.pyi
+++ b/morpheus/_lib/stages/__init__.pyi
@@ -40,7 +40,7 @@ class DeserializeStage(mrc.core.segment.SegmentObject):
     def __init__(self, builder: mrc.core.segment.Builder, name: str, batch_size: int, ensure_sliceable_index: bool = True) -> None: ...
     pass
 class FileSourceStage(mrc.core.segment.SegmentObject):
-    def __init__(self, builder: mrc.core.segment.Builder, name: str, filename: str, repeat: int) -> None: ...
+    def __init__(self, builder: mrc.core.segment.Builder, name: str, filename: str, repeat: int, parser_kwargs: dict) -> None: ...
     pass
 class FilterDetectionsStage(mrc.core.segment.SegmentObject):
     def __init__(self, builder: mrc.core.segment.Builder, name: str, threshold: float, copy: bool, filter_source: morpheus._lib.common.FilterSource, field_name: str = 'probs') -> None: ...
diff --git a/morpheus/_lib/stages/module.cpp b/morpheus/_lib/stages/module.cpp
index caac2e9246..a173359259 100644
--- a/morpheus/_lib/stages/module.cpp
+++ b/morpheus/_lib/stages/module.cpp
@@ -105,7 +105,8 @@ PYBIND11_MODULE(stages, _module)
             py::arg("builder"),
             py::arg("name"),
             py::arg("filename"),
-            py::arg("repeat"));
+            py::arg("repeat"),
+            py::arg("parser_kwargs"));
 
     py::class_<mrc::segment::Object<FilterDetectionsStage>,
                mrc::segment::ObjectProperties,
diff --git a/morpheus/_lib/tests/test_file_in_out.cpp b/morpheus/_lib/tests/test_file_in_out.cpp
index 612d074330..6f8a89c685 100644
--- a/morpheus/_lib/tests/test_file_in_out.cpp
+++ b/morpheus/_lib/tests/test_file_in_out.cpp
@@ -90,7 +90,7 @@ TEST_F(TestFileInOut, RoundTripCSV)
     }
 }
 
-TEST_F(TestFileInOut, RoundTripJSON)
+TEST_F(TestFileInOut, RoundTripJSONLines)
 {
     using nlohmann::json;
     auto input_file = test::get_morpheus_root() / "tests/tests_data/filter_probs.jsonlines";
diff --git a/morpheus/stages/input/file_source_stage.py b/morpheus/stages/input/file_source_stage.py
index 8a44397500..d6c396a660 100644
--- a/morpheus/stages/input/file_source_stage.py
+++ b/morpheus/stages/input/file_source_stage.py
@@ -57,6 +57,8 @@ class FileSourceStage(PreallocatorMixin, SingleOutputSource):
     filter_null : bool, default = True
         Whether or not to filter rows with null 'data' column. Null values in the 'data' column can cause issues down
         the line with processing. Setting this to True is recommended.
+    parser_kwargs : dict, default = {}
+        Extra options to pass to the file parser.
     """
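
Taken together, the changes let a standard (non-line-delimited) JSON file feed a pipeline. A minimal usage sketch modeled on the new test added below; the input path is illustrative:

    # Minimal usage sketch; "./records.json" is a placeholder path.
    from morpheus.config import Config
    from morpheus.pipeline import LinearPipeline
    from morpheus.stages.input.file_source_stage import FileSourceStage
    from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage

    config = Config()
    pipe = LinearPipeline(config)

    # parser_kwargs={"lines": False} selects standard (array-of-records) JSON.
    pipe.set_source(FileSourceStage(config, filename="./records.json", parser_kwargs={"lines": False}))
    sink = pipe.add_stage(InMemorySinkStage(config))
    pipe.run()

    messages = sink.get_messages()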
""" def __init__(self, @@ -65,7 +67,8 @@ def __init__(self, iterative: bool = False, file_type: FileTypes = FileTypes.Auto, repeat: int = 1, - filter_null: bool = True): + filter_null: bool = True, + parser_kwargs: dict = None): super().__init__(c) @@ -74,6 +77,7 @@ def __init__(self, self._filename = filename self._file_type = file_type self._filter_null = filter_null + self._parser_kwargs = parser_kwargs or {} self._input_count = None self._max_concurrent = c.num_threads @@ -101,7 +105,11 @@ def _build_source(self, builder: mrc.Builder) -> StreamPair: if self._build_cpp_node(): import morpheus._lib.stages as _stages - out_stream = _stages.FileSourceStage(builder, self.unique_name, self._filename, self._repeat_count) + out_stream = _stages.FileSourceStage(builder, + self.unique_name, + self._filename, + self._repeat_count, + self._parser_kwargs) else: out_stream = builder.make_source(self.unique_name, self._generate_frames()) @@ -115,6 +123,7 @@ def _generate_frames(self) -> typing.Iterable[MessageMeta]: self._filename, self._file_type, filter_nulls=self._filter_null, + parser_kwargs=self._parser_kwargs, df_type="cudf", ) diff --git a/tests/test_file_in_out.py b/tests/test_file_in_out.py index a52549de66..61e7d10c6d 100755 --- a/tests/test_file_in_out.py +++ b/tests/test_file_in_out.py @@ -28,6 +28,7 @@ from morpheus.messages import MultiMessage from morpheus.pipeline import LinearPipeline from morpheus.stages.input.file_source_stage import FileSourceStage +from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage from morpheus.stages.output.write_to_file_stage import WriteToFileStage from morpheus.stages.postprocess.serialize_stage import SerializeStage from morpheus.stages.preprocess.deserialize_stage import DeserializeStage @@ -42,9 +43,9 @@ @pytest.mark.parametrize("flush", [False, True], ids=["no_flush", "flush"]) @pytest.mark.parametrize("repeat", [1, 2, 5], ids=["repeat1", "repeat2", "repeat5"]) def test_file_rw_pipe(tmp_path, config, input_type, output_type, flush, repeat: int): - input_file = os.path.join(TEST_DIRS.tests_data_dir, 'filter_probs.{}'.format(input_type)) + input_file = os.path.join(TEST_DIRS.tests_data_dir, f'filter_probs.{input_type}') validation_file = os.path.join(TEST_DIRS.tests_data_dir, "filter_probs.csv") - out_file = os.path.join(tmp_path, 'results.{}'.format(output_type)) + out_file = os.path.join(tmp_path, f'results.{output_type}') pipe = LinearPipeline(config) pipe.set_source(FileSourceStage(config, filename=input_file, repeat=repeat)) @@ -62,7 +63,7 @@ def test_file_rw_pipe(tmp_path, config, input_type, output_type, flush, repeat: # The output data will contain an additional id column that we will need to slice off output_data = np.loadtxt(out_file, delimiter=",", skiprows=1) output_data = output_data[:, 1:] - elif output_type == "json" or output_type == "jsonlines": # assume json + elif output_type in ("json", "jsonlines"): # assume json df = read_file_to_df(out_file, file_type=FileTypes.Auto) output_data = df.values elif output_type == "parquet": @@ -76,6 +77,24 @@ def test_file_rw_pipe(tmp_path, config, input_type, output_type, flush, repeat: assert output_data.tolist() == validation_data.tolist() +def test_file_read_json(config): + src_file = os.path.join(TEST_DIRS.tests_data_dir, "simple.json") + + pipe = LinearPipeline(config) + pipe.set_source(FileSourceStage(config, filename=src_file, parser_kwargs={"lines": False})) + sink_stage = pipe.add_stage(InMemorySinkStage(config)) + pipe.run() + + messages = sink_stage.get_messages() + + 
+def test_file_read_json(config):
+    src_file = os.path.join(TEST_DIRS.tests_data_dir, "simple.json")
+
+    pipe = LinearPipeline(config)
+    pipe.set_source(FileSourceStage(config, filename=src_file, parser_kwargs={"lines": False}))
+    sink_stage = pipe.add_stage(InMemorySinkStage(config))
+    pipe.run()
+
+    messages = sink_stage.get_messages()
+
+    assert (len(messages) == 1)
+
+    meta = messages[0]
+
+    assert (len(meta.df) == 4)
+    assert (len(meta.df.columns) == 3)
+
+
 @pytest.mark.slow
 @pytest.mark.use_python
 @pytest.mark.usefixtures("chdir_tmpdir")
@@ -101,9 +120,9 @@ def test_to_file_no_path(tmp_path, config):
 @pytest.mark.parametrize("input_type", ["csv", "jsonlines", "parquet"])
 @pytest.mark.parametrize("output_type", ["csv", "json", "jsonlines"])
 def test_file_rw_multi_segment_pipe(tmp_path, config, input_type, output_type):
-    input_file = os.path.join(TEST_DIRS.tests_data_dir, 'filter_probs.{}'.format(input_type))
+    input_file = os.path.join(TEST_DIRS.tests_data_dir, f'filter_probs.{input_type}')
     validation_file = os.path.join(TEST_DIRS.tests_data_dir, "filter_probs.csv")
-    out_file = os.path.join(tmp_path, 'results.{}'.format(output_type))
+    out_file = os.path.join(tmp_path, f'results.{output_type}')
 
     if (input_type == "parquet"):
         CppConfig.set_should_use_cpp(False)
@@ -163,10 +182,10 @@ def test_file_rw_index_pipe(tmp_path, config, input_file):
                              "include_header": True
                          }), (os.path.join(TEST_DIRS.tests_data_dir, "filter_probs.jsonlines"), {})],
                          ids=["CSV", "CSV_ID", "JSON"])
-def test_file_roundtrip(use_cpp, tmp_path, input_file, extra_kwargs):
+def test_file_roundtrip(tmp_path, input_file, extra_kwargs):
 
     # Output file should be same type as input
-    out_file = os.path.join(tmp_path, 'results{}'.format(os.path.splitext(input_file)[1]))
+    out_file = os.path.join(tmp_path, f'results{os.path.splitext(input_file)[1]}')
 
     # Read the dataframe
     df = read_file_to_df(input_file, df_type='cudf')
@@ -204,7 +223,7 @@ def test_read_cpp_compare(input_file: str):
 @pytest.mark.parametrize("output_type", ["csv", "json", "jsonlines"])
 def test_file_rw_serialize_deserialize_pipe(tmp_path, config, output_type):
     input_file = os.path.join(TEST_DIRS.tests_data_dir, "filter_probs.csv")
-    out_file = os.path.join(tmp_path, 'results.{}'.format(output_type))
+    out_file = os.path.join(tmp_path, f'results.{output_type}')
 
     pipe = LinearPipeline(config)
     pipe.set_source(FileSourceStage(config, filename=input_file, iterative=False))
@@ -234,7 +253,7 @@ def test_file_rw_serialize_deserialize_pipe(tmp_path, config, output_type):
 @pytest.mark.parametrize("output_type", ["csv", "json", "jsonlines"])
 def test_file_rw_serialize_deserialize_multi_segment_pipe(tmp_path, config, output_type):
     input_file = os.path.join(TEST_DIRS.tests_data_dir, "filter_probs.csv")
-    out_file = os.path.join(tmp_path, 'results.{}'.format(output_type))
+    out_file = os.path.join(tmp_path, f'results.{output_type}')
 
     pipe = LinearPipeline(config)
     pipe.set_source(FileSourceStage(config, filename=input_file, iterative=False))
diff --git a/tests/tests_data/simple.json b/tests/tests_data/simple.json
new file mode 100644
index 0000000000..3a492210d4
--- /dev/null
+++ b/tests/tests_data/simple.json
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:29679e130a150265fbadb79367714bf5fcc4a6f845b388bae992e5a160deb2a7
+size 391
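
`tests/tests_data/simple.json` is tracked in Git LFS, so only the pointer file appears above. Based solely on the new test's assertions (one message carrying a 4-row by 3-column dataframe), a compatible standard-JSON fixture could be generated as below; the column names and values are invented for illustration and the real fixture may differ:

    # Illustration only: produces a standard-JSON file with the 4x3 shape the
    # new test expects. Column names and values are invented; the actual
    # LFS-tracked fixture is not reproduced here.
    import pandas as pd

    df = pd.DataFrame({
        "a": [1, 2, 3, 4],
        "b": [5, 6, 7, 8],
        "c": ["w", "x", "y", "z"],
    })
    df.to_json("simple.json", orient="records")  # lines=False is the default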