Skip to content

Commit

Permalink
Add parser_kwargs to FileSourceStage to support json files (#1137)
Browse files Browse the repository at this point in the history
Closes #1132

Authors:
  - Christopher Harris (https://github.com/cwharris)

Approvers:
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: #1137
  • Loading branch information
cwharris authored Aug 22, 2023
1 parent 5af1cdb commit c285380
Show file tree
Hide file tree
Showing 10 changed files with 83 additions and 24 deletions.
5 changes: 4 additions & 1 deletion morpheus/_lib/include/morpheus/io/deserializers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <cudf/io/types.hpp>
#include <pybind11/pytypes.h> // for pybind11::object

#include <optional>
#include <string>
#include <vector>

Expand All @@ -48,7 +49,9 @@ std::vector<std::string> get_column_names_from_table(const cudf::io::table_with_
* @param filename : Name of the file that should be loaded into a table
* @return cudf::io::table_with_metadata
*/
cudf::io::table_with_metadata load_table_from_file(const std::string& filename, FileTypes file_type = FileTypes::Auto);
cudf::io::table_with_metadata load_table_from_file(const std::string& filename,
FileTypes file_type = FileTypes::Auto,
std::optional<bool> json_lines = std::nullopt);

/**
* @brief Returns the number of index columns in `data_table`, in practice this will be a `0` or `1`
Expand Down
10 changes: 8 additions & 2 deletions morpheus/_lib/include/morpheus/stages/file_source.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@
#include <mrc/segment/builder.hpp>
#include <mrc/segment/object.hpp>
#include <mrc/types.hpp>
#include <pybind11/pytypes.h>
#include <pymrc/node.hpp>
#include <rxcpp/rx.hpp> // for apply, make_subscriber, observable_member, is_on_error<>::not_void, is_on_next_of<>::not_void, trace_activity

#include <map>
#include <memory>
#include <optional>
#include <string>
#include <thread>
#include <vector> // for vector
Expand Down Expand Up @@ -63,14 +65,16 @@ class FileSourceStage : public mrc::pymrc::PythonSource<std::shared_ptr<MessageM
*
* @param filename : Name of the file from which the messages will be read
* @param repeat : Repeats the input dataset multiple times. Useful to extend small datasets for debugging
* @param json_lines: Whether to force json or jsonlines parsing
*/
FileSourceStage(std::string filename, int repeat = 1);
FileSourceStage(std::string filename, int repeat = 1, std::optional<bool> json_lines = std::nullopt);

private:
subscriber_fn_t build();

std::string m_filename;
int m_repeat{1};
std::optional<bool> m_json_lines;
};

/****** FileSourceStageInterfaceProxy***********************/
Expand All @@ -86,12 +90,14 @@ struct FileSourceStageInterfaceProxy
* @param name : Name of a stage reference
* @param filename : Name of the file from which the messages will be read.
* @param repeat : Repeats the input dataset multiple times. Useful to extend small datasets for debugging.
* @param parser_kwargs : Optional arguments to pass to the file parser.
* @return std::shared_ptr<mrc::segment::Object<FileSourceStage>>
*/
static std::shared_ptr<mrc::segment::Object<FileSourceStage>> init(mrc::segment::Builder& builder,
const std::string& name,
std::string filename,
int repeat = 1);
int repeat = 1,
pybind11::dict parser_kwargs = pybind11::dict());
};
#pragma GCC visibility pop
/** @} */ // end of group
Expand Down
7 changes: 5 additions & 2 deletions morpheus/_lib/src/io/deserializers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ std::vector<std::string> get_column_names_from_table(const cudf::io::table_with_
return foreach_map(table.metadata.schema_info, [](auto schema) { return schema.name; });
}

