Unified file source stage #1184

Open · wants to merge 20 commits into branch-23.11
Changes from 6 of 20 commits
1 change: 1 addition & 0 deletions morpheus/cli/commands.py
@@ -650,6 +650,7 @@ def post_pipeline(ctx: click.Context, *args, **kwargs):
add_command("delay", "morpheus.stages.general.delay_stage.DelayStage", modes=ALL)
add_command("deserialize", "morpheus.stages.preprocess.deserialize_stage.DeserializeStage", modes=NOT_AE)
add_command("dropna", "morpheus.stages.preprocess.drop_null_stage.DropNullStage", modes=NOT_AE)
add_command("file-source", "morpheus.stages.input.file_source.FileSource", modes=NOT_AE)
add_command("filter", "morpheus.stages.postprocess.filter_detections_stage.FilterDetectionsStage", modes=ALL)
add_command("from-azure", "morpheus.stages.input.azure_source_stage.AzureSourceStage", modes=AE_ONLY)
add_command("from-appshield", "morpheus.stages.input.appshield_source_stage.AppShieldSourceStage", modes=FIL_ONLY)
319 changes: 319 additions & 0 deletions morpheus/stages/input/file_source.py
@@ -0,0 +1,319 @@
# Copyright (c) 2022-2023, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""File source stage."""

import logging
import time
import typing
from functools import partial
from urllib.parse import urlsplit

import fsspec
import mrc
from mrc.core import operators as ops

from morpheus.cli import register_stage
from morpheus.common import FileTypes
from morpheus.config import Config
from morpheus.config import PipelineModes
from morpheus.io.deserializers import read_file_to_df
from morpheus.messages import MessageMeta
from morpheus.pipeline.preallocator_mixin import PreallocatorMixin
from morpheus.pipeline.single_output_source import SingleOutputSource
from morpheus.pipeline.stream_pair import StreamPair
from morpheus.utils.directory_watcher import DirectoryWatcher

logger = logging.getLogger(__name__)


@register_stage("file-source", modes=[PipelineModes.FIL, PipelineModes.NLP, PipelineModes.OTHER])
class FileSource(PreallocatorMixin, SingleOutputSource):
"""
Load messages from a file.

Source stage used to load messages from a file, dumping the contents into the pipeline immediately. Useful for
testing the performance and accuracy of a pipeline.

Parameters
----------
config : `morpheus.config.Config`
Pipeline configuration instance.
files : List[str]
List of paths to be read from, can be a list of S3 URLs (`s3://path`) and can include wildcard characters `*`
as defined by `fsspec`:
https://filesystem-spec.readthedocs.io/en/latest/api.html?highlight=open_files#fsspec.open_files
watch : bool, default = False
When True, will check `files` for new files and emit them as they appear.
watch_interval : float, default = 1.0
When `watch` is True, this is the time in seconds between polling the paths in `files` for new files.
(Note: Applicable only when `watch` is True and the paths in `files` are remote; local paths are handled
by the directory watcher instead.)
sort_glob : bool, default = False
If true, the list of files matching `input_glob` will be processed in sorted order.
(Note: Applicable when all paths in `files` are local.)
recursive : bool, default = True
If true, events will be emitted for the files in subdirectories matching `input_glob`.
(Note: Applicable when all paths in `files` are local.)
queue_max_size : int, default = 128
Maximum queue size to hold the file paths to be processed that match `input_glob`.
(Note: Applicable when all paths in `files` are local.)
batch_timeout : float, default = 5.0
Timeout to retrieve batch messages from the queue.
(Note: Applicable when all paths in `files` are local.)
Contributor:
Remark: Several pieces of functionality diverge depending on whether we have local or remote files. It makes me wonder if we actually need separate classes for the local/remote cases. It also raises the question: what happens if some of the files are local and some are remote?

Remark: My hunch is that what we really want is:

  • A local FileSource (with watching capability)
  • An S3FileSource (with watching capability)
  • A MergeStage to merge both Sources
  • A RepeatMessageStage

Contributor Author:

We perform a validation step for remote host paths, selecting the fsspec route if they exist, and subsequently confirming that they share the same protocol; otherwise, an error is raised. It is implemented based on the description in issue #976.
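
For illustration, a minimal sketch of the kind of mixed-protocol check described above (the helper name and error message are hypothetical, not part of this diff):

```python
from urllib.parse import urlsplit

def _check_remote_protocols(files: list[str]) -> None:
    # Collect the scheme of every remote-looking path and reject the input
    # if more than one protocol is present.
    schemes = {urlsplit(f).scheme for f in files if "://" in f}
    if len(schemes) > 1:
        raise ValueError(f"Remote paths must share a single protocol, got: {sorted(schemes)}")
```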

file_type : `morpheus.common.FileTypes`, optional, case_sensitive = False
Indicates what type of file to read. Specifying 'auto' will determine the file type from the extension.
Supported extensions: 'csv', 'json', 'jsonlines' and 'parquet'.
repeat : int, default = 1, min = 1
Repeats the input dataset multiple times. Useful to extend small datasets for debugging.
filter_null : bool, default = True
Whether or not to filter rows with a null 'data' column. Null values in the 'data' column can cause issues down
the line with processing. Setting this to True is recommended.
parser_kwargs : dict, default = {}
Extra options to pass to the file parser.
"""

def __init__(self,
config: Config,
files: typing.List[str],
watch: bool = False,
watch_interval: float = 1.0,
sort_glob: bool = False,
recursive: bool = True,
queue_max_size: int = 128,
batch_timeout: float = 5.0,
file_type: FileTypes = FileTypes.Auto,
repeat: int = 1,
filter_null: bool = True,
parser_kwargs: dict = None):

super().__init__(config)

if not files:
raise ValueError("The 'files' parameter cannot be empty.")

if watch and len(files) != 1:
raise ValueError("When 'watch' is True, the 'files' should contain exactly one file path.")

self._files = list(files)
self._watch = watch
self._sort_glob = sort_glob
self._recursive = recursive
self._queue_max_size = queue_max_size
self._batch_timeout = batch_timeout
self._file_type = file_type
self._filter_null = filter_null
self._parser_kwargs = parser_kwargs or {}
self._watch_interval = watch_interval
self._repeat_count = repeat

@property
def name(self) -> str:
"""Return the name of the stage"""
return "file-source"

def supports_cpp_node(self) -> bool:
"""Indicates whether or not this stage supports a C++ node"""
return False
Contributor:

Remark: Without breaking this Stage into multiple Stages as mentioned in a separate comment, implementing this Stage in C++ might be a big headache.

Contributor Author:

Yeah, let's discuss this.


def _has_remote_paths(self) -> bool:
return any(urlsplit(file).scheme for file in self._files if "://" in file)
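
For clarity on the check above: `urlsplit` returns a non-empty scheme only for protocol-prefixed paths, so plain local paths never count as remote. A quick illustration:

```python
from urllib.parse import urlsplit

print(urlsplit("s3://bucket/data/*.json").scheme)  # "s3"
print(urlsplit("./local/data.json").scheme)        # "" (falsy, treated as local)
```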

def _build_source(self, builder: mrc.Builder) -> StreamPair:

if self._build_cpp_node():
raise RuntimeError("Does not support C++ nodes")

if self._watch and not self._has_remote_paths():
input_glob = self._files[0]
watcher = DirectoryWatcher(
input_glob=input_glob,
watch_directory=self._watch,
max_files=None, # This is not being used in the latest version.
sort_glob=self._sort_glob,
recursive=self._recursive,
queue_max_size=self._queue_max_size,
batch_timeout=self._batch_timeout)

out_stream = watcher.build_node(self.unique_name, builder)
out_type = list[str]
else:
if self._watch:
generator_function = self._polling_generate_frames_fsspec
else:
generator_function = self._generate_frames_fsspec

out_stream = builder.make_source(self.unique_name, generator_function())
out_type = fsspec.core.OpenFiles

# Supposed to just return a source here
return out_stream, out_type

def _generate_frames_fsspec(self) -> typing.Iterable[fsspec.core.OpenFiles]:

files: fsspec.core.OpenFiles = fsspec.open_files(self._files)

if (len(files) == 0):
raise RuntimeError(f"No files matched input strings: '{self._files}'. "
"Check your input pattern and ensure any credentials are correct")

if self._sort_glob:
files = sorted(files, key=lambda f: f.full_name)

yield files
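
As context for the call above: `fsspec.open_files` accepts a list of paths or glob patterns and returns an `OpenFiles` container of `OpenFile` objects, where `full_name` includes the protocol prefix for remote paths. A standalone illustration (the glob is hypothetical):

```python
import fsspec

# Expands local or remote globs into a list-like OpenFiles container.
files = fsspec.open_files(["./input/*.jsonlines"])
for open_file in files:
    print(open_file.full_name)
```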

def _polling_generate_frames_fsspec(self) -> typing.Iterable[fsspec.core.OpenFiles]:
files_seen = set()
curr_time = time.monotonic()
next_update_epoch = curr_time

while (True):
Contributor:
Question: @mdemoret-nv Is it possible this prevents proper shutdown? Should we add a cancellation token?

Contributor Author:

I have an updated version that requests shutdown by calling the stop method.

Contributor Author:

@cwharris you are right, the updated version didn't resolve the issue. The pipeline terminated abruptly with Ctrl+C only a few times, possibly due to unrelated factors. However, in general, it's still not shutting down properly.

Contributor:

I haven't knowingly witnessed a clean shutdown of a Morpheus pipeline through the use of ctrl+c. I should check to see if that's even possible (it should be, but might have limits).
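
For reference, one hedged sketch of a cooperative-shutdown variant of this loop, using a stop event so the sleep can be interrupted (the `stop_event` and `polling_loop` names are hypothetical, not part of this diff):

```python
import threading
import time

stop_event = threading.Event()  # hypothetically set by a stop() override

def polling_loop(watch_interval: float):
    next_update_epoch = time.monotonic()
    while not stop_event.is_set():
        # ... poll fsspec for new files and yield them here ...
        next_update_epoch += watch_interval
        remaining = next_update_epoch - time.monotonic()
        if remaining > 0:
            # Unlike time.sleep(), wait() returns early once stop_event is set
            stop_event.wait(remaining)
```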

# Before doing any work, find the next update epoch after the current time
while (next_update_epoch <= curr_time):
# Only ever add `self._watch_interval` to next_update_epoch so all updates are at repeating intervals
next_update_epoch += self._watch_interval

file_set = set()
filtered_files = []

files = fsspec.open_files(self._files)
for file in files:
file_set.add(file.full_name)
if file.full_name not in files_seen:
filtered_files.append(file)

# Replace files_seen with the new set of files. This prevents a memory leak that could occur if files are
# deleted from the input directory. In addition if a file with a given name was created, seen/processed by
# the stage, and then deleted, and a new file with the same name appeared sometime later, the stage will
# need to re-ingest that new file.
files_seen = file_set

if len(filtered_files) > 0:
if self._sort_glob:
filtered_files = sorted(filtered_files, key=lambda f: f.full_name)

yield fsspec.core.OpenFiles(filtered_files, fs=files.fs)

curr_time = time.monotonic()

# If we spent more than `self._watch_interval` doing work and/or yielding to the output channel blocked,
# then we should only sleep for the remaining time until the next update epoch.
sleep_duration = next_update_epoch - curr_time
if (sleep_duration > 0):
time.sleep(sleep_duration)
curr_time = time.monotonic()

@staticmethod
def generate_frames(file: fsspec.core.OpenFile,
file_type: FileTypes,
filter_null: bool,
parser_kwargs: dict,
repeat_count: int) -> list[MessageMeta]:
"""
Generate message frames from a file.

This function reads data from a file and generates message frames (MessageMeta) based on the file's content.
It can be used to load and process messages from a file for testing and analysis within a Morpheus pipeline.

Parameters
----------
file : fsspec.core.OpenFile
An open file object using fsspec.
file_type : FileTypes
Indicates the type of the file to read. Supported types include 'csv', 'json', 'jsonlines', and 'parquet'.
filter_null : bool
Determines whether to filter out rows with null values in the 'data' column. Filtering null values is
recommended to prevent potential issues during processing.
parser_kwargs : dict
Additional keyword arguments to pass to the file parser.
repeat_count : int
The number of times to repeat the data reading process. Each repetition generates a new set of message
frames.

Returns
-------
List[MessageMeta]
MessageMeta objects, each containing a dataframe of messages from the file.
"""
df = read_file_to_df(
file.full_name,
file_type=file_type,
filter_nulls=filter_null,
parser_kwargs=parser_kwargs,
df_type="cudf",
)

metas = []

for i in range(repeat_count):

x = MessageMeta(df)

# If we are looping, copy the object. Do this before we push the object in case it changes
if (i + 1 < repeat_count):
df = df.copy()

# Shift the index to allow for unique indices without reading more data
df.index += len(df)

metas.append(x)

return metas
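
To make the index shifting above concrete, a small standalone sketch of the same pattern with a plain dataframe (pandas is used here for illustration; cudf behaves the same for this operation):

```python
import pandas as pd

df = pd.DataFrame({"data": ["a", "b"]})  # default index 0..1
repeat_count = 3
frames = []
for i in range(repeat_count):
    frames.append(df)
    if i + 1 < repeat_count:
        df = df.copy()
        df.index += len(df)  # copies get indices 2..3, then 4..5

combined = pd.concat(frames)
assert list(combined.index) == [0, 1, 2, 3, 4, 5]
```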

@staticmethod
def convert_to_fsspec_files(files: typing.Union[list[str], fsspec.core.OpenFiles]) -> fsspec.core.OpenFiles:
"""
Convert a list of file paths to fsspec OpenFiles.

This static method takes a list of file paths or an existing fsspec OpenFiles object and ensures that the
input is converted to an OpenFiles object for uniform handling in Morpheus pipeline stages.

Parameters
----------
files : Union[List[str], fsspec.core.OpenFiles]
A list of file paths or an existing fsspec OpenFiles object.

Returns
-------
fsspec.core.OpenFiles
An fsspec OpenFiles object representing the input files.
"""

# Convert a plain list of paths into fsspec open files
if not isinstance(files, fsspec.core.OpenFiles):
files = fsspec.open_files(files)

return files

def _post_build_single(self, builder: mrc.Builder, out_pair: StreamPair) -> StreamPair:

out_stream = out_pair[0]

post_node = builder.make_node(
self.unique_name + "-post",
ops.map(self.convert_to_fsspec_files),
ops.flatten(), # Flatten list of open fsspec files
ops.map(
partial(self.generate_frames,
file_type=self._file_type,
filter_null=self._filter_null,
parser_kwargs=self._parser_kwargs,
repeat_count=self._repeat_count)), # Generate dataframe for each file
ops.flatten())

builder.make_edge(out_stream, post_node)

out_stream = post_node
out_type = MessageMeta

return super()._post_build_single(builder, (out_stream, out_type))
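
For context, a hedged sketch of how the new stage might be wired into a pipeline (`Config` and `LinearPipeline` are the existing Morpheus Python API; the file glob and arguments are illustrative):

```python
from morpheus.config import Config
from morpheus.pipeline import LinearPipeline
from morpheus.stages.input.file_source import FileSource

config = Config()
pipeline = LinearPipeline(config)

# Read all matching files once; pass watch=True (with a single glob) to keep
# polling for new files instead.
pipeline.set_source(FileSource(config, files=["./input/*.jsonlines"]))

pipeline.run()
```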