# -*- coding: utf-8 -*-
import json
import os
from typing import Dict, Optional, Union
import mlflow
import numpy as np
import pandas as pd
from numpy.typing import ArrayLike
from rich import print
from sklearn.cluster import DBSCAN, AffinityPropagation, AgglomerativeClustering, KMeans, MeanShift
from ..constants import MLFLOW_ARTIFACT_DATA_PATH, MLFLOW_ARTIFACT_IMAGE_MODEL_OUTPUT_PATH
from ..utils.base import clear_output, save_data, save_fig, save_text
from ._base import ClusteringMetricsMixin, WorkflowBase
from .func.algo_clustering._affinitypropagation import affinitypropagation_manual_hyper_parameters
from .func.algo_clustering._agglomerative import agglomerative_manual_hyper_parameters
from .func.algo_clustering._common import plot_silhouette_diagram, plot_silhouette_value_diagram, scatter2d, scatter3d, score
from .func.algo_clustering._dbscan import dbscan_manual_hyper_parameters
from .func.algo_clustering._enum import ClusteringCommonFunction, KMeansSpecialFunction, MeanShiftSpecialFunction
from .func.algo_clustering._kmeans import kmeans_manual_hyper_parameters
from .func.algo_clustering._meanshift import meanshift_manual_hyper_parameters
[docs]
class ClusteringWorkflowBase(WorkflowBase):
"""The base workflow class of clustering algorithms."""
common_function = [func.value for func in ClusteringCommonFunction]
def __init__(self):
super().__init__()
self.cluster_labels = None
self.cluster_centers = None
self.mode = "Clustering"
[docs]
def fit(self, X: pd.DataFrame, y: Optional[pd.DataFrame] = None) -> None:
"""Fit the model according to the given training data."""
self.X = X
self.model.fit(X)
mlflow.log_params(self.model.get_params())
[docs]
@classmethod
def manual_hyper_parameters(cls) -> Dict:
"""Manual hyper-parameters specification."""
return dict()
@staticmethod
def _get_cluster_centers(func_name: str, trained_model: object, name_column: str, algorithm_name: str, local_path: str, mlflow_path: str) -> Optional[pd.DataFrame]:
"""Get the cluster centers."""
print(f"-----* {func_name} *-----")
cluster_centers = getattr(trained_model, "cluster_centers_", None)
if cluster_centers is None:
print("This algorithm does not provide cluster centers")
else:
column_name = []
for i in range(cluster_centers.shape[1]):
column_name.append(f"Dimension {i+1}")
print(cluster_centers)
cluster_centers = pd.DataFrame(cluster_centers, columns=column_name)
save_data(cluster_centers, name_column, f"{func_name} - {algorithm_name}", local_path, mlflow_path)
return cluster_centers
@staticmethod
def _get_cluster_labels(func_name: str, trained_model: object, name_column: str, algorithm_name: str, local_path: str, mlflow_path: str) -> pd.DataFrame:
"""Get the cluster labels."""
print(f"-----* {func_name} *-----")
cluster_label = pd.DataFrame(trained_model.labels_, columns=[func_name])
print(cluster_label)
save_data(cluster_label, name_column, f"{func_name} - {algorithm_name}", local_path, mlflow_path)
return cluster_label
@staticmethod
def _score(data: pd.DataFrame, labels: pd.Series, func_name: str, algorithm_name: str, store_path: str) -> None:
"""Calculate the score of the model."""
print(f"-----* {func_name} *-----")
scores = score(data, labels)
scores_str = json.dumps(scores, indent=4)
save_text(scores_str, f"{func_name} - {algorithm_name}", store_path)
mlflow.log_metrics(scores)
@staticmethod
def _scatter2d(data: pd.DataFrame, labels: pd.Series, name_column: str, cluster_centers_: pd.DataFrame, algorithm_name: str, local_path: str, mlflow_path: str, graph_name: str) -> None:
"""Plot the two-dimensional diagram of the clustering result."""
print(f"-----* {graph_name} *-----")
scatter2d(data, labels, cluster_centers_, algorithm_name)
save_fig(f"{graph_name} - {algorithm_name}", local_path, mlflow_path)
data_with_labels = pd.concat([data, labels], axis=1)
save_data(data_with_labels, name_column, f"{graph_name} - {algorithm_name}", local_path, mlflow_path)
@staticmethod
def _scatter3d(data: pd.DataFrame, labels: pd.Series, name_column: str, algorithm_name: str, local_path: str, mlflow_path: str, graph_name: str) -> None:
"""Plot the three-dimensional diagram of the clustering result."""
print(f"-----* {graph_name} *-----")
scatter3d(data, labels, algorithm_name)
save_fig(f"{graph_name} - {algorithm_name}", local_path, mlflow_path)
data_with_labels = pd.concat([data, labels], axis=1)
save_data(data_with_labels, name_column, f"{graph_name} - {algorithm_name}", local_path, mlflow_path)
@staticmethod
def _plot_silhouette_diagram(
data: pd.DataFrame, labels: pd.Series, name_column: str, model: object, cluster_centers_: np.ndarray, algorithm_name: str, local_path: str, mlflow_path: str, graph_name: str
) -> None:
"""Plot the silhouette diagram of the clustering result."""
print(f"-----* {graph_name} *-----")
plot_silhouette_diagram(data, labels, cluster_centers_, model, algorithm_name)
save_fig(f"{graph_name} - {algorithm_name}", local_path, mlflow_path)
data_with_labels = pd.concat([data, labels], axis=1)
save_data(data_with_labels, name_column, f"{graph_name} - Data With Labels", local_path, mlflow_path)
if not isinstance(cluster_centers_, str):
cluster_center_data = pd.DataFrame(cluster_centers_, columns=data.columns)
save_data(cluster_center_data, name_column, f"{graph_name} - Cluster Centers", local_path, mlflow_path)
@staticmethod
def _plot_silhouette_value_diagram(data: pd.DataFrame, labels: pd.Series, name_column: str, algorithm_name: str, local_path: str, mlflow_path: str, graph_name: str) -> None:
"""Plot the silhouette value diagram of the clustering result."""
print(f"-----* {graph_name} *-----")
plot_silhouette_value_diagram(data, labels, algorithm_name)
save_fig(f"{graph_name} - {algorithm_name}", local_path, mlflow_path)
data_with_labels = pd.concat([data, labels], axis=1)
save_data(data_with_labels, name_column, f"{graph_name} - Data With Labels", local_path, mlflow_path)
[docs]
def common_components(self) -> None:
"""Invoke all common application functions for clustering algorithms."""
GEOPI_OUTPUT_METRICS_PATH = os.getenv("GEOPI_OUTPUT_METRICS_PATH")
GEOPI_OUTPUT_ARTIFACTS_IMAGE_MODEL_OUTPUT_PATH = os.getenv("GEOPI_OUTPUT_ARTIFACTS_IMAGE_MODEL_OUTPUT_PATH")
GEOPI_OUTPUT_ARTIFACTS_DATA_PATH = os.getenv("GEOPI_OUTPUT_ARTIFACTS_DATA_PATH")
self.cluster_centers = self._get_cluster_centers(
func_name=ClusteringCommonFunction.CLUSTER_CENTERS.value,
trained_model=self.model,
name_column=self.name_all,
algorithm_name=self.naming,
local_path=GEOPI_OUTPUT_ARTIFACTS_DATA_PATH,
mlflow_path=MLFLOW_ARTIFACT_DATA_PATH,
)
self.cluster_labels = self._get_cluster_labels(
func_name=ClusteringCommonFunction.CLUSTER_LABELS.value,
trained_model=self.model,
name_column=self.name_all,
algorithm_name=self.naming,
local_path=GEOPI_OUTPUT_ARTIFACTS_DATA_PATH,
mlflow_path=MLFLOW_ARTIFACT_DATA_PATH,
)
self._score(
data=self.X,
labels=self.cluster_labels[ClusteringCommonFunction.CLUSTER_LABELS.value],
func_name=ClusteringCommonFunction.MODEL_SCORE.value,
algorithm_name=self.naming,
store_path=GEOPI_OUTPUT_METRICS_PATH,
)
if self.X.shape[1] >= 3:
# choose two of dimensions to draw
two_dimen_axis_index, two_dimen_data = self.choose_dimension_data(self.X, 2)
self._scatter2d(
data=two_dimen_data,
labels=self.cluster_labels[ClusteringCommonFunction.CLUSTER_LABELS.value],
name_column=self.name_all,
cluster_centers_=self.cluster_centers,
algorithm_name=self.naming,
local_path=GEOPI_OUTPUT_ARTIFACTS_IMAGE_MODEL_OUTPUT_PATH,
mlflow_path=MLFLOW_ARTIFACT_IMAGE_MODEL_OUTPUT_PATH,
graph_name=ClusteringCommonFunction.CLUSTER_TWO_DIMENSIONAL_DIAGRAM.value,
)
# choose three of dimensions to draw
three_dimen_axis_index, three_dimen_data = self.choose_dimension_data(self.X, 3)
self._scatter3d(
data=three_dimen_data,
labels=self.cluster_labels[ClusteringCommonFunction.CLUSTER_LABELS.value],
name_column=self.name_all,
algorithm_name=self.naming,
local_path=GEOPI_OUTPUT_ARTIFACTS_IMAGE_MODEL_OUTPUT_PATH,
mlflow_path=MLFLOW_ARTIFACT_IMAGE_MODEL_OUTPUT_PATH,
graph_name=ClusteringCommonFunction.CLUSTER_THREE_DIMENSIONAL_DIAGRAM.value,
)
elif self.X.shape[1] == 3:
# choose two of dimensions to draw
two_dimen_axis_index, two_dimen_data = self.choose_dimension_data(self.X, 2)
self._scatter2d(
data=two_dimen_data,
labels=self.cluster_labels[ClusteringCommonFunction.CLUSTER_LABELS.value],
name_column=self.name_all,
cluster_centers_=self.cluster_centers,
algorithm_name=self.naming,
local_path=GEOPI_OUTPUT_ARTIFACTS_IMAGE_MODEL_OUTPUT_PATH,
mlflow_path=MLFLOW_ARTIFACT_IMAGE_MODEL_OUTPUT_PATH,
graph_name=ClusteringCommonFunction.CLUSTER_TWO_DIMENSIONAL_DIAGRAM.value,
)
# no need to choose
self._scatter3d(
data=self.X,
labels=self.cluster_labels[ClusteringCommonFunction.CLUSTER_LABELS.value],
name_column=self.name_all,
algorithm_name=self.naming,
local_path=GEOPI_OUTPUT_ARTIFACTS_IMAGE_MODEL_OUTPUT_PATH,
mlflow_path=MLFLOW_ARTIFACT_IMAGE_MODEL_OUTPUT_PATH,
graph_name=ClusteringCommonFunction.CLUSTER_THREE_DIMENSIONAL_DIAGRAM.value,
)
elif self.X.shape[1] == 2:
self._scatter2d(
data=self.X,
labels=self.cluster_labels[ClusteringCommonFunction.CLUSTER_LABELS.value],
name_column=self.name_all,
cluster_centers_=self.cluster_centers,
algorithm_name=self.naming,
local_path=GEOPI_OUTPUT_ARTIFACTS_IMAGE_MODEL_OUTPUT_PATH,
mlflow_path=MLFLOW_ARTIFACT_IMAGE_MODEL_OUTPUT_PATH,
graph_name=ClusteringCommonFunction.CLUSTER_TWO_DIMENSIONAL_DIAGRAM.value,
)
else:
pass
self._plot_silhouette_diagram(
data=self.X,
labels=self.cluster_labels[ClusteringCommonFunction.CLUSTER_LABELS.value],
name_column=self.name_all,
cluster_centers_=self.cluster_centers,
model=self.model,
algorithm_name=self.naming,
local_path=GEOPI_OUTPUT_ARTIFACTS_IMAGE_MODEL_OUTPUT_PATH,
mlflow_path=MLFLOW_ARTIFACT_IMAGE_MODEL_OUTPUT_PATH,
graph_name=ClusteringCommonFunction.SILHOUETTE_DIAGRAM.value,
)
self._plot_silhouette_value_diagram(
data=self.X,
labels=self.cluster_labels[ClusteringCommonFunction.CLUSTER_LABELS.value],
name_column=self.name_all,
algorithm_name=self.naming,
local_path=GEOPI_OUTPUT_ARTIFACTS_IMAGE_MODEL_OUTPUT_PATH,
mlflow_path=MLFLOW_ARTIFACT_IMAGE_MODEL_OUTPUT_PATH,
graph_name=ClusteringCommonFunction.SILHOUETTE_VALUE_DIAGRAM.value,
)
[docs]
class KMeansClustering(ClusteringWorkflowBase):
"""The automation workflow of using KMeans algorithm to make insightful products."""
name = "KMeans"
special_function = [func.value for func in KMeansSpecialFunction]
def __init__(
self,
n_clusters: int = 8,
init: str = "k-means++",
n_init: int = 10,
max_iter: int = 300,
tol: float = 1e-4,
verbose: int = 0,
random_state: Optional[int] = None,
copy_x: bool = True,
algorithm: str = "auto",
) -> None:
"""
Parameters
----------
n_clusters : int, default=8
The number of clusters to form as well as the number of
centroids to generate.
init : {'k-means++', 'random'}, callable or array-like of shape \
(n_clusters, n_features), default='k-means++'
Method for initialization:
'k-means++' : selects initial cluster centers for k-mean
clustering in a smart way to speed up convergence. See section
Notes in k_init for more details.
'random': choose `n_clusters` observations (rows) at random from data
for the initial centroids.
If an array is passed, it should be of shape (n_clusters, n_features)
and gives the initial centers.
If a callable is passed, it should take arguments X, n_clusters and a
random state and return an initialization.
n_init : int, default=10
Number of time the k-means algorithm will be run with different
centroid seeds. The final results will be the best output of
n_init consecutive runs in terms of inertia.
max_iter : int, default=300
Maximum number of iterations of the k-means algorithm for a
single run.
tol : float, default=1e-4
Relative tolerance with regards to Frobenius norm of the difference
in the cluster centers of two consecutive iterations to declare
convergence.
verbose : int, default=0
Verbosity mode.
random_state : int, RandomState instance or None, default=None
Determines random number generation for centroid initialization. Use
an int to make the randomness deterministic.
See :term:`Glossary <random_state>`.
copy_x : bool, default=True
When pre-computing distances it is more numerically accurate to center
the data first. If copy_x is True (default), then the original data is
not modified. If False, the original data is modified, and put back
before the function returns, but small numerical differences may be
introduced by subtracting and then adding the data mean. Note that if
the original data is not C-contiguous, a copy will be made even if
copy_x is False. If the original data is sparse, but not in CSR format,
a copy will be made even if copy_x is False.
algorithm : {"auto", "full", "elkan"}, default="auto"
K-means algorithm to use. The classical EM-style algorithm is "full".
The "elkan" variation is more efficient on data with well-defined
clusters, by using the triangle inequality. However it's more memory
intensive due to the allocation of an extra array of shape
(n_samples, n_clusters).
For now "auto" (kept for backward compatibility) chooses "elkan" but it
might change in the future for a better heuristic.
References
----------
Scikit-learn API: sklearn.cluster.KMeans
https://scikit-learn.org/stable/modules/generated/sklearn.cluster.KMeans.html
"""
super().__init__()
self.n_clusters = n_clusters
self.init = init
self.max_iter = max_iter
self.tol = tol
self.n_init = n_init
self.verbose = verbose
self.copy_x = copy_x
self.algorithm = algorithm
if random_state:
self.random_state = random_state
# If 'random_state' is None, 'self.random_state' comes from the parent class 'WorkflowBase'
self.model = KMeans(
n_clusters=self.n_clusters,
init=self.init,
n_init=self.n_init,
max_iter=self.max_iter,
tol=self.tol,
verbose=self.verbose,
random_state=self.random_state,
copy_x=self.copy_x,
algorithm=self.algorithm,
)
self.naming = KMeansClustering.name
@staticmethod
def _get_inertia_scores(func_name: str, algorithm_name: str, trained_model: object, store_path: str) -> None:
"""Get the scores of the clustering result."""
print(f"-----* {func_name} *-----")
print(f"{func_name}: ", trained_model.inertia_)
inertia_scores = {f"{func_name}": trained_model.inertia_}
mlflow.log_metrics(inertia_scores)
inertia_scores_str = json.dumps(inertia_scores, indent=4)
save_text(inertia_scores_str, f"{func_name} - {algorithm_name}", store_path)
[docs]
@classmethod
def manual_hyper_parameters(cls) -> Dict:
"""Manual hyper-parameters specification."""
print(f"[bold green]-*-*- {cls.name} - Hyper-parameters Specification -*-*-[/bold green]")
hyper_parameters = kmeans_manual_hyper_parameters()
clear_output()
return hyper_parameters
[docs]
def special_components(self, **kwargs: Union[Dict, np.ndarray, int]) -> None:
"""Invoke all special application functions for this algorithms by Scikit-learn framework."""
GEOPI_OUTPUT_METRICS_PATH = os.getenv("GEOPI_OUTPUT_METRICS_PATH")
self._get_inertia_scores(
func_name=KMeansSpecialFunction.INERTIA_SCORE.value,
algorithm_name=self.naming,
trained_model=self.model,
store_path=GEOPI_OUTPUT_METRICS_PATH,
)
[docs]
class DBSCANClustering(ClusteringMetricsMixin, ClusteringWorkflowBase):
"""The automation workflow of using DBSCAN algorithm to make insightful products."""
name = "DBSCAN"
special_function = ["Num of Clusters"]
def __init__(
self,
eps: float = 0.5,
min_samples: int = 5,
metric: str = "euclidean",
metric_params: Optional[Dict] = None,
algorithm: str = "auto",
leaf_size: int = 30,
p: float = None,
n_jobs: int = None,
) -> None:
"""
Parameters
----------
eps : float, default=0.5
The maximum distance between two samples for one to be considered as in the neighborhood of the other. This is not a maximum bound on the distances of points within a cluster.
This is the most important DBSCAN parameter to choose appropriately for your data set and distance function.
min_samples : int, default=5
The number of samples (or total weight) in a neighborhood for a point to be considered as a core point. This includes the point itself.
metric : str, or callable, default=`euclidean`
The metric to use when calculating distance between instances in a feature array. If metric is a string or callable, it must be one of the options allowed
by sklearn.metrics.pairwise_distances for its metric parameter.
If metric is “precomputed”, X is assumed to be a distance matrix and must be square. X may be a sparse graph, in which case only “nonzero” elements may be considered neighbors for DBSCAN.
New in version 0.17: metric precomputed to accept precomputed sparse matrix.
metric_params : dict, default=None
Additional keyword arguments for the metric function.
New in version 0.19.
algorithm : {`auto`, `ball_tree`, `kd_tree`, `brute`}, default=`auto`
The algorithm to be used by the NearestNeighbors module to compute pointwise distances and find nearest neighbors. See NearestNeighbors module documentation for details.
leaf_size : int, default=30
Leaf size passed to BallTree or cKDTree. This can affect the speed of the construction and query, as well as the memory required to store the tree. The optimal value depends
on the nature of the problem.
p : float, default=None
The power of the Minkowski metric to be used to calculate distance between points. If None, then p=2 (equivalent to the Euclidean distance).
n_jobs : int, default=None
The number of parallel jobs to run. None means 1 unless in a joblib.parallel_backend context. -1 means using all processors. See Glossary for more details.
References
----------
Scikit-learn API: sklearn.cluster.DBSCAN
https://scikit-learn.org/stable/modules/generated/sklearn.cluster.DBSCAN.html
"""
super().__init__()
self.eps = eps
self.min_samples = min_samples
self.metric = metric
self.metric_params = metric_params
self.algorithm = algorithm
self.leaf_size = leaf_size
self.p = p
self.n_jobs = n_jobs
self.model = DBSCAN(
eps=self.eps,
min_samples=self.min_samples,
metric=self.metric,
metric_params=self.metric_params,
algorithm=self.algorithm,
leaf_size=self.leaf_size,
p=self.p,
n_jobs=self.n_jobs,
)
self.naming = DBSCANClustering.name
[docs]
@classmethod
def manual_hyper_parameters(cls) -> Dict:
"""Manual hyper-parameters specification."""
print(f"[bold green]-*-*- {cls.name} - Hyper-parameters Specification -*-*-[/bold green]")
hyper_parameters = dbscan_manual_hyper_parameters()
clear_output()
return hyper_parameters
[docs]
def special_components(self, **kwargs: Union[Dict, np.ndarray, int]) -> None:
"""Invoke all special application functions for this algorithm by Scikit-learn framework."""
GEOPI_OUTPUT_METRICS_PATH = os.getenv("GEOPI_OUTPUT_METRICS_PATH")
self._get_num_clusters(
labels=self.cluster_labels[ClusteringCommonFunction.CLUSTER_LABELS.value],
func_name=MeanShiftSpecialFunction.NUM_CLUSTERS.value,
algorithm_name=self.naming,
store_path=GEOPI_OUTPUT_METRICS_PATH,
)
[docs]
class Agglomerative(ClusteringWorkflowBase):
"""The automation workflow of using Agglomerative Clustering to make insightful products."""
name = "Agglomerative"
special_function = []
def __init__(
self,
n_clusters: int = 2,
*,
affinity: str = "euclidean",
memory: str = None,
connectivity: ArrayLike = None,
compute_full_tree: str = "auto",
linkage: str = "ward",
distance_threshold: float = None,
compute_distances: bool = False,
) -> None:
"""
Parameters
----------
n_clusters : int or None, default=2
The number of clusters to find. It must be ``None`` if
``distance_threshold`` is not ``None``.
affinity : str or callable, default='euclidean'
Metric used to compute the linkage. Can be "euclidean", "l1", "l2",
"manhattan", "cosine", or "precomputed".
If linkage is "ward", only "euclidean" is accepted.
If "precomputed", a distance matrix (instead of a similarity matrix)
is needed as input for the fit method.
memory : str or object with the joblib.Memory interface, default=None
Used to cache the output of the computation of the tree.
By default, no caching is done. If a string is given, it is the
path to the caching directory.
connectivity : array-like or callable, default=None
Connectivity matrix. Defines for each sample the neighboring
samples following a given structure of the data.
This can be a connectivity matrix itself or a callable that transforms
the data into a connectivity matrix, such as derived from
`kneighbors_graph`. Default is ``None``, i.e, the
hierarchical clustering algorithm is unstructured.
compute_full_tree : 'auto' or bool, default='auto'
Stop early the construction of the tree at ``n_clusters``. This is
useful to decrease computation time if the number of clusters is not
small compared to the number of samples. This option is useful only
when specifying a connectivity matrix. Note also that when varying the
number of clusters and using caching, it may be advantageous to compute
the full tree. It must be ``True`` if ``distance_threshold`` is not
``None``. By default `compute_full_tree` is "auto", which is equivalent
to `True` when `distance_threshold` is not `None` or that `n_clusters`
is inferior to the maximum between 100 or `0.02 * n_samples`.
Otherwise, "auto" is equivalent to `False`.
linkage : {'ward', 'complete', 'average', 'single'}, default='ward'
Which linkage criterion to use. The linkage criterion determines which
distance to use between sets of observation. The algorithm will merge
the pairs of cluster that minimize this criterion.
- 'ward' minimizes the variance of the clusters being merged.
- 'average' uses the average of the distances of each observation of
the two sets.
- 'complete' or 'maximum' linkage uses the maximum distances between
all observations of the two sets.
- 'single' uses the minimum of the distances between all observations
of the two sets.
.. versionadded:: 0.20
Added the 'single' option
distance_threshold : float, default=None
The linkage distance threshold above which, clusters will not be
merged. If not ``None``, ``n_clusters`` must be ``None`` and
``compute_full_tree`` must be ``True``.
.. versionadded:: 0.21
compute_distances : bool, default=False
Computes distances between clusters even if `distance_threshold` is not
used. This can be used to make dendrogram visualization, but introduces
a computational and memory overhead.
.. versionadded:: 0.24
References
----------
sklearn.cluster.AgglomerativeClustering
https://scikit-learn.org/stable/modules/generated/sklearn.cluster.AgglomerativeClustering.html
"""
super().__init__()
self.n_clusters = n_clusters
self.affinity = affinity
self.distance_threshold = distance_threshold
self.memory = memory
self.connectivity = connectivity
self.compute_full_tree = compute_full_tree
self.linkage = linkage
self.compute_distances = compute_distances
self.model = AgglomerativeClustering(
n_clusters=self.n_clusters,
affinity=self.affinity,
memory=self.memory,
connectivity=self.connectivity,
compute_full_tree=self.compute_full_tree,
linkage=self.linkage,
distance_threshold=self.distance_threshold,
compute_distances=self.compute_distances,
)
self.naming = Agglomerative.name
[docs]
@classmethod
def manual_hyper_parameters(cls) -> Dict:
"""Manual hyper-parameters specification."""
print(f"[bold green]-*-*- {cls.name} - Hyper-parameters Specification -*-*-[/bold green]")
hyper_parameters = agglomerative_manual_hyper_parameters()
clear_output()
return hyper_parameters
[docs]
def special_components(self, **kwargs) -> None:
"""Invoke all special application functions for this algorithms by Scikit-learn framework."""
pass
[docs]
class AffinityPropagationClustering(ClusteringWorkflowBase):
name = "AffinityPropagation"
def __init__(
self,
*,
damping: float = 0.5,
max_iter: int = 200,
convergence_iter: int = 15,
copy: bool = True,
preference: Optional[Dict] = None,
affinity: str = "euclidean",
verbose: bool = False,
random_state: Optional[Dict] = None,
) -> None:
"""
Parameters
----------
damping : float, default=0.5
Damping factor in the range `[0.5, 1.0)` is the extent to
which the current value is maintained relative to
incoming values (weighted 1 - damping). This in order
to avoid numerical oscillations when updating these
values (messages).
max_iter : int, default=200
Maximum number of iterations.
convergence_iter : int, default=15
Number of iterations with no change in the number
of estimated clusters that stops the convergence.
copy : bool, default=True
Make a copy of input data.
preference : array-like of shape (n_samples,) or float, default=None
Preferences for each point - points with larger values of
preferences are more likely to be chosen as exemplars. The number
of exemplars, ie of clusters, is influenced by the input
preferences value. If the preferences are not passed as arguments,
they will be set to the median of the input similarities.
affinity : {'euclidean', 'precomputed'}, default='euclidean'
Which affinity to use. At the moment 'precomputed' and
``euclidean`` are supported. 'euclidean' uses the
negative squared euclidean distance between points.
verbose : bool, default=False
Whether to be verbose.
random_state : int, RandomState instance or None, default=None
Pseudo-random number generator to control the starting state.
Use an int for reproducible results across function calls.
See the :term:`Glossary <random_state>`.
.. versionadded:: 0.23
this parameter was previously hardcoded as 0.
References
----------
Scikit-learn API: sklearn.cluster.AffinityPropagation
https://scikit-learn.org/stable/modules/generated/sklearn.cluster.AffinityPropagation
"""
super().__init__()
self.damping = damping
self.max_iter = max_iter
self.convergence_iter = convergence_iter
self.copy = copy
self.verbose = verbose
self.preference = preference
self.affinity = affinity
if random_state:
self.random_state = random_state
# If 'random_state' is None, 'self.random_state' comes from the parent class 'WorkflowBase'
self.model = AffinityPropagation(
damping=self.damping,
max_iter=self.max_iter,
convergence_iter=self.convergence_iter,
copy=self.copy,
preference=self.preference,
affinity=self.affinity,
verbose=self.verbose,
random_state=self.random_state,
)
self.naming = AffinityPropagationClustering.name
[docs]
@classmethod
def manual_hyper_parameters(cls) -> Dict:
"""Manual hyper-parameters specification."""
print(f"[bold green]-*-*- {cls.name} - Hyper-parameters Specification -*-*-[/bold green]")
hyper_parameters = affinitypropagation_manual_hyper_parameters()
clear_output()
return hyper_parameters
[docs]
def special_components(self, **kwargs: Union[Dict, np.ndarray, int]) -> None:
"""Invoke all special application functions for this algorithms by Scikit-learn framework."""
[docs]
class MeanShiftClustering(ClusteringMetricsMixin, ClusteringWorkflowBase):
name = "MeanShift"
special_function = ["Num of Clusters"]
def __init__(
self,
*,
bandwidth: Optional[float] = None,
seeds: Optional[Union[np.ndarray, list]] = None,
bin_seeding: bool = False,
min_bin_freq: int = 1,
cluster_all: bool = True,
n_jobs: Optional[int] = None,
max_iter: int = 300,
) -> None:
"""
Parameters
----------
bandwidth : float, default=None
Bandwidth used in the flat kernel.
If not given, the bandwidth is estimated using
sklearn.cluster.estimate_bandwidth; see the documentation for that
function for hints on scalability (see also the Notes, below).
seeds : array-like of shape (n_samples, n_features), default=None
Seeds used to initialize kernels. If not set,
the seeds are calculated by clustering.get_bin_seeds
with bandwidth as the grid size and default values for
other parameters.
bin_seeding : bool, default=False
If true, initial kernel locations are not locations of all
points, but rather the location of the discretized version of
points, where points are binned onto a grid whose coarseness
corresponds to the bandwidth. Setting this option to True will speed
up the algorithm because fewer seeds will be initialized.
The default value is False.
Ignored if seeds argument is not None.
min_bin_freq : int, default=1
To speed up the algorithm, accept only those bins with at least
min_bin_freq points as seeds.
cluster_all : bool, default=True
If true, then all points are clustered, even those orphans that are
not within any kernel. Orphans are assigned to the nearest kernel.
If false, then orphans are given cluster label -1.
n_jobs : int, default=None
The number of jobs to use for the computation. The following tasks benefit
from the parallelization:
- The search of nearest neighbors for bandwidth estimation and label
assignments. See the details in the docstring of the
``NearestNeighbors`` class.
- Hill-climbing optimization for all seeds.
See :term:`Glossary <n_jobs>` for more details.
``None`` means 1 unless in a :obj:`joblib.parallel_backend` context.
``-1`` means using all processors. See :term:`Glossary <n_jobs>`
for more details.
max_iter : int, default=300
Maximum number of iterations, per seed point before the clustering
operation terminates (for that seed point), if has not converged yet.
.. versionadded:: 0.22
References
----------
Scikit-learn API: sklearn.cluster.MeanShift
https://scikit-learn.org/stable/modules/generated/sklearn.cluster.MeanShift
"""
super().__init__()
self.bandwidth = bandwidth
self.seeds = seeds
self.bin_seeding = bin_seeding
self.min_bin_freq = min_bin_freq
self.cluster_all = cluster_all
self.n_jobs = n_jobs
self.max_iter = max_iter
self.model = MeanShift(
bandwidth=self.bandwidth, seeds=self.seeds, bin_seeding=self.bin_seeding, min_bin_freq=self.min_bin_freq, cluster_all=self.cluster_all, n_jobs=self.n_jobs, max_iter=self.max_iter
)
self.naming = MeanShiftClustering.name
[docs]
@classmethod
def manual_hyper_parameters(cls) -> Dict:
"""Manual hyper-parameters specification."""
print(f"[bold green]-*-*- {cls.name} - Hyper-parameters Specification -*-*-[/bold green]")
hyper_parameters = meanshift_manual_hyper_parameters()
clear_output()
return hyper_parameters
[docs]
def special_components(self, **kwargs: Union[Dict, np.ndarray, int]) -> None:
"""Invoke all special application functions for this algorithm by Scikit-learn framework."""
GEOPI_OUTPUT_METRICS_PATH = os.getenv("GEOPI_OUTPUT_METRICS_PATH")
self._get_num_clusters(
labels=self.cluster_labels[ClusteringCommonFunction.CLUSTER_LABELS.value],
func_name=MeanShiftSpecialFunction.NUM_CLUSTERS.value,
algorithm_name=self.naming,
store_path=GEOPI_OUTPUT_METRICS_PATH,
)
[docs]
class SpectralClustering(ClusteringWorkflowBase):
name = "Spectral"
pass
[docs]
class WardHierarchicalClustering(ClusteringWorkflowBase):
name = "WardHierarchical"
pass
[docs]
class OPTICSClustering(ClusteringWorkflowBase):
name = "OPTICS"
pass
[docs]
class GaussianMixturesClustering(ClusteringWorkflowBase):
name = "GaussianMixtures"
pass
[docs]
class BIRCHClusteringClustering(ClusteringWorkflowBase):
name = "BIRCHClustering"
pass
[docs]
class BisectingKMeansClustering(ClusteringWorkflowBase):
name = "BisectingKMeans"
pass