Source code for skactiveml.pool._typi_clust

"""
Module implementing TypiClust.

TypiClust is a deep active learning strategy suited for low budgets.
Its aim is to query typical examples with the corresponding high score
of 'typicality'.
"""

import numpy as np

from ..base import SingleAnnotatorPoolQueryStrategy
from ..utils import MISSING_LABEL, labeled_indices, check_scalar, rand_argmax

from copy import deepcopy
from inspect import signature
from sklearn.cluster import KMeans
from sklearn.neighbors import NearestNeighbors


[docs]class TypiClust(SingleAnnotatorPoolQueryStrategy): """Typical Clustering (TypiClust) This class implements the Typical Clustering (TypiClust) query strategy [1]_, which considers both diversity and typicality (representativeness) of the samples. Parameters ---------- missing_label : scalar or string or np.nan or None, default=np.nan Value to represent a missing label. random_state : None or int or np.random.RandomState, default=None The random state to use. cluster_algo : ClusterMixin.__class__, default=KMeans The cluster algorithm to be used. cluster_algo_dict : dict, optional (default=None) The parameters passed to the clustering algorithm `cluster_algo`, excluding the parameter for the number of clusters. n_cluster_param_name : string, default="n_clusters" The name of the parameter for the number of clusters. k : int, default=5 The number for k-nearest-neighbors for the computation of typicality. References ---------- .. [1] G. Hacohen, A. Dekel, and D. Weinshall. Active Learning on a Budget: Opposite Strategies Suit High and Low Budgets. In Int. Conf. Mach. Learn., pages 8175–8195, 2022. """ def __init__( self, missing_label=MISSING_LABEL, random_state=None, cluster_algo=KMeans, cluster_algo_dict=None, n_cluster_param_name="n_clusters", k=5, ): super().__init__( missing_label=missing_label, random_state=random_state ) self.cluster_algo = cluster_algo self.cluster_algo_dict = cluster_algo_dict self.n_cluster_param_name = n_cluster_param_name self.k = k
[docs] def query( self, X, y, candidates=None, batch_size=1, return_utilities=False, ): """Determines for which candidate samples labels are to be queried. Parameters ---------- X : array-like of shape (n_samples, n_features) Training data set, usually complete, i.e., including the labeled and unlabeled samples. y : array-like of shape (n_samples,) Labels of the training data set (possibly including unlabeled ones indicated by `self.missing_label`). candidates : None or array-like of shape (n_candidates), dtype=int or \ array-like of shape (n_candidates, n_features), default=None - If `candidates` is `None`, the unlabeled samples from `(X,y)` are considered as `candidates`. - If `candidates` is of shape `(n_candidates,)` and of type `int`, `candidates` is considered as the indices of the samples in `(X,y)`. batch_size : int, default=1 The number of samples to be selected in one AL cycle. return_utilities : bool, default=False If `True`, also return the utilities based on the query strategy. Returns ------- query_indices : numpy.ndarray of shape (batch_size) The query indices indicate for which candidate sample a label is to be queried, e.g., `query_indices[0]` indicates the first selected sample. The indexing refers to the samples in `X`. utilities : numpy.ndarray of shape (batch_size, n_samples) or \ numpy.ndarray of shape (batch_size, n_candidates) The utilities of samples after each selected sample of the batch, e.g., `utilities[0]` indicates the utilities used for selecting the first sample (with index `query_indices[0]`) of the batch. Utilities for labeled samples will be set to np.nan. The indexing refers to the samples in `X`. """ X, y, candidates, batch_size, return_utilities = self._validate_data( X, y, candidates, batch_size, return_utilities, reset=True ) _, mapping = self._transform_candidates( candidates, X, y, enforce_mapping=True ) # Validate init parameter check_scalar(self.k, "k", target_type=int, min_val=1) if not ( isinstance(self.cluster_algo_dict, dict) or self.cluster_algo_dict is None ): raise TypeError( "Please pass a dictionary with corresponding parameter name " "and value in the `init` function." ) cluster_algo_dict = ( {} if self.cluster_algo_dict is None else self.cluster_algo_dict.copy() ) if not isinstance(self.n_cluster_param_name, str): raise TypeError("`n_cluster_param_name` supports only string.") labeled_sample_indices = labeled_indices( y, missing_label=self.missing_label ) # Set number of clusters. n_clusters = len(labeled_sample_indices) + batch_size cluster_algo_dict[self.n_cluster_param_name] = n_clusters # Optionally, set random state. cluster_algo_sig = signature(self.cluster_algo.__init__).parameters algo_has_seed = "random_state" in cluster_algo_sig dict_lacks_seed = "random_state" not in cluster_algo_dict if self.random_state is not None and algo_has_seed and dict_lacks_seed: cluster_algo_dict["random_state"] = deepcopy(self.random_state) # Create object for clustering with given parameters. cluster_obj = self.cluster_algo(**cluster_algo_dict) cluster_labels = cluster_obj.fit_predict(X) # determine number of samples per cluster and mask clusters with # labeled samples cluster_sizes = np.zeros(n_clusters) cluster_ids, cluster_ids_sizes = np.unique( cluster_labels, return_counts=True ) cluster_sizes[cluster_ids] = cluster_ids_sizes covered_cluster = np.unique( [cluster_labels[i] for i in labeled_sample_indices] ) if len(covered_cluster) > 0: cluster_sizes[covered_cluster] = 0 utilities = np.full(shape=(batch_size, X.shape[0]), fill_value=np.nan) query_indices = [] for i in range(batch_size): if cluster_sizes.max() == 0: typicality = np.ones(len(X)) else: cluster_id = rand_argmax( cluster_sizes, random_state=self.random_state_ ) is_cluster = cluster_labels == cluster_id uncovered_samples_mapping = np.where(is_cluster)[0] typicality = _typicality(X, uncovered_samples_mapping, self.k) utilities[i, mapping] = typicality[mapping] utilities[i, query_indices] = np.nan idx = rand_argmax( typicality[mapping], random_state=self.random_state_ ) idx = mapping[idx[0]] query_indices = np.append(query_indices, [idx]).astype(int) cluster_sizes[cluster_id] = 0 if return_utilities: return query_indices, utilities else: return query_indices
def _typicality(X, uncovered_samples_mapping, k, eps=1e-7): """ Calculation the typicality of samples `X` in uncovered clusters. Parameters ---------- X : array-like of shape (n_samples, n_features) Training data set, usually complete, i.e., including the labeled and unlabeled samples. uncovered_samples_mapping : np.ndarray of shape (n_candidates,), default=None Index array that maps `candidates` to `X_for_cluster`. k : int k for computation of k nearst neighbors. eps : float > 0, default=1e-7 Minimum distance sum to compute typicality. Returns ------- typicality : numpy.ndarray of shape (n_X) The typicality of all uncovered samples in X """ typicality = np.full(shape=X.shape[0], fill_value=-np.inf) if len(uncovered_samples_mapping) == 1: typicality[uncovered_samples_mapping] = 1 return typicality k = np.min((len(uncovered_samples_mapping) - 1, k)) nn = NearestNeighbors(n_neighbors=k + 1).fit(X[uncovered_samples_mapping]) dist_matrix_sort_inc, _ = nn.kneighbors( X[uncovered_samples_mapping], n_neighbors=k + 1, return_distance=True ) knn = np.sum(dist_matrix_sort_inc, axis=1) + eps typi = ((1 / k) * knn) ** (-1) typicality[uncovered_samples_mapping] = typi return typicality