cudf::io::table_with_metadata load_table_from_file(const std::string& filename, FileTypes file_type)
cudf::io::table_with_metadata load_table_from_file(const std::string& filename,
FileTypes file_type,
std::optional<bool> json_lines)
{
if (file_type == FileTypes::Auto)
{
Expand All @@ -65,7 +67,8 @@ cudf::io::table_with_metadata load_table_from_file(const std::string& filename,
switch (file_type)
{
case FileTypes::JSON: {
auto options = cudf::io::json_reader_options::builder(cudf::io::source_info{filename}).lines(true);
auto options =
cudf::io::json_reader_options::builder(cudf::io::source_info{filename}).lines(json_lines.value_or(true));
table = cudf::io::read_json(options.build());
break;
}
Expand Down
25 changes: 20 additions & 5 deletions morpheus/_lib/src/stages/file_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,21 @@
#include "pymrc/node.hpp"

#include "morpheus/io/deserializers.hpp"
#include "morpheus/objects/file_types.hpp"
#include "morpheus/objects/table_info.hpp"
#include "morpheus/utilities/cudf_util.hpp"

#include <cudf/types.hpp>
#include <glog/logging.h>
#include <mrc/segment/builder.hpp>
#include <pybind11/cast.h>
#include <pybind11/gil.h>
#include <pybind11/pybind11.h> // for str_attr_accessor
#include <pybind11/pytypes.h> // for pybind11::int_

#include <functional>
#include <memory>
#include <optional>
#include <sstream>
#include <utility>
// IWYU thinks we need __alloc_traits<>::value_type for vector assignments
Expand All @@ -45,16 +48,17 @@
namespace morpheus {
// Component public implementations
// ************ FileSourceStage ************* //
FileSourceStage::FileSourceStage(std::string filename, int repeat) :
FileSourceStage::FileSourceStage(std::string filename, int repeat, std::optional<bool> json_lines) :
PythonSource(build()),
m_filename(std::move(filename)),
m_repeat(repeat)
m_repeat(repeat),
m_json_lines(json_lines)
{}

FileSourceStage::subscriber_fn_t FileSourceStage::build()
{
return [this](rxcpp::subscriber<source_type_t> output) {
auto data_table = load_table_from_file(m_filename);
auto data_table = load_table_from_file(m_filename, FileTypes::Auto, m_json_lines);
int index_col_count = prepare_df_index(data_table);

// Next, create the message metadata. This gets reused for repeats
Expand Down Expand Up @@ -112,9 +116,20 @@ FileSourceStage::subscriber_fn_t FileSourceStage::build()

// ************ FileSourceStageInterfaceProxy ************ //
std::shared_ptr<mrc::segment::Object<FileSourceStage>> FileSourceStageInterfaceProxy::init(
mrc::segment::Builder& builder, const std::string& name, std::string filename, int repeat)
mrc::segment::Builder& builder,
const std::string& name,
std::string filename,
int repeat,
pybind11::dict parser_kwargs)
{
auto stage = builder.construct_object<FileSourceStage>(name, filename, repeat);
std::optional<bool> json_lines = std::nullopt;

if (parser_kwargs.contains("lines"))
{
json_lines = parser_kwargs["lines"].cast<bool>();
}

auto stage = builder.construct_object<FileSourceStage>(name, filename, repeat, json_lines);

return stage;
}
Expand Down
2 changes: 1 addition & 1 deletion morpheus/_lib/stages/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class DeserializeStage(mrc.core.segment.SegmentObject):
def __init__(self, builder: mrc.core.segment.Builder, name: str, batch_size: int, ensure_sliceable_index: bool = True) -> None: ...
pass
class FileSourceStage(mrc.core.segment.SegmentObject):
def __init__(self, builder: mrc.core.segment.Builder, name: str, filename: str, repeat: int) -> None: ...
def __init__(self, builder: mrc.core.segment.Builder, name: str, filename: str, repeat: int, parser_kwargs: dict) -> None: ...
pass
class FilterDetectionsStage(mrc.core.segment.SegmentObject):
def __init__(self, builder: mrc.core.segment.Builder, name: str, threshold: float, copy: bool, filter_source: morpheus._lib.common.FilterSource, field_name: str = 'probs') -> None: ...
Expand Down
3 changes: 2 additions & 1 deletion morpheus/_lib/stages/module.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ PYBIND11_MODULE(stages, _module)
py::arg("builder"),
py::arg("name"),
py::arg("filename"),
py::arg("repeat"));
py::arg("repeat"),
py::arg("parser_kwargs"));

py::class_<mrc::segment::Object<FilterDetectionsStage>,
mrc::segment::ObjectProperties,
Expand Down
2 changes: 1 addition & 1 deletion morpheus/_lib/tests/test_file_in_out.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ TEST_F(TestFileInOut, RoundTripCSV)
}
}

TEST_F(TestFileInOut, RoundTripJSON)
TEST_F(TestFileInOut, RoundTripJSONLines)
{
using nlohmann::json;
auto input_file = test::get_morpheus_root() / "tests/tests_data/filter_probs.jsonlines";
Expand Down
13 changes: 11 additions & 2 deletions morpheus/stages/input/file_source_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ class FileSourceStage(PreallocatorMixin, SingleOutputSource):
filter_null : bool, default = True
Whether or not to filter rows with null 'data' column. Null values in the 'data' column can cause issues down
the line with processing. Setting this to True is recommended.
parser_kwargs : dict, default = {}
Extra options to pass to the file parser.
"""

def __init__(self,
Expand All @@ -65,7 +67,8 @@ def __init__(self,
iterative: bool = False,
file_type: FileTypes = FileTypes.Auto,
repeat: int = 1,
filter_null: bool = True):
filter_null: bool = True,
parser_kwargs: dict = None):

super().__init__(c)

Expand All @@ -74,6 +77,7 @@ def __init__(self,
self._filename = filename
self._file_type = file_type
self._filter_null = filter_null
self._parser_kwargs = parser_kwargs or {}

self._input_count = None
self._max_concurrent = c.num_threads
Expand Down Expand Up @@ -101,7 +105,11 @@ def _build_source(self, builder: mrc.Builder) -> StreamPair:

if self._build_cpp_node():
import morpheus._lib.stages as _stages
out_stream = _stages.FileSourceStage(builder, self.unique_name, self._filename, self._repeat_count)
out_stream = _stages.FileSourceStage(builder,
self.unique_name,
self._filename,
self._repeat_count,
self._parser_kwargs)
else:
out_stream = builder.make_source(self.unique_name, self._generate_frames())

Expand All @@ -115,6 +123,7 @@ def _generate_frames(self) -> typing.Iterable[MessageMeta]:
self._filename,
self._file_type,
filter_nulls=self._filter_null,
parser_kwargs=self._parser_kwargs,
df_type="cudf",
)

Expand Down
37 changes: 28 additions & 9 deletions tests/test_file_in_out.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from morpheus.messages import MultiMessage
from morpheus.pipeline import LinearPipeline
from morpheus.stages.input.file_source_stage import FileSourceStage
from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage
from morpheus.stages.output.write_to_file_stage import WriteToFileStage
from morpheus.stages.postprocess.serialize_stage import SerializeStage
from morpheus.stages.preprocess.deserialize_stage import DeserializeStage
Expand All @@ -42,9 +43,9 @@
@pytest.mark.parametrize("flush", [False, True], ids=["no_flush", "flush"])
@pytest.mark.parametrize("repeat", [1, 2, 5], ids=["repeat1", "repeat2", "repeat5"])
def test_file_rw_pipe(tmp_path, config, input_type, output_type, flush, repeat: int):
input_file = os.path.join(TEST_DIRS.tests_data_dir, 'filter_probs.{}'.format(input_type))
input_file = os.path.join(TEST_DIRS.tests_data_dir, f'filter_probs.{input_type}')
validation_file = os.path.join(TEST_DIRS.tests_data_dir, "filter_probs.csv")
out_file = os.path.join(tmp_path, 'results.{}'.format(output_type))
out_file = os.path.join(tmp_path, f'results.{output_type}')

pipe = LinearPipeline(config)
pipe.set_source(FileSourceStage(config, filename=input_file, repeat=repeat))
Expand All @@ -62,7 +63,7 @@ def test_file_rw_pipe(tmp_path, config, input_type, output_type, flush, repeat:
# The output data will contain an additional id column that we will need to slice off
output_data = np.loadtxt(out_file, delimiter=",", skiprows=1)
output_data = output_data[:, 1:]
elif output_type == "json" or output_type == "jsonlines": # assume json
elif output_type in ("json", "jsonlines"): # assume json
df = read_file_to_df(out_file, file_type=FileTypes.Auto)
output_data = df.values
elif output_type == "parquet":
Expand All @@ -76,6 +77,24 @@ def test_file_rw_pipe(tmp_path, config, input_type, output_type, flush, repeat:
assert output_data.tolist() == validation_data.tolist()


def test_file_read_json(config):
src_file = os.path.join(TEST_DIRS.tests_data_dir, "simple.json")

pipe = LinearPipeline(config)
pipe.set_source(FileSourceStage(config, filename=src_file, parser_kwargs={"lines": False}))
sink_stage = pipe.add_stage(InMemorySinkStage(config))
pipe.run()

messages = sink_stage.get_messages()

assert (len(messages) == 1)

meta = messages[0]

assert (len(meta.df) == 4)
assert (len(meta.df.columns) == 3)


@pytest.mark.slow
@pytest.mark.use_python
@pytest.mark.usefixtures("chdir_tmpdir")
Expand All @@ -101,9 +120,9 @@ def test_to_file_no_path(tmp_path, config):
@pytest.mark.parametrize("input_type", ["csv", "jsonlines", "parquet"])
@pytest.mark.parametrize("output_type", ["csv", "json", "jsonlines"])
def test_file_rw_multi_segment_pipe(tmp_path, config, input_type, output_type):
input_file = os.path.join(TEST_DIRS.tests_data_dir, 'filter_probs.{}'.format(input_type))
input_file = os.path.join(TEST_DIRS.tests_data_dir, f'filter_probs.{input_type}')
validation_file = os.path.join(TEST_DIRS.tests_data_dir, "filter_probs.csv")
out_file = os.path.join(tmp_path, 'results.{}'.format(output_type))
out_file = os.path.join(tmp_path, f'results.{output_type}')

if (input_type == "parquet"):
CppConfig.set_should_use_cpp(False)
Expand Down Expand Up @@ -163,10 +182,10 @@ def test_file_rw_index_pipe(tmp_path, config, input_file):
"include_header": True
}), (os.path.join(TEST_DIRS.tests_data_dir, "filter_probs.jsonlines"), {})],
ids=["CSV", "CSV_ID", "JSON"])
def test_file_roundtrip(use_cpp, tmp_path, input_file, extra_kwargs):
def test_file_roundtrip(tmp_path, input_file, extra_kwargs):

# Output file should be same type as input
out_file = os.path.join(tmp_path, 'results{}'.format(os.path.splitext(input_file)[1]))
out_file = os.path.join(tmp_path, f'results{os.path.splitext(input_file)[1]}')

# Read the dataframe
df = read_file_to_df(input_file, df_type='cudf')
Expand Down Expand Up @@ -204,7 +223,7 @@ def test_read_cpp_compare(input_file: str):
@pytest.mark.parametrize("output_type", ["csv", "json", "jsonlines"])
def test_file_rw_serialize_deserialize_pipe(tmp_path, config, output_type):
input_file = os.path.join(TEST_DIRS.tests_data_dir, "filter_probs.csv")
out_file = os.path.join(tmp_path, 'results.{}'.format(output_type))
out_file = os.path.join(tmp_path, f'results.{output_type}')

pipe = LinearPipeline(config)
pipe.set_source(FileSourceStage(config, filename=input_file, iterative=False))
Expand Down Expand Up @@ -234,7 +253,7 @@ def test_file_rw_serialize_deserialize_pipe(tmp_path, config, output_type):
@pytest.mark.parametrize("output_type", ["csv", "json", "jsonlines"])
def test_file_rw_serialize_deserialize_multi_segment_pipe(tmp_path, config, output_type):
input_file = os.path.join(TEST_DIRS.tests_data_dir, "filter_probs.csv")
out_file = os.path.join(tmp_path, 'results.{}'.format(output_type))
out_file = os.path.join(tmp_path, f'results.{output_type}')

pipe = LinearPipeline(config)
pipe.set_source(FileSourceStage(config, filename=input_file, iterative=False))
Expand Down
3 changes: 3 additions & 0 deletions tests/tests_data/simple.json
Git LFS file not shown

0 comments on commit c285380

Please sign in to comment.