Add multi dimension plots to analysis.outliers module. #805

Merged · 5 commits · Nov 23, 2024
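This PR extends `plot_outlier_results` in `msticpy.analysis.outliers` from a fixed two-feature contour/scatter plot to an n x n grid of pairwise subplots when more than two feature columns are passed, and adds a `max_features` pass-through to `identify_outliers`. A minimal usage sketch under the API visible in this diff; the data and feature names are hypothetical, and the return unpacking follows the `Tuple[IsolationForest, np.ndarray, np.ndarray]` annotation:

```python
import numpy as np

from msticpy.analysis.outliers import identify_outliers, plot_outlier_results

# Hypothetical data: four numeric features, which routes plotting through
# the new multi-dimension path (more than two feature columns).
rng = np.random.default_rng(7)
x_train = rng.normal(100.0, 10.0, size=(500, 4))   # training observations
x_new = rng.normal(100.0, 10.0, size=(50, 4))      # new observations to score
x_new[:3] += 80.0                                  # plant a few anomalies

clf, x_outliers, _ = identify_outliers(
    x=x_train,
    x_predict=x_new,
    contamination=0.05,
    max_features=2,  # new optional parameter added by this PR
)

# With len(feature_columns) > 2 this now draws the pairwise subplot grid
# instead of assuming exactly two columns.
plot_outlier_results(
    clf,
    x_train,
    x_new,
    x_outliers,
    feature_columns=["logon_count", "fail_count", "proc_count", "bytes_out"],
    plt_title="Outlier detection over four features",
)
```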
2 changes: 2 additions & 0 deletions .pylintrc
@@ -69,6 +69,8 @@ disable=raw-checker-failed,
         deprecated-pragma,
         use-symbolic-message-instead,
         too-many-positional-arguments,
+        too-many-arguments,
+        too-many-statements,
 
 # Enable the message, report, category or checker with the given id(s). You can
 # either give multiple identifier separated by comma (,) or put this option
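The two new entries widen pylint's globally disabled checks to accommodate the larger `plot_outlier_results` below, which affects every module in the repository. If a narrower scope were preferred, pylint accepts the same suppressions as an in-function pragma; a sketch, not part of this PR (signature abbreviated from the diff):

```python
def plot_outlier_results(clf, x, x_predict, x_outliers, feature_columns, plt_title):
    # pylint: disable=too-many-arguments, too-many-statements
    ...
```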
179 changes: 132 additions & 47 deletions msticpy/analysis/outliers.py
@@ -14,7 +14,7 @@
 """
 
 import math
-from typing import List, Tuple
+from typing import List, Optional, Tuple, Union
 
 import numpy as np
 import pandas as pd
@@ -33,12 +33,15 @@
 ) from imp_err
 
 __version__ = VERSION
-__author__ = "Ian Hellen"
+__author__ = "Ian Hellen, Tatsuya Hasegawa"
 
 
 # pylint: disable=invalid-name
 def identify_outliers(
-    x: np.ndarray, x_predict: np.ndarray, contamination: float = 0.05
+    x: np.ndarray,
+    x_predict: np.ndarray,
+    contamination: float = 0.05,
+    max_features: Optional[Union[int, float]] = None,
 ) -> Tuple[IsolationForest, np.ndarray, np.ndarray]:
     """
     Identify outlier items using SkLearn IsolationForest.
@@ -51,6 +54,10 @@ def identify_outliers(
         Model
     contamination : float
         Percentage contamination (default: {0.05})
+    max_features : int or float, optional
+        Specifies max num or max rate of features
+        to be randomly selected when building each tree.
+        default: None => {math.floor(math.sqrt(cols))}
 
     Returns
     -------
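The new parameter mirrors scikit-learn's `IsolationForest(max_features=...)` semantics, where an `int` is an absolute count of features per tree and a `float` is a fraction, with the square-root default applied only when the caller passes nothing. An illustrative sketch with hypothetical numbers:

```python
import math

cols = 9  # number of feature columns in the training matrix
default_max_features = math.floor(math.sqrt(cols))  # None => 3 features per tree

# Explicit overrides, following IsolationForest's semantics:
#   max_features=3    -> exactly 3 features drawn per tree
#   max_features=0.5  -> 50% of the features drawn per tree
```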
@@ -64,8 +71,10 @@ def identify_outliers(
 
     # fit the model
     rows, cols = x.shape
-    max_samples = min(100, cols)
-    max_features = math.floor(math.sqrt(rows))
+    max_samples = min(100, rows)
+    if not max_features:
+        max_features = math.floor(math.sqrt(cols))
+
     clf = IsolationForest(
         max_samples=max_samples,
         max_features=max_features,
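Beyond wiring in the new parameter, this hunk fixes a rows/columns mix-up: previously `max_samples` was capped by the column count and the `max_features` default was derived from the row count. A worked comparison under hypothetical data dimensions:

```python
import math

rows, cols = 1000, 9  # hypothetical training-set shape

# Before: both values derived from the wrong axis
min(100, cols)               # max_samples == 9 (tiny subsample per tree)
math.floor(math.sqrt(rows))  # max_features == 31, more features than exist

# After: the sample cap follows rows, the feature default follows columns
min(100, rows)               # max_samples == 100
math.floor(math.sqrt(cols))  # max_features == 3
```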
@@ -109,52 +118,128 @@ def plot_outlier_results(
         list of feature columns to display
     plt_title : str
         Plot title
 
     """
-    # plot the line, the samples, and the nearest vectors to the plane
-    x_max_x = x[:, 0].max() + (x[:, 0].max() / 10)
-    x_min_x = -x[:, 0].max() / 10
-    x_max_y = x[:, 1].max() + (x[:, 1].max() / 10)
-    x_min_y = -x[:, 1].max() / 10
-    xx, yy = np.meshgrid(
-        np.linspace(x_min_x, x_max_x, 100), np.linspace(x_min_y, x_max_y, 100)
-    )
-    z = clf.decision_function(np.c_[xx.ravel(), yy.ravel()])
-    z = z.reshape(xx.shape)
+    if len(feature_columns) == 2:
+        # two dimension plot: mostly remain original codes from msticpy 2.14.0
+        # plot the line, the samples, and the nearest vectors to the plane
+        x_max_x = x[:, 0].max() + (x[:, 0].max() / 10)
+        x_min_x = -x[:, 0].max() / 10
+        x_max_y = x[:, 1].max() + (x[:, 1].max() / 10)
+        x_min_y = -x[:, 1].max() / 10
+        xx, yy = np.meshgrid(
+            np.linspace(x_min_x, x_max_x, 100), np.linspace(x_min_y, x_max_y, 100)
+        )
+        z = clf.decision_function(
+            np.c_[
+                xx.ravel(),
+                yy.ravel(),
+                np.zeros(
+                    (xx.ravel().shape[0], clf.n_features_in_ - len(feature_columns))
+                ),
+            ]
+        )
+        z = z.reshape(xx.shape)
 
-    plt.rcParams["figure.figsize"] = (20, 10)
+        plt.rcParams["figure.figsize"] = (20, 10)
 
-    plt.title(plt_title)
-    # pylint: disable=no-member
-    plt.contourf(xx, yy, z, cmap=plt.cm.Blues_r)  # type: ignore
+        plt.title(plt_title)
+        # pylint: disable=no-member
+        plt.contourf(xx, yy, z, cmap=plt.cm.Blues_r)  # type: ignore
 
-    b1 = plt.scatter(x[:, 0], x[:, 1], c="white", s=20, edgecolor="k")
-    b2 = plt.scatter(x_predict[:, 0], x_predict[:, 1], c="green", s=40, edgecolor="k")
-    c = plt.scatter(
-        x_outliers[:, 0], x_outliers[:, 1], c="red", marker="x", s=200, edgecolor="k"
-    )
-    plt.axis("tight")
-
-    xp_max_x = x_predict[:, 0].max() + (x_predict[:, 0].max() / 10)
-    xp_min_x = -x_predict[:, 0].max() / 10
-    xp_max_y = x_predict[:, 1].max() + (x_predict[:, 1].max() / 10)
-    xp_min_y = -x_predict[:, 1].max() / 10
-
-    plt.xlim((xp_min_x, xp_max_x))
-    plt.ylim((xp_min_y, xp_max_y))
-    plt.xlabel(feature_columns[0])  # type: ignore
-    plt.ylabel(feature_columns[1])  # type: ignore
-
-    plt.legend(
-        [b1, b2, c],
-        [
-            "training observations",
-            "new regular observations",
-            "new abnormal observations",
-        ],
-        loc="upper right",
-    )
-    plt.show()
+        b1 = plt.scatter(x[:, 0], x[:, 1], c="white", s=20, edgecolor="k")
+        b2 = plt.scatter(
+            x_predict[:, 0], x_predict[:, 1], c="green", s=40, edgecolor="k"
+        )
+        c = plt.scatter(x_outliers[:, 0], x_outliers[:, 1], c="red", marker="x", s=200)
+        plt.axis("tight")
+
+        xp_max_x = x_predict[:, 0].max() + (x_predict[:, 0].max() / 10)
+        xp_min_x = -x_predict[:, 0].max() / 10
+        xp_max_y = x_predict[:, 1].max() + (x_predict[:, 1].max() / 10)
+        xp_min_y = -x_predict[:, 1].max() / 10
+
+        plt.xlim((xp_min_x, xp_max_x))
+        plt.ylim((xp_min_y, xp_max_y))
+        plt.xlabel(feature_columns[0])  # type: ignore
+        plt.ylabel(feature_columns[1])  # type: ignore
+
+        plt.legend(
+            [b1, b2, c],
+            [
+                "training observations",
+                "new regular observations",
+                "new abnormal observations",
+            ],
+            loc="upper right",
+        )
+        plt.show()
+
+    elif len(feature_columns) > 2:  # multi dimension subplots
+        dimension_num = x.shape[1]
+        fig, axes = plt.subplots(
+            dimension_num, dimension_num, figsize=(20, 20), constrained_layout=True
+        )
+        for i in range(dimension_num):
+            for j in range(dimension_num):
+                if i != j:
+                    # plot the line, the samples, and the nearest vectors to the plane
+                    x_max_x = x[:, j].max() + (x[:, j].max() / 10)
+                    x_min_x = -x[:, j].max() / 10
+                    x_max_y = x[:, i].max() + (x[:, i].max() / 10)
+                    x_min_y = -x[:, i].max() / 10
+                    xx, yy = np.meshgrid(
+                        np.linspace(x_min_x, x_max_x, 100),
+                        np.linspace(x_min_y, x_max_y, 100),
+                    )
+                    z = clf.decision_function(
+                        np.c_[
+                            xx.ravel(),
+                            yy.ravel(),
+                            np.zeros((xx.ravel().shape[0], len(feature_columns) - 2)),
+                        ]
+                    )
+                    z = z.reshape(xx.shape)
+
+                    # pylint: disable=no-member
+                    axes[i, j].contourf(xx, yy, z, cmap=plt.cm.Blues_r)  # type: ignore
+
+                    b1 = axes[i, j].scatter(x[:, j], x[:, i], c="white", edgecolor="k")
+                    b2 = axes[i, j].scatter(
+                        x_predict[:, j], x_predict[:, i], c="green", edgecolor="k"
+                    )
+                    c = axes[i, j].scatter(
+                        x_outliers[:, j], x_outliers[:, i], c="red", marker="x"
+                    )
+
+                    xp_max_x = x_predict[:, 0].max() + (x_predict[:, 0].max() / 10)
+                    xp_min_x = -x_predict[:, 0].max() / 10
+                    xp_max_y = x_predict[:, 1].max() + (x_predict[:, 1].max() / 10)
+                    xp_min_y = -x_predict[:, 1].max() / 10
+
+                    axes[i, j].axis(xmin=xp_min_x, xmax=xp_max_x)
+                    axes[i, j].axis(ymin=xp_min_y, ymax=xp_max_y)
+                    axes[i, j].set_xlabel(f"{feature_columns[j]}")
+                    axes[i, j].set_ylabel(f"{feature_columns[i]}")
+
+                else:
+                    # do not show the same features x,y each other.
+                    axes[i, j].axis("off")
+
+        fig.suptitle(plt_title)
+        plt.legend(
+            [b1, b2, c],
+            [
+                "training observations",
+                "new regular observations",
+                "new abnormal observations",
+            ],
+            facecolor="#0072BD",
+            framealpha=0.3,
+        )
+        plt.show()
+
+    else:
+        raise ValueError("plot_outlier_results function needs more than two features.")
 
 
 def remove_common_items(data: pd.DataFrame, columns: List[str]) -> pd.DataFrame:
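Both new branches score a 2-D mesh against a forest trained on the full feature set, which works because the flattened grid is padded with zero columns up to the trained width; each contour panel is therefore a slice of the decision surface with the remaining features held at zero. A shape-level sketch of that padding (the `5` stands in for `clf.n_features_in_` and is hypothetical):

```python
import numpy as np

xx, yy = np.meshgrid(np.linspace(0, 1, 100), np.linspace(0, 1, 100))
n_trained = 5  # stand-in for clf.n_features_in_ on a fitted model
grid = np.c_[
    xx.ravel(),                                       # values for the panel's x feature
    yy.ravel(),                                       # values for the panel's y feature
    np.zeros((xx.ravel().shape[0], n_trained - 2)),   # neutral fill for the rest
]
assert grid.shape == (10_000, 5)  # 100 x 100 mesh points, 5 columns
# z = clf.decision_function(grid).reshape(xx.shape) on a fitted IsolationForest
```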