Skip to content

Commit

Permalink
Fixing linting errors which could not be resolved in 23.07 (nv-morpheus#1082)
Browse files Browse the repository at this point in the history

There were too many linting errors to fix in the 23.07 burndown, so a PR has been created to fix them in 23.11.

Closes nv-morpheus#1050

Authors:
  - Michael Demoret (https://github.com/mdemoret-nv)
  - David Gardner (https://github.com/dagardner-nv)

Approvers:
  - David Gardner (https://github.com/dagardner-nv)
  - Tad ZeMicheal (https://github.com/tzemicheal)

URL: nv-morpheus#1082
  • Loading branch information
mdemoret-nv authored Aug 23, 2023
1 parent c285380 commit a343c77
Show file tree
Hide file tree
Showing 106 changed files with 919 additions and 681 deletions.
4 changes: 2 additions & 2 deletions docker/conda/environments/cuda11.8_dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -82,17 +82,17 @@ dependencies:
- python-confluent-kafka=1.9.2
- python-graphviz
- python=3.10
- pytorch=2.0.1
- pytorch-cuda
- pytorch=2.0.1
- rapidjson=1.1.0
- scikit-build=0.17.1
- scikit-learn=1.2.2
- sphinx
- sphinx_rtd_theme
- sqlalchemy<2.0 # 2.0 is incompatible with pandas=1.3
- sysroot_linux-64=2.17
- tritonclient=2.26 # Required by NvTabular, force the version, so we get protobufs compatible with 4.21
- tqdm=4
- tritonclient=2.26 # Required by NvTabular, force the version, so we get protobufs compatible with 4.21
- typing_utils=0.1
- watchdog=2.1
- yapf=0.40.1
Expand Down
15 changes: 7 additions & 8 deletions examples/abp_pcap_detection/abp_pcap_preprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import cudf

import morpheus._lib.stages as _stages
from morpheus.cli.register_stage import register_stage
from morpheus.common import TypeId
from morpheus.config import Config
Expand Down Expand Up @@ -98,17 +97,17 @@ def pre_process_batch(x: MultiMessage, fea_len: int, fea_cols: typing.List[str],
df["timestamp"] = x.get_meta("timestamp").astype("int64")

def round_time_kernel(timestamp, rollup_time, secs):
for i, ts in enumerate(timestamp):
x = ts % secs
for i, time in enumerate(timestamp):
x = time % secs
y = 1 - (x / secs)
delta = y * secs
rollup_time[i] = ts + delta
rollup_time[i] = time + delta

df = df.apply_rows(
round_time_kernel,
incols=["timestamp"],
outcols=dict(rollup_time=np.int64),
kwargs=dict(secs=60000000),
outcols={"rollup_time": np.int64},
kwargs={"secs": 60000000},
)

df["rollup_time"] = cudf.to_datetime(df["rollup_time"], unit="us").dt.strftime("%Y-%m-%d %H:%M")
Expand Down Expand Up @@ -149,7 +148,7 @@ def round_time_kernel(timestamp, rollup_time, secs):
# syn/all - Number of flows with SYN flag to all flows
# fin/all - Number of flows with FIN flag to all flows
for col in ["rst", "syn", "fin"]:
dst_col = "{}/all".format(col)
dst_col = f"{col}/all"
grouped_df[dst_col] = grouped_df[col] / grouped_df["all"]

# Adding index column to retain the order of input messages.
Expand Down Expand Up @@ -197,4 +196,4 @@ def _get_preprocess_fn(self) -> typing.Callable[[MultiMessage], MultiInferenceMe
req_cols=self.req_cols)

def _get_preprocess_node(self, builder: mrc.Builder):
return _stages.AbpPcapPreprocessingStage(builder, self.unique_name)
raise NotImplementedError("C++ node not implemented for this stage")
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def __init__(self, config: Config, sep_token: str = '[SEP]'):
if config.mode != PipelineModes.NLP:
raise RuntimeError("RecipientFeaturesStage must be used in a pipeline configured for NLP")

if len(sep_token):
if len(sep_token) > 0:
self._sep_token = sep_token
else:
raise ValueError("sep_token cannot be an empty string")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,17 @@
import mrc
from mrc.core import operators as ops

from _lib import morpheus_example as morpheus_example_cpp
from morpheus.cli.register_stage import register_stage
from morpheus.pipeline.single_port_stage import SinglePortStage
from morpheus.pipeline.stream_pair import StreamPair


@register_stage("pass-thru")
@register_stage("pass-thru-cpp")
class PassThruStage(SinglePortStage):

@property
def name(self) -> str:
return "pass-thru"
return "pass-thru-cpp"

def accepted_types(self) -> typing.Tuple:
return (typing.Any, )
Expand All @@ -43,6 +42,9 @@ def on_data(self, message: typing.Any):

def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair:
if self._build_cpp_node():
from . import morpheus_example as morpheus_example_cpp

# pylint: disable=c-extension-no-member
node = morpheus_example_cpp.PassThruStage(builder, self.unique_name)
else:
node = builder.make_node(self.unique_name, ops.map(self.on_data))
Expand Down
3 changes: 1 addition & 2 deletions examples/developer_guide/3_simple_cpp_stage/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@
import logging
import os

from pass_thru import PassThruStage

from _lib.pass_thru import PassThruStage
from morpheus.config import Config
from morpheus.pipeline import LinearPipeline
from morpheus.stages.general.monitor_stage import MonitorStage
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def get_json_lines_count(filename):
return len(lines)


def pytest_benchmark_update_json(config, benchmarks, output_json):
def pytest_benchmark_update_json(_, __, output_json):

curr_dir = path.dirname(path.abspath(__file__))

Expand Down Expand Up @@ -58,9 +58,9 @@ def pytest_benchmark_update_json(config, benchmarks, output_json):

elif "glob_path" in PIPELINES_CONF[bench["name"]]:
source_files_glob = path.join(curr_dir, PIPELINES_CONF[bench["name"]]["glob_path"])
for fn in glob.glob(source_files_glob):
line_count += get_json_lines_count(fn)
byte_count += path.getsize(fn)
for filename in glob.glob(source_files_glob):
line_count += get_json_lines_count(filename)
byte_count += path.getsize(filename)
elif "message_path" in PIPELINES_CONF[bench["name"]]:
source_message_glob = path.join(curr_dir, PIPELINES_CONF[bench["name"]]["message_path"])
for message_fn in glob.glob(source_message_glob):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import typing

import boto3
import dfp.modules # noqa: F401
import dfp.modules # noqa: F401 # pylint:disable=unused-import
import pytest
from dfp.stages.dfp_file_batcher_stage import DFPFileBatcherStage
from dfp.stages.dfp_file_to_df import DFPFileToDataFrameStage
Expand All @@ -35,13 +35,13 @@
from dfp.utils.regex_utils import iso_date_regex
from dfp.utils.schema_utils import Schema

import morpheus.loaders # noqa: F401
import morpheus.modules # noqa: F401
import morpheus.loaders # noqa: F401 # pylint:disable=unused-import
import morpheus.modules # noqa: F401 # pylint:disable=unused-import
from benchmarks.benchmark_conf_generator import BenchmarkConfGenerator
from benchmarks.benchmark_conf_generator import load_json
from benchmarks.benchmark_conf_generator import set_mlflow_tracking_uri
from morpheus._lib.common import FileTypes
from morpheus._lib.common import FilterSource
from morpheus.common import FileTypes
from morpheus.common import FilterSource
from morpheus.config import Config
from morpheus.pipeline.linear_pipeline import LinearPipeline
from morpheus.pipeline.pipeline import Pipeline
Expand Down Expand Up @@ -77,7 +77,7 @@ def remove_cache(cache_dir: str):


def dfp_modules_pipeline(pipe_config: Config,
modules_conf: typing.Dict[str, any],
modules_conf: typing.Dict[str, typing.Any],
filenames: typing.List[str],
reuse_cache=False):

Expand All @@ -99,7 +99,7 @@ def dfp_modules_pipeline(pipe_config: Config,


def dfp_training_pipeline_stages(pipe_config: Config,
stages_conf: typing.Dict[str, any],
stages_conf: typing.Dict[str, typing.Any],
source_schema: DataFrameInputSchema,
preprocess_schema: DataFrameInputSchema,
filenames: typing.List[str],
Expand Down Expand Up @@ -150,7 +150,7 @@ def dfp_training_pipeline_stages(pipe_config: Config,


def dfp_inference_pipeline_stages(pipe_config: Config,
stages_conf: typing.Dict[str, any],
stages_conf: typing.Dict[str, typing.Any],
source_schema: DataFrameInputSchema,
preprocess_schema: DataFrameInputSchema,
filenames: typing.List[str],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def process_events(message: MultiAEMessage):
# df = message.get_meta()
# df['event_time'] = datetime.now().strftime('%Y-%m-%dT%H:%M:%SZ')
# df.replace(np.nan, 'NaN', regex=True, inplace=True)
# TODO figure out why we are not able to set meta for a whole dataframe, but works for single column.
# TODO(Devin): figure out why we are not able to set meta for a whole dataframe, but works for single column.
# message.set_meta(None, df)
message.set_meta("event_time", datetime.now().strftime('%Y-%m-%dT%H:%M:%SZ'))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from morpheus.pipeline.single_port_stage import SinglePortStage
from morpheus.pipeline.stream_pair import StreamPair

logger = logging.getLogger("morpheus.{}".format(__name__))
logger = logging.getLogger(f"morpheus.{__name__}")

TimestampFileObj = namedtuple("TimestampFileObj", ["timestamp", "file_object"])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from ..utils.model_cache import ModelCache
from ..utils.model_cache import ModelManager

logger = logging.getLogger("morpheus.{}".format(__name__))
logger = logging.getLogger(f"morpheus.{__name__}")


class DFPInferenceStage(SinglePortStage):
Expand Down Expand Up @@ -93,12 +93,12 @@ def on_data(self, message: MultiDFPMessage) -> MultiDFPMessage:
model_cache = self.get_model(user_id)

if (model_cache is None):
raise RuntimeError("Could not find model for user {}".format(user_id))
raise RuntimeError(f"Could not find model for user {user_id}")

loaded_model = model_cache.load_model(self._client)

except Exception: # TODO
logger.exception("Error trying to get model")
except Exception:
logger.exception("Error trying to get model", exc_info=True)
return None

post_model_time = time.time()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
# Setup conda environment
conda_env = {
'channels': ['defaults', 'conda-forge'],
'dependencies': ['python={}'.format('3.10'), 'pip'],
'dependencies': ['python=3.10', 'pip'],
'pip': ['mlflow'],
'name': 'mlflow-env'
}
Expand Down Expand Up @@ -131,7 +131,8 @@ def _apply_model_permissions(self, reg_model_name: str):

get_registered_model_response = requests.get(url=get_registered_model_url,
headers=headers,
params={"name": reg_model_name})
params={"name": reg_model_name},
timeout=10)

registered_model_response = get_registered_model_response.json()

Expand All @@ -150,7 +151,8 @@ def _apply_model_permissions(self, reg_model_name: str):

requests.patch(url=patch_registered_model_permissions_url,
headers=headers,
json=patch_registered_model_permissions_body)
json=patch_registered_model_permissions_body,
timeout=10)

except Exception:
logger.exception("Error occurred trying to apply model permissions to model: %s",
Expand Down Expand Up @@ -194,14 +196,14 @@ def on_data(self, message: MultiAEMessage):
metrics_dict: typing.Dict[str, float] = {}

# Add info on the embeddings
for k, v in model.categorical_fts.items():
embedding = v.get("embedding", None)
for key, value in model.categorical_fts.items():
embedding = value.get("embedding", None)

if (embedding is None):
continue

metrics_dict[f"embedding-{k}-num_embeddings"] = embedding.num_embeddings
metrics_dict[f"embedding-{k}-embedding_dim"] = embedding.embedding_dim
metrics_dict[f"embedding-{key}-num_embeddings"] = embedding.num_embeddings
metrics_dict[f"embedding-{key}-embedding_dim"] = embedding.embedding_dim

mlflow.log_metrics(metrics_dict)

Expand Down Expand Up @@ -252,12 +254,12 @@ def on_data(self, message: MultiAEMessage):
}

# Now create the model version
mv = client.create_model_version(name=reg_model_name,
source=model_src,
run_id=run.info.run_id,
tags=tags)
model_version = client.create_model_version(name=reg_model_name,
source=model_src,
run_id=run.info.run_id,
tags=tags)

logger.debug("ML Flow model upload complete: %s:%s:%s", user, reg_model_name, mv.version)
logger.debug("ML Flow model upload complete: %s:%s:%s", user, reg_model_name, model_version.version)

except Exception:
logger.exception("Error uploading model to ML Flow", exc_info=True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from morpheus.pipeline.single_port_stage import SinglePortStage
from morpheus.pipeline.stream_pair import StreamPair

logger = logging.getLogger("morpheus.{}".format(__name__))
logger = logging.getLogger(f"morpheus.{__name__}")


class DFPPostprocessingStage(SinglePortStage):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from ..utils.cached_user_window import CachedUserWindow
from ..utils.logging_timer import log_time

logger = logging.getLogger("morpheus.{}".format(__name__))
logger = logging.getLogger(f"morpheus.{__name__}")


class DFPRollingWindowStage(SinglePortStage):
Expand Down Expand Up @@ -124,8 +124,8 @@ def _build_window(self, message: DFPMessageMeta) -> MultiDFPMessage:

if (not user_cache.append_dataframe(incoming_df=incoming_df)):
# Then our incoming dataframe wasnt even covered by the window. Generate warning
logger.warn(("Incoming data preceeded existing history. "
"Consider deleting the rolling window cache and restarting."))
logger.warning(("Incoming data preceeded existing history. "
"Consider deleting the rolling window cache and restarting."))
return None

# Exit early if we dont have enough data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from ..messages.multi_dfp_message import DFPMessageMeta
from ..utils.logging_timer import log_time

logger = logging.getLogger("morpheus.{}".format(__name__))
logger = logging.getLogger(f"morpheus.{__name__}")


class DFPSplitUsersStage(SinglePortStage):
Expand Down Expand Up @@ -114,6 +114,7 @@ def extract_users(self, message: DataFrameType) -> typing.List[DFPMessageMeta]:

if (self._include_individual):

# pylint: disable=unnecessary-comprehension
split_dataframes.update({
username: user_df
for username,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

from ..messages.multi_dfp_message import MultiDFPMessage

logger = logging.getLogger("morpheus.{}".format(__name__))
logger = logging.getLogger(f"morpheus.{__name__}")


class DFPTraining(SinglePortStage):
Expand Down Expand Up @@ -71,7 +71,7 @@ def __init__(self, c: Config, model_kwargs: dict = None, epochs=30, validation_s

self._epochs = epochs

if (validation_size >= 0.0 and validation_size < 1.0):
if (0.0 <= validation_size < 1.0):
self._validation_size = validation_size
else:
raise ValueError(f"validation_size={validation_size} should be a positive float in the (0, 1) range")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def __init__(self,
self._model_name_formatter = f"DFP-{source}-" + "{user_id}"
self._experiment_name_formatter = f"dfp/{source}/training/" + "{reg_model_name}"

def verify_init(func):
def verify_init(self, func):

def wrapper(self, *args, **kwargs):
if not self._initialized:
Expand Down Expand Up @@ -134,7 +134,7 @@ def experiment_name_formatter(self):
return self._experiment_name_formatter

def _set_include_generic(self):
self._include_generic = self._train_users == "all" or self._train_users == "generic"
self._include_generic = self._train_users in ('all', 'generic')

def _set_include_individual(self):
self._include_individual = self._train_users != "generic"
Expand All @@ -150,9 +150,9 @@ def _create_time_fields(self, duration) -> TimeFields:

end_time = self._start_time + duration

tf = TimeFields(self._start_time, end_time)
time_fields = TimeFields(self._start_time, end_time)

return tf
return time_fields

def _set_mlflow_tracking_uri(self):
if self._tracking_uri is None:
Expand Down
Loading

0 comments on commit a343c77

Please sign in to comment.