# -*- coding: utf-8 -*-
import os
from typing import Dict, Optional, Union
import numpy as np
import pandas as pd
from rich import print
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from ..constants import MLFLOW_ARTIFACT_IMAGE_MODEL_OUTPUT_PATH
from ..utils.base import clear_output, save_data, save_fig
from ._base import WorkflowBase
from .func.algo_anomalydetection._common import density_estimation, scatter2d, scatter3d
from .func.algo_anomalydetection._enum import AnormalyDetectionCommonFunction, LocalOutlierFactorSpecialFunction
from .func.algo_anomalydetection._iforest import isolation_forest_manual_hyper_parameters
from .func.algo_anomalydetection._local_outlier_factor import local_outlier_factor_manual_hyper_parameters, plot_lof_scores
class AnomalyDetectionWorkflowBase(WorkflowBase):
    """The base workflow class of anomaly detection algorithms.

    Subclasses assign a Scikit-learn style estimator to ``self.model`` and a
    display name to ``self.naming``; this class supplies the fit/predict
    wrappers and the plotting helpers shared by all of them.
    """

    # Names of the functionalities shared by every anomaly detection algorithm.
    common_function = [func.value for func in AnormalyDetectionCommonFunction]

    def __init__(self) -> None:
        super().__init__()
        self.mode = "Anomaly Detection"
        # Detection labels used by `common_components`; only initialised here,
        # presumably populated by the caller after prediction.
        self.anomaly_detection_result = None

    def fit(self, X: pd.DataFrame, y: Optional[pd.DataFrame] = None) -> None:
        """Fit the model by Scikit-learn framework.

        Parameters
        ----------
        X : pd.DataFrame
            The training data. It is also stored on ``self.X`` for later plotting.

        y : pd.DataFrame, default=None
            Unused.
        """
        self.X = X
        self.model.fit(X)

    def predict(self, X: pd.DataFrame) -> np.ndarray:
        """Perform Anomaly Detection on samples in X by Scikit-learn framework.

        Parameters
        ----------
        X : pd.DataFrame
            The samples to label.

        Returns
        -------
        y_predict : np.ndarray
            The labels returned by ``self.model.predict``.
        """
        y_predict = self.model.predict(X)
        return y_predict

    @classmethod
    def manual_hyper_parameters(cls) -> Dict:
        """Manual hyper-parameters specification.

        The base class has nothing to specify and returns an empty dict.
        """
        return dict()

    @staticmethod
    def _detect_data(X: pd.DataFrame, name_column: pd.Series, detect_label: np.ndarray) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.Series, pd.Series]:
        """Merge the detection results into the source data.

        Parameters
        ----------
        X : pd.DataFrame
            The original data.

        name_column : pd.Series
            Name of data. It is filtered with a boolean mask, so it must be
            aligned with the rows of ``X``.

        detect_label : np.ndarray
            The detection labels for each data point: 1 marks a normal point
            and -1 marks an abnormal point.

        Returns
        -------
        X_anomaly_detection : pd.DataFrame
            A copy of the original data with the detection results in the
            extra column "is_abnormal".

        X_normal : pd.DataFrame
            DataFrame containing the normal data points.

        X_abnormal : pd.DataFrame
            DataFrame containing the abnormal data points.

        name_normal : pd.Series
            Name of normal data.

        name_abnormal : pd.Series
            Name of abnormal data.
        """
        # Work on a copy so the caller's DataFrame is left untouched.
        X_anomaly_detection = X.copy()
        # Merge detection results into the source data
        X_anomaly_detection["is_abnormal"] = detect_label

        # Despite the column name, 1 means normal and -1 means abnormal.
        normal_mask = X_anomaly_detection["is_abnormal"] == 1
        abnormal_mask = X_anomaly_detection["is_abnormal"] == -1

        X_normal = X_anomaly_detection[normal_mask]
        name_normal = name_column[normal_mask]
        X_abnormal = X_anomaly_detection[abnormal_mask]
        name_abnormal = name_column[abnormal_mask]
        return X_anomaly_detection, X_normal, X_abnormal, name_normal, name_abnormal

    @staticmethod
    def _density_estimation(data: pd.DataFrame, name_column: str, labels: pd.DataFrame, graph_name: str, algorithm_name: str, local_path: str, mlflow_path: str) -> None:
        """Plot the density estimation diagram of the anomaly detection result.

        The figure is saved through ``save_fig`` and the plotted data, with the
        labels appended as extra columns, through ``save_data``.
        """
        print(f"-----* {graph_name} *-----")
        density_estimation(data, labels, algorithm_name=algorithm_name)
        save_fig(f"{graph_name} - {algorithm_name}", local_path, mlflow_path)
        data_with_labels = pd.concat([data, labels], axis=1)
        save_data(data_with_labels, name_column, f"{graph_name} - {algorithm_name}", local_path, mlflow_path)

    @staticmethod
    def _scatter2d(data: pd.DataFrame, name_column: str, labels: pd.DataFrame, algorithm_name: str, graph_name: str, local_path: str, mlflow_path: str) -> None:
        """Plot the two-dimensional diagram of the anomaly detection result.

        The figure is saved through ``save_fig`` and the plotted data, with the
        labels appended as extra columns, through ``save_data``.
        """
        print(f"-----* {graph_name} *-----")
        scatter2d(data, labels, algorithm_name=algorithm_name)
        save_fig(f"{graph_name} - {algorithm_name}", local_path, mlflow_path)
        data_with_labels = pd.concat([data, labels], axis=1)
        save_data(data_with_labels, name_column, f"{graph_name} - {algorithm_name}", local_path, mlflow_path)

    @staticmethod
    def _scatter3d(data: pd.DataFrame, name_column: str, labels: pd.DataFrame, algorithm_name: str, graph_name: str, local_path: str, mlflow_path: str) -> None:
        """Plot the three-dimensional diagram of the anomaly detection result.

        The figure is saved through ``save_fig`` and the plotted data, with the
        labels appended as extra columns, through ``save_data``.
        """
        print(f"-----* {graph_name} *-----")
        scatter3d(data, labels, algorithm_name=algorithm_name)
        save_fig(f"{graph_name} - {algorithm_name}", local_path, mlflow_path)
        data_with_labels = pd.concat([data, labels], axis=1)
        save_data(data_with_labels, name_column, f"{graph_name} - {algorithm_name}", local_path, mlflow_path)

    def common_components(self) -> None:
        """Invoke all common application functions for anomaly detection algorithms by Scikit-learn framework.

        Reads ``self.X``, ``self.name_all``, ``self.anomaly_detection_result``
        and ``self.naming``. The scatter diagrams are only drawn when the data
        has at least three columns.
        """
        # `os.getenv` returns None when the environment variable is not set.
        GEOPI_OUTPUT_ARTIFACTS_IMAGE_MODEL_OUTPUT_PATH = os.getenv("GEOPI_OUTPUT_ARTIFACTS_IMAGE_MODEL_OUTPUT_PATH")

        if self.X.shape[1] >= 3:
            # Only the selected data is needed; the axis indices are discarded.
            _, two_dimen_data = self.choose_dimension_data(self.X, 2)
            self._scatter2d(
                data=two_dimen_data,
                name_column=self.name_all,
                labels=self.anomaly_detection_result,
                algorithm_name=self.naming,
                graph_name=AnormalyDetectionCommonFunction.PLOT_SCATTER_2D.value,
                local_path=GEOPI_OUTPUT_ARTIFACTS_IMAGE_MODEL_OUTPUT_PATH,
                mlflow_path=MLFLOW_ARTIFACT_IMAGE_MODEL_OUTPUT_PATH,
            )

            _, three_dimen_data = self.choose_dimension_data(self.X, 3)
            self._scatter3d(
                data=three_dimen_data,
                name_column=self.name_all,
                labels=self.anomaly_detection_result,
                algorithm_name=self.naming,
                graph_name=AnormalyDetectionCommonFunction.PLOT_SCATTER_3D.value,
                local_path=GEOPI_OUTPUT_ARTIFACTS_IMAGE_MODEL_OUTPUT_PATH,
                mlflow_path=MLFLOW_ARTIFACT_IMAGE_MODEL_OUTPUT_PATH,
            )

        # NOTE(review): density estimation is assumed to run regardless of the
        # number of columns - confirm that it belongs outside the `if` above.
        self._density_estimation(
            data=self.X,
            name_column=self.name_all,
            labels=self.anomaly_detection_result,
            algorithm_name=self.naming,
            graph_name=AnormalyDetectionCommonFunction.DENSITY_ESTIMATION.value,
            local_path=GEOPI_OUTPUT_ARTIFACTS_IMAGE_MODEL_OUTPUT_PATH,
            mlflow_path=MLFLOW_ARTIFACT_IMAGE_MODEL_OUTPUT_PATH,
        )
