Source code for skactiveml.pool._typi_clust

"""
Module implementing TypiClust.

TypiClust is a deep active learning strategy suited for low budgets.
Its aim is to query typical examples with the corresponding high score
of 'typicality'.
"""

import numpy as np

from ..base import SingleAnnotatorPoolQueryStrategy
from ..utils import MISSING_LABEL, labeled_indices, check_scalar, rand_argmax
from sklearn.cluster import KMeans
from sklearn.neighbors import NearestNeighbors


[docs]class TypiClust(SingleAnnotatorPoolQueryStrategy): """Typical Clustering This class implements the Typical Clustering (TypiClust) query strategy [1]_, which considers both diversity and typicality (representativeness) of the samples. Parameters ---------- missing_label : scalar or string or np.nan or None, default=np.nan Value to represent a missing label. random_state : None or int or np.random.RandomState, default=None The random state to use. cluster_algo : ClusterMixin.__class__, default=KMeans The cluster algorithm to be used. cluster_algo_dict : dict, optional (default=None) The parameters passed to the clustering algorithm `cluster_algo`, excluding the parameter for the number of clusters. n_cluster_param_name : string, default="n_clusters" The name of the parameter for the number of clusters. k : int, default=5 The number for k-nearest-neighbors for the computation of typicality. References ---------- .. [1] G. Hacohen, A. Dekel, und D. Weinshall, "Active Learning on a Budget: Opposite Strategies Suit High and Low Budgets", ICML, 2022. """ def __init__( self, missing_label=MISSING_LABEL, random_state=None, cluster_algo=KMeans, cluster_algo_dict=None, n_cluster_param_name="n_clusters", k=5, ): super().__init__( missing_label=missing_label, random_state=random_state ) self.cluster_algo = cluster_algo self.cluster_algo_dict = cluster_algo_dict self.n_cluster_param_name = n_cluster_param_name self.k = k
[docs] def query( self, X, y, candidates=None, batch_size=1, return_utilities=False, ): """Query the next samples to be labeled Parameters ---------- X : array-like of shape (n_samples, n_features) Training data set, usually complete, i.e., including the labeled and unlabeled samples. y : array-like of shape (n_samples, ) Labels of the training data set (possibly including unlabeled ones indicated by self.missing_label) candidates : None or array-like of shape (n_candidates), dtype = int or array-like of shape (n_candidates, n_features), optional (default=None) If candidates is None, the unlabeled samples from (X, y) are considered as candidates. If candidates is of shape (n_candidates) and of type int, candidates is considered as a list of the indices of the samples in (X, y). If candidates is of shape (n_candidates, n_features), the candidates are directly given in the input candidates (not necessarily contained in X). batch_size : int, optional(default=1) The number of samples to be selects in one AL cycle. return_utilities : bool, optional(default=False) If True, also return the utilities based on the query strategy Returns ---------- query_indices : np.ndarray of shape (batch_size,) The query_indices indicate for which candidate sample a label is to queried, e.g., `query_indices[0]` indicates the first selected sample. If candidates in None or of shape (n_candidates), the indexing refers to samples in X. If candidates is of shape (n_candidates, n_features), the indexing refers to samples in candidates. utilities : numpy.ndarray of shape (batch_size, n_samples) or np.ndarray of shape (batch_size, n_candidates) The utilities of samples for selecting each sample of the batch. Here, utilities mean the typicality in the considered cluster. If candidates is None or of shape (n_candidates), the indexing refers to samples in X. If candidates is of shape (n_candidates, n_features), the indexing refers to samples in candidates. """ X, y, candidates, batch_size, return_utilities = self._validate_data( X, y, candidates, batch_size, return_utilities, reset=True ) _, mapping = self._transform_candidates( candidates, X, y, enforce_mapping=True ) # Validate init parameter check_scalar(self.k, "k", target_type=int, min_val=1) if not ( isinstance(self.cluster_algo_dict, dict) or self.cluster_algo_dict is None ): raise TypeError( "Please pass a dictionary with corresponding parameter name " "and value in the `init` function." ) cluster_algo_dict = ( {} if self.cluster_algo_dict is None else self.cluster_algo_dict.copy() ) if not isinstance(self.n_cluster_param_name, str): raise TypeError("`n_cluster_param_name` supports only string.") labeled_sample_indices = labeled_indices( y, missing_label=self.missing_label ) n_clusters = len(labeled_sample_indices) + batch_size cluster_algo_dict[self.n_cluster_param_name] = n_clusters cluster_obj = self.cluster_algo(**cluster_algo_dict) cluster_labels = cluster_obj.fit_predict(X) # determine number of samples per cluster and mask clusters with # labeled samples cluster_sizes = np.zeros(n_clusters) cluster_ids, cluster_ids_sizes = np.unique( cluster_labels, return_counts=True ) cluster_sizes[cluster_ids] = cluster_ids_sizes covered_cluster = np.unique( [cluster_labels[i] for i in labeled_sample_indices] ) if len(covered_cluster) > 0: cluster_sizes[covered_cluster] = 0 utilities = np.full(shape=(batch_size, X.shape[0]), fill_value=np.nan) query_indices = [] for i in range(batch_size): if cluster_sizes.max() == 0: typicality = np.ones(len(X)) else: cluster_id = rand_argmax( cluster_sizes, random_state=self.random_state_ ) is_cluster = cluster_labels == cluster_id uncovered_samples_mapping = np.where(is_cluster)[0] typicality = _typicality(X, uncovered_samples_mapping, self.k) utilities[i, mapping] = typicality[mapping] utilities[i, query_indices] = np.nan idx = rand_argmax( typicality[mapping], random_state=self.random_state_ ) idx = mapping[idx[0]] query_indices = np.append(query_indices, [idx]).astype(int) cluster_sizes[cluster_id] = 0 if return_utilities: return query_indices, utilities else: return query_indices
def _typicality(X, uncovered_samples_mapping, k, eps=1e-7): """ Calculation the typicality of samples `X` in uncovered clusters. Parameters ---------- X : array-like of shape (n_samples, n_features) Training data set, usually complete, i.e., including the labeled and unlabeled samples. uncovered_samples_mapping : np.ndarray of shape (n_candidates,), default=None Index array that maps `candidates` to `X_for_cluster`. k : int k for computation of k nearst neighbors. eps : float > 0, default=1e-7 Minimum distance sum to compute typicality. Returns ------- typicality : numpy.ndarray of shape (n_X) The typicality of all uncovered samples in X """ typicality = np.full(shape=X.shape[0], fill_value=-np.inf) if len(uncovered_samples_mapping) == 1: typicality[uncovered_samples_mapping] = 1 return typicality k = np.min((len(uncovered_samples_mapping) - 1, k)) nn = NearestNeighbors(n_neighbors=k + 1).fit(X[uncovered_samples_mapping]) dist_matrix_sort_inc, _ = nn.kneighbors( X[uncovered_samples_mapping], n_neighbors=k + 1, return_distance=True ) knn = np.sum(dist_matrix_sort_inc, axis=1) + eps typi = ((1 / k) * knn) ** (-1) typicality[uncovered_samples_mapping] = typi return typicality