Source code for skactiveml.pool._uncertainty_sampling

"""
Module implementing various uncertainty based query strategies.
"""

# Authors: Pascal Mergard <Pascal.Mergard@student.uni-kassel.de>
#          Marek Herde <marek.herde@uni-kassel.de>

import numpy as np
from sklearn import clone
from sklearn.utils.validation import check_array

from ..base import SingleAnnotatorPoolQueryStrategy, SkactivemlClassifier
from ..utils import (
    MISSING_LABEL,
    check_cost_matrix,
    simple_batch,
    check_classes,
    check_type,
    check_equal_missing_label,
)


class UncertaintySampling(SingleAnnotatorPoolQueryStrategy):
    """Uncertainty Sampling.

    This class implements various uncertainty-based query strategies, i.e.,
    the standard uncertainty measures [1], cost-sensitive ones [2], and one
    optimizing expected average precision [3].

    Parameters
    ----------
    method : str, default='least_confident'
        The method used to calculate the uncertainty. Possible values are
        'least_confident', 'margin_sampling', 'entropy', and
        'expected_average_precision'.
    cost_matrix : array-like of shape (n_classes, n_classes)
        Cost matrix with `cost_matrix[i, j]` defining the cost of predicting
        class `j` for a sample with the actual class `i`. Only supported for
        the 'least_confident' and 'margin_sampling' variants.
    missing_label : scalar or string or np.nan or None, default=np.nan
        Value to represent a missing label.
    random_state : int or np.random.RandomState or None, default=None
        The random state to use.

    References
    ----------
    [1] Settles, Burr. "Active Learning Literature Survey". University of
        Wisconsin-Madison Department of Computer Sciences, 2009.
    [2] Chen, Po-Lung, and Hsuan-Tien Lin. "Active Learning for Multiclass
        Cost-Sensitive Classification Using Probabilistic Models."
        Conference on Technologies and Applications of Artificial
        Intelligence, IEEE, 2013.
    [3] Wang, Hanmo, et al. "Uncertainty Sampling for Action Recognition
        via Maximizing Expected Average Precision." International Joint
        Conference on Artificial Intelligence, 2018.
    """

    def __init__(
        self,
        method="least_confident",
        cost_matrix=None,
        missing_label=MISSING_LABEL,
        random_state=None,
    ):
        super().__init__(
            missing_label=missing_label, random_state=random_state
        )
        self.method = method
        self.cost_matrix = cost_matrix

    def query(
        self,
        X,
        y,
        clf,
        fit_clf=True,
        sample_weight=None,
        utility_weight=None,
        candidates=None,
        batch_size=1,
        return_utilities=False,
    ):
        """Determines for which candidate samples labels are to be queried.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training data set, usually complete, i.e., including the labeled
            and unlabeled samples.
        y : array-like of shape (n_samples,)
            Labels of the training data set (possibly including unlabeled
            ones indicated by `self.missing_label`).
        clf : skactiveml.base.SkactivemlClassifier
            Model implementing the methods `fit` and `predict_proba`.
        fit_clf : bool, optional (default=True)
            Defines whether the classifier should be fitted on `X`, `y`, and
            `sample_weight`.
        sample_weight : array-like of shape (n_samples,), optional
                (default=None)
            Weights of training samples in `X`.
        utility_weight : array-like, optional (default=None)
            Weight for each candidate (multiplied with the utilities).
            Usually, this is the density of a candidate. The length of
            `utility_weight` is usually `n_samples`, except for the case
            when `candidates` contains samples (ndim >= 2). Then the length
            is `n_candidates`.
        candidates : None or array-like of shape (n_candidates,), dtype=int
                or array-like of shape (n_candidates, n_features), optional
                (default=None)
            If `candidates` is None, the unlabeled samples from `(X, y)` are
            considered as candidates.
            If `candidates` is of shape `(n_candidates,)` and of type int,
            `candidates` is considered as the indices of the samples in
            `(X, y)`.
            If `candidates` is of shape `(n_candidates, n_features)`, the
            candidates are directly given in `candidates` (not necessarily
            contained in `X`). This is not supported by all query
            strategies.
        batch_size : int, default=1
            The number of samples to be selected in one AL cycle.
        return_utilities : bool, default=False
            If True, also return the utilities based on the query strategy.

        Returns
        -------
        query_indices : numpy.ndarray of shape (batch_size,)
            The `query_indices` indicate for which candidate sample a label
            is to be queried, e.g., `query_indices[0]` indicates the first
            selected sample.
            If `candidates` is None or of shape `(n_candidates,)`, the
            indexing refers to the samples in `X`.
            If `candidates` is of shape `(n_candidates, n_features)`, the
            indexing refers to the samples in `candidates`.
        utilities : numpy.ndarray of shape (batch_size, n_samples) or
                numpy.ndarray of shape (batch_size, n_candidates)
            The utilities of samples after each selected sample of the
            batch, e.g., `utilities[0]` indicates the utilities used for
            selecting the first sample (with index `query_indices[0]`) of
            the batch. Utilities for labeled samples will be set to np.nan.
            If `candidates` is None or of shape `(n_candidates,)`, the
            indexing refers to the samples in `X`.
            If `candidates` is of shape `(n_candidates, n_features)`, the
            indexing refers to the samples in `candidates`.
        """
        # Validate input parameters.
        X, y, candidates, batch_size, return_utilities = self._validate_data(
            X, y, candidates, batch_size, return_utilities, reset=True
        )
        X_cand, mapping = self._transform_candidates(candidates, X, y)

        # Validate classifier type.
        check_type(clf, "clf", SkactivemlClassifier)
        check_equal_missing_label(clf.missing_label, self.missing_label_)

        # Validate `fit_clf` type.
        check_type(fit_clf, "fit_clf", bool)

        # Check `utility_weight`.
        if utility_weight is None:
            if mapping is None:
                utility_weight = np.ones(len(X_cand))
            else:
                utility_weight = np.ones(len(X))
        utility_weight = check_array(utility_weight, ensure_2d=False)
        if mapping is None and not len(X_cand) == len(utility_weight):
            raise ValueError(
                f"'utility_weight' must have length 'n_candidates' but "
                f"{len(X_cand)} != {len(utility_weight)}."
            )
        if mapping is not None and not len(X) == len(utility_weight):
            raise ValueError(
                f"'utility_weight' must have length 'n_samples' but "
                f"{len(utility_weight)} != {len(X)}."
            )

        # Validate method.
        if not isinstance(self.method, str):
            raise TypeError(
                "{} is an invalid type for method. Type {} is "
                "expected".format(type(self.method), str)
            )

        # sample_weight is checked by clf when fitted.

        # Fit the classifier.
        if fit_clf:
            if sample_weight is not None:
                clf = clone(clf).fit(X, y, sample_weight)
            else:
                clf = clone(clf).fit(X, y)

        # Predict class-membership probabilities.
        probas = clf.predict_proba(X_cand)

        # Choose the method and calculate corresponding utilities.
        with np.errstate(divide="ignore"):
            if self.method in [
                "least_confident",
                "margin_sampling",
                "entropy",
            ]:
                utilities_cand = uncertainty_scores(
                    probas=probas,
                    method=self.method,
                    cost_matrix=self.cost_matrix,
                )
            elif self.method == "expected_average_precision":
                classes = clf.classes_
                utilities_cand = expected_average_precision(classes, probas)
            else:
                raise ValueError(
                    "The given method {} is not valid. Supported methods "
                    "are 'entropy', 'least_confident', 'margin_sampling' "
                    "and 'expected_average_precision'".format(self.method)
                )

        if mapping is None:
            utilities = utilities_cand
        else:
            utilities = np.full(len(X), np.nan)
            utilities[mapping] = utilities_cand

        utilities *= utility_weight

        return simple_batch(
            utilities,
            self.random_state_,
            batch_size=batch_size,
            return_utilities=return_utilities,
        )
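
# --- Usage sketch (illustrative, not part of the original module) ---
# A minimal end-to-end example of `UncertaintySampling.query`, assuming the
# `SklearnClassifier` wrapper from `skactiveml.classifier` is available (it
# makes scikit-learn estimators tolerate missing labels); names such as
# `y_true` and `query_idx` are chosen for illustration only:
#
#     import numpy as np
#     from sklearn.naive_bayes import GaussianNB
#     from skactiveml.classifier import SklearnClassifier
#     from skactiveml.pool import UncertaintySampling
#     from skactiveml.utils import MISSING_LABEL
#
#     X = np.random.rand(100, 2)
#     y_true = (X[:, 0] > 0.5).astype(float)
#     y = np.full(100, MISSING_LABEL)  # fully unlabeled pool
#     y[:10] = y_true[:10]             # seed with a few labels
#
#     clf = SklearnClassifier(GaussianNB(), classes=[0, 1])
#     qs = UncertaintySampling(method="margin_sampling", random_state=0)
#     query_idx = qs.query(X, y, clf=clf, batch_size=5)
#     y[query_idx] = y_true[query_idx]  # simulate an oracle
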
def uncertainty_scores(probas, cost_matrix=None, method="least_confident"):
    """Computes uncertainty scores.

    Three methods are available: least confident ('least_confident'),
    margin sampling ('margin_sampling'), and entropy-based uncertainty
    ('entropy') [1]. For the least confident and margin sampling methods,
    cost-sensitive variants are implemented in case of a given cost matrix
    (see [2] for more information).

    Parameters
    ----------
    probas : array-like, shape (n_samples, n_classes)
        Class membership probabilities for each sample.
    cost_matrix : array-like, shape (n_classes, n_classes)
        Cost matrix with `cost_matrix[i, j]` defining the cost of predicting
        class `j` for a sample with the actual class `i`. Only supported for
        the least confident and margin sampling variants.
    method : {'least_confident', 'margin_sampling', 'entropy'}, optional
            (default='least_confident')
        Least confident ('least_confident') queries the sample whose maximal
        posterior probability is minimal. In case of a given cost matrix,
        the maximal expected cost variant is used.
        Margin sampling ('margin_sampling') queries the sample whose
        posterior probability gap between the most and the second most
        probable class label is minimal. In case of a given cost matrix, the
        cost-weighted minimum margin is used.
        Entropy ('entropy') queries the sample whose posteriors have the
        maximal entropy. There is no cost-sensitive variant of entropy-based
        uncertainty sampling.

    Returns
    -------
    scores : np.ndarray, shape (n_samples,)
        The uncertainty scores of the samples.

    References
    ----------
    [1] Settles, Burr. "Active Learning Literature Survey". University of
        Wisconsin-Madison Department of Computer Sciences, 2009.
    [2] Chen, Po-Lung, and Hsuan-Tien Lin. "Active Learning for Multiclass
        Cost-Sensitive Classification Using Probabilistic Models."
        Conference on Technologies and Applications of Artificial
        Intelligence, IEEE, 2013.
    """
    # Check probabilities.
    probas = check_array(probas)
    if not np.allclose(np.sum(probas, axis=1), 1, rtol=0, atol=1.0e-3):
        raise ValueError(
            "'probas' are invalid. The sum over axis 1 must be one."
        )
    n_classes = probas.shape[1]

    # Check cost matrix.
    if cost_matrix is not None:
        cost_matrix = check_cost_matrix(cost_matrix, n_classes=n_classes)

    # Compute uncertainties.
    if method == "least_confident":
        if cost_matrix is None:
            return 1 - np.max(probas, axis=1)
        else:
            costs = probas @ cost_matrix
            costs = np.partition(costs, 1, axis=1)[:, :2]
            return costs[:, 0]
    elif method == "margin_sampling":
        if cost_matrix is None:
            probas = -(np.partition(-probas, 1, axis=1)[:, :2])
            return 1 - np.abs(probas[:, 0] - probas[:, 1])
        else:
            costs = probas @ cost_matrix
            costs = np.partition(costs, 1, axis=1)[:, :2]
            return -np.abs(costs[:, 0] - costs[:, 1])
    elif method == "entropy":
        if cost_matrix is None:
            with np.errstate(divide="ignore", invalid="ignore"):
                return np.nansum(-probas * np.log(probas), axis=1)
        else:
            raise ValueError(
                "Method 'entropy' does not support cost matrices, but "
                "'cost_matrix' was not None."
            )
    else:
        raise ValueError(
            "Supported methods are ['least_confident', 'margin_sampling', "
            "'entropy'], the given one is: {}.".format(method)
        )
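
# --- Example: comparing the three uncertainty measures (illustrative) ---
# A minimal sketch on a toy probability matrix; the second sample has the
# less peaked posterior and therefore scores higher under every measure:
#
#     import numpy as np
#     probas = np.array([[0.7, 0.2, 0.1],
#                        [0.4, 0.35, 0.25]])
#     uncertainty_scores(probas, method="least_confident")
#     # -> approx. [0.3, 0.6]
#     uncertainty_scores(probas, method="margin_sampling")
#     # -> approx. [0.5, 0.95]
#     uncertainty_scores(probas, method="entropy")
#     # -> approx. [0.802, 1.081]
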
def expected_average_precision(classes, probas):
    """Calculate the expected average precision.

    Parameters
    ----------
    classes : array-like, shape (n_classes,)
        Holds the label for each class.
    probas : np.ndarray, shape (n_X_cand, n_classes)
        The probabilistic estimation for each class and all instances in
        candidates.

    Returns
    -------
    score : np.ndarray, shape (n_X_cand,)
        The expected average precision score of all instances in candidates.

    References
    ----------
    [1] Wang, Hanmo, et al. "Uncertainty Sampling for Action Recognition
        via Maximizing Expected Average Precision." International Joint
        Conference on Artificial Intelligence, 2018.
    """
    # Check if `probas` is valid.
    probas = check_array(
        probas,
        accept_sparse=False,
        accept_large_sparse=True,
        dtype="numeric",
        order=None,
        copy=False,
        force_all_finite=True,
        ensure_2d=True,
        allow_nd=False,
        ensure_min_samples=1,
        ensure_min_features=1,
        estimator=None,
    )
    if not np.allclose(np.sum(probas, axis=1), 1, rtol=0, atol=1.0e-3):
        raise ValueError(
            "'probas' are invalid. The sum over axis 1 must be one."
        )

    # Check if `classes` are valid.
    check_classes(classes)
    if len(classes) < 2:
        raise ValueError("`classes` must contain at least 2 entries.")
    if len(classes) != probas.shape[1]:
        raise ValueError(
            "`classes` must have the same length as `probas` has columns."
        )

    score = np.zeros(len(probas))
    for i in range(len(classes)):
        for j in range(len(probas)):
            # The i-th column of `probas` without `probas[j, i]`.
            p = probas[:, i]
            p = np.delete(p, [j])
            # Sort `p` in descending order.
            p = np.flipud(np.sort(p, axis=0))

            # Calculate `g_arr`.
            g_arr = np.zeros((len(p), len(p)))
            for n in range(len(p)):
                for h in range(n + 1):
                    g_arr[n, h] = _g(n, h, p, g_arr)

            # Calculate `f_arr`.
            f_arr = np.zeros((len(p) + 1, len(p) + 1))
            for a in range(len(p) + 1):
                for b in range(a + 1):
                    f_arr[a, b] = _f(a, b, p, f_arr, g_arr)

            # Calculate the score.
            for t in range(len(p)):
                score[j] += f_arr[len(p), t + 1] / (t + 1)
    return score
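
# --- Example: expected average precision on a toy candidate set (sketch) ---
# One score is returned per candidate row of `probas`; within
# `UncertaintySampling`, these scores are passed as utilities to
# `simple_batch`, which selects the candidates with the highest values:
#
#     import numpy as np
#     classes = [0, 1]
#     probas = np.array([[0.9, 0.1],
#                        [0.5, 0.5],
#                        [0.2, 0.8]])
#     scores = expected_average_precision(classes, probas)
#     # `scores` has shape (3,), one entry per candidate.
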
# g-function for expected_average_precision.
def _g(n, t, p, g_arr):
    if t > n or (t == 0 and n > 0):
        return 0
    if t == 0 and n == 0:
        return 1
    return p[n - 1] * g_arr[n - 1, t - 1] + (1 - p[n - 1]) * g_arr[n - 1, t]


# f-function for expected_average_precision.
def _f(n, t, p, f_arr, g_arr):
    if t > n or (t == 0 and n > 0):
        return 0
    if t == 0 and n == 0:
        return 1
    return (
        p[n - 1] * f_arr[n - 1, t - 1]
        + p[n - 1] * t * g_arr[n - 1, t - 1] / n
        + (1 - p[n - 1]) * f_arr[n - 1, t]
    )
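
# Editorial note: restating the helpers above as recurrences (with `p`
# sorted in descending order), as computed by the code:
#
#     g(0, 0) = 1,  g(n, t) = 0  for t > n or (t == 0 and n > 0),
#     g(n, t) = p[n-1] * g(n-1, t-1) + (1 - p[n-1]) * g(n-1, t)
#
#     f(0, 0) = 1,  f(n, t) = 0  for t > n or (t == 0 and n > 0),
#     f(n, t) = p[n-1] * f(n-1, t-1)
#               + p[n-1] * t * g(n-1, t-1) / n
#               + (1 - p[n-1]) * f(n-1, t)
#
# Following [1], `g(n, t)` can be read as the probability that exactly `t`
# of the `n` highest-ranked candidates are relevant (a Poisson-binomial
# recursion); this interpretation is a gloss, not part of the original code.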