class IsolationForestAnomalyDetection(AnomalyDetectionWorkflowBase):
    """The automation workflow of using Isolation Forest algorithm to make insightful products."""

    name = "Isolation Forest"
    # This algorithm has no algorithm-specific functionality.
    special_function = []

    def __init__(
        self,
        n_estimators: int = 100,
        max_samples: Union[str, int, float] = "auto",
        contamination: Union[str, float] = "auto",
        max_features: Union[int, float] = 1.0,
        bootstrap: bool = False,
        n_jobs: Optional[int] = None,
        random_state: Optional[int] = None,
        verbose: int = 0,
        warm_start: bool = False,
    ) -> None:
        """
        Isolation Forest Algorithm.

        Return the anomaly score of each sample using the IsolationForest algorithm

        The IsolationForest 'isolates' observations by randomly selecting a feature
        and then randomly selecting a split value between the maximum and minimum
        values of the selected feature.

        Since recursive partitioning can be represented by a tree structure, the
        number of splittings required to isolate a sample is equivalent to the path
        length from the root node to the terminating node.

        This path length, averaged over a forest of such random trees, is a
        measure of normality and our decision function.

        Random partitioning produces noticeably shorter paths for anomalies.
        Hence, when a forest of random trees collectively produce shorter path
        lengths for particular samples, they are highly likely to be anomalies.

        Read more in the :ref:`User Guide <isolation_forest>`.

        .. versionadded:: 0.18

        Parameters
        ----------
        n_estimators : int, default=100
            The number of base estimators in the ensemble.

        max_samples : "auto", int or float, default="auto"
            The number of samples to draw from X to train each base estimator.
                - If int, then draw `max_samples` samples.
                - If float, then draw `max_samples * X.shape[0]` samples.
                - If "auto", then `max_samples=min(256, n_samples)`.

            If max_samples is larger than the number of samples provided,
            all samples will be used for all trees (no sampling).

        contamination : 'auto' or float, default='auto'
            The amount of contamination of the data set, i.e. the proportion
            of outliers in the data set. Used when fitting to define the threshold
            on the scores of the samples.
                - If 'auto', the threshold is determined as in the
                  original paper.
                - If float, the contamination should be in the range (0, 0.5].

            .. versionchanged:: 0.22
               The default value of ``contamination`` changed from 0.1
               to ``'auto'``.

        max_features : int or float, default=1.0
            The number of features to draw from X to train each base estimator.
                - If int, then draw `max_features` features.
                - If float, then draw `max(1, int(max_features * n_features_in_))` features.

            Note: using a float number less than 1.0 or integer less than number of
            features will enable feature subsampling and leads to a longer runtime.

        bootstrap : bool, default=False
            If True, individual trees are fit on random subsets of the training
            data sampled with replacement. If False, sampling without replacement
            is performed.

        n_jobs : int, default=None
            The number of jobs to run in parallel for both :meth:`fit` and
            :meth:`predict`. ``None`` means 1 unless in a
            :obj:`joblib.parallel_backend` context. ``-1`` means using all
            processors. See :term:`Glossary <n_jobs>` for more details.

        random_state : int, RandomState instance or None, default=None
            Controls the pseudo-randomness of the selection of the feature
            and split values for each branching step and each tree in the forest.
            Pass an int for reproducible results across multiple function calls.
            See :term:`Glossary <random_state>`.

            Any value other than ``None`` (including ``0``) is used as given;
            ``None`` keeps the ``random_state`` inherited from ``WorkflowBase``.

        verbose : int, default=0
            Controls the verbosity of the tree building process.

        warm_start : bool, default=False
            When set to ``True``, reuse the solution of the previous call to fit
            and add more estimators to the ensemble, otherwise, just fit a whole
            new forest. See :term:`the Glossary <warm_start>`.

            .. versionadded:: 0.21

        References
        ----------
        Scikit-learn API: sklearn.ensemble.IsolationForest
        https://scikit-learn.org/stable/modules/generated/sklearn.ensemble.IsolationForest.html#
        """
        super().__init__()
        self.n_estimators = n_estimators
        self.max_samples = max_samples
        self.contamination = contamination
        self.max_features = max_features
        self.bootstrap = bootstrap
        self.n_jobs = n_jobs
        self.verbose = verbose
        self.warm_start = warm_start

        # Compare against None rather than truthiness so that an explicit seed
        # of 0 is honoured instead of being silently dropped.
        if random_state is not None:
            self.random_state = random_state

        # If 'random_state' is None, 'self.random_state' comes from the parent class 'WorkflowBase'
        self.model = IsolationForest(
            n_estimators=self.n_estimators,
            max_samples=self.max_samples,
            contamination=self.contamination,
            max_features=self.max_features,
            bootstrap=self.bootstrap,
            n_jobs=self.n_jobs,
            random_state=self.random_state,
            verbose=self.verbose,
            warm_start=self.warm_start,
        )

        self.naming = IsolationForestAnomalyDetection.name

    @classmethod
    def manual_hyper_parameters(cls) -> Dict:
        """Manual hyper-parameters specification.

        Prints a banner, delegates to ``isolation_forest_manual_hyper_parameters``,
        clears the output and returns the collected hyper-parameters.
        """
        print(f"[bold green]-*-*- {cls.name} - Hyper-parameters Specification -*-*-[/bold green]")
        hyper_parameters = isolation_forest_manual_hyper_parameters()
        clear_output()
        return hyper_parameters

    def special_components(self, **kwargs) -> None:
        """Invoke all special application functions for this algorithms by Scikit-learn framework.

        Isolation Forest has no special functions, so this does nothing.
        """
        pass
class LocalOutlierFactorAnomalyDetection(AnomalyDetectionWorkflowBase):
    """The automation workflow of using Local Outlier Factor algorithm to make insightful products."""

    name = "Local Outlier Factor"
    # Names of the functionalities specific to this algorithm.
    special_function = [func.value for func in LocalOutlierFactorSpecialFunction]

    def __init__(
        self,
        n_neighbors: int = 20,
        algorithm: str = "auto",
        leaf_size: int = 30,
        metric: Union[str, callable] = "minkowski",
        p: float = 2.0,
        metric_params: Optional[dict] = None,
        contamination: Union[str, float] = "auto",
        novelty: bool = True,  # Changed from Scikit-learn's default of False to True in order to make `predict` work
        n_jobs: Optional[int] = None,
    ) -> None:
        """
        Unsupervised Outlier Detection using the Local Outlier Factor (LOF).

        The anomaly score of each sample is called the Local Outlier Factor.
        It measures the local deviation of the density of a given sample with respect
        to its neighbors.
        It is local in that the anomaly score depends on how isolated the object
        is with respect to the surrounding neighborhood.
        More precisely, locality is given by k-nearest neighbors, whose distance
        is used to estimate the local density.
        By comparing the local density of a sample to the local densities of its
        neighbors, one can identify samples that have a substantially lower density
        than their neighbors. These are considered outliers.

        .. versionadded:: 0.19

        Parameters
        ----------
        n_neighbors : int, default=20
            Number of neighbors to use by default for :meth:`kneighbors` queries.
            If n_neighbors is larger than the number of samples provided,
            all samples will be used.

        algorithm : {'auto', 'ball_tree', 'kd_tree', 'brute'}, default='auto'
            Algorithm used to compute the nearest neighbors:

            - 'ball_tree' will use :class:`BallTree`
            - 'kd_tree' will use :class:`KDTree`
            - 'brute' will use a brute-force search.
            - 'auto' will attempt to decide the most appropriate algorithm
              based on the values passed to :meth:`fit` method.

            Note: fitting on sparse input will override the setting of
            this parameter, using brute force.

        leaf_size : int, default=30
            Leaf is size passed to :class:`BallTree` or :class:`KDTree`. This can
            affect the speed of the construction and query, as well as the memory
            required to store the tree. The optimal value depends on the
            nature of the problem.

        metric : str or callable, default='minkowski'
            Metric to use for distance computation. Default is "minkowski", which
            results in the standard Euclidean distance when p = 2. See the
            documentation of `scipy.spatial.distance
            <https://docs.scipy.org/doc/scipy/reference/spatial.distance.html>`_ and
            the metrics listed in
            :class:`~sklearn.metrics.pairwise.distance_metrics` for valid metric
            values.

            If metric is "precomputed", X is assumed to be a distance matrix and
            must be square during fit. X may be a :term:`sparse graph`, in which
            case only "nonzero" elements may be considered neighbors.

            If metric is a callable function, it takes two arrays representing 1D
            vectors as inputs and must return one value indicating the distance
            between those vectors. This works for Scipy's metrics, but is less
            efficient than passing the metric name as a string.

        p : float, default=2
            Parameter for the Minkowski metric from
            :func:`sklearn.metrics.pairwise_distances`. When p = 1, this
            is equivalent to using manhattan_distance (l1), and euclidean_distance
            (l2) for p = 2. For arbitrary p, minkowski_distance (l_p) is used.

        metric_params : dict, default=None
            Additional keyword arguments for the metric function.

        contamination : 'auto' or float, default='auto'
            The amount of contamination of the data set, i.e. the proportion
            of outliers in the data set. When fitting this is used to define the
            threshold on the scores of the samples.

            - if 'auto', the threshold is determined as in the
              original paper,
            - if a float, the contamination should be in the range (0, 0.5].

            .. versionchanged:: 0.22
               The default value of ``contamination`` changed from 0.1
               to ``'auto'``.

        novelty : bool, default=True
            Scikit-learn's LocalOutlierFactor defaults to ``novelty=False``, where
            it is only meant to be used for outlier detection. This workflow
            defaults to ``True`` (novelty detection) because the estimator only
            exposes ``predict``, which ``AnomalyDetectionWorkflowBase.predict``
            calls, when ``novelty=True``. In this case be aware that
            you should only use predict, decision_function and score_samples
            on new unseen data and not on the training set; and note that the
            results obtained this way may differ from the standard LOF results.

            .. versionadded:: 0.20

        n_jobs : int, default=None
            The number of parallel jobs to run for neighbors search.
            ``None`` means 1 unless in a :obj:`joblib.parallel_backend` context.
            ``-1`` means using all processors. See :term:`Glossary <n_jobs>`
            for more details.

        References
        ----------
        Scikit-learn API: sklearn.neighbors.LocalOutlierFactor
        https://scikit-learn.org/stable/modules/generated/sklearn.neighbors.LocalOutlierFactor.html#
        """
        super().__init__()
        self.n_neighbors = n_neighbors
        self.algorithm = algorithm
        self.leaf_size = leaf_size
        self.metric = metric
        self.p = p
        self.metric_params = metric_params
        self.contamination = contamination
        self.novelty = novelty
        self.n_jobs = n_jobs

        self.model = LocalOutlierFactor(
            n_neighbors=self.n_neighbors,
            algorithm=self.algorithm,
            leaf_size=self.leaf_size,
            metric=self.metric,
            p=self.p,
            metric_params=self.metric_params,
            contamination=self.contamination,
            novelty=self.novelty,
            n_jobs=self.n_jobs,
        )

        self.naming = LocalOutlierFactorAnomalyDetection.name

    @classmethod
    def manual_hyper_parameters(cls) -> Dict:
        """Manual hyper-parameters specification.

        Prints a banner, delegates to ``local_outlier_factor_manual_hyper_parameters``,
        clears the output and returns the collected hyper-parameters.
        """
        print(f"[bold green]-*-*- {cls.name} - Hyper-parameters Specification -*-*-[/bold green]")
        hyper_parameters = local_outlier_factor_manual_hyper_parameters()
        clear_output()
        return hyper_parameters

    @staticmethod
    def _plot_lof_scores(X_train: pd.DataFrame, name_column_train: str, lof_scores: np.ndarray, graph_name: str, image_config: dict, algorithm_name: str, local_path: str, mlflow_path: str) -> None:
        """Draw the LOF scores bar diagram.

        The index of ``X_train`` is handed to ``plot_lof_scores`` together with
        the scores; the figure is saved through ``save_fig`` and the data
        returned by ``plot_lof_scores`` through ``save_data``.
        """
        print(f"-----* {graph_name} *-----")
        # Despite the variable name, this holds the row index of the training data.
        columns_name = X_train.index
        data = plot_lof_scores(columns_name, lof_scores, image_config)
        save_fig(f"{graph_name} - {algorithm_name}", local_path, mlflow_path)
        save_data(data, name_column_train, f"{graph_name} - {algorithm_name}", local_path, mlflow_path, True)

    def special_components(self, **kwargs) -> None:
        """Invoke all special application functions for this algorithms by Scikit-learn framework.

        Reads ``self.X_train``, ``self.name_all``, ``self.image_config`` and
        ``self.naming``, and requires ``self.model`` to be fitted already so
        that ``negative_outlier_factor_`` exists.
        """
        # `os.getenv` returns None when the environment variable is not set.
        GEOPI_OUTPUT_ARTIFACTS_IMAGE_MODEL_OUTPUT_PATH = os.getenv("GEOPI_OUTPUT_ARTIFACTS_IMAGE_MODEL_OUTPUT_PATH")
        lof_scores = self.model.negative_outlier_factor_
        self._plot_lof_scores(
            X_train=self.X_train,
            name_column_train=self.name_all,
            lof_scores=lof_scores,
            image_config=self.image_config,
            algorithm_name=self.naming,
            graph_name=LocalOutlierFactorSpecialFunction.PLOT_LOF_SCORE.value,
            local_path=GEOPI_OUTPUT_ARTIFACTS_IMAGE_MODEL_OUTPUT_PATH,
            mlflow_path=MLFLOW_ARTIFACT_IMAGE_MODEL_OUTPUT_PATH,
        )