Source code for skactiveml.pool._four_ds

"""
Module implementing 4DS active learning strategy.
"""

# Author: Marek Herde <marek.herde@uni-kassel.de>


import numpy as np
from sklearn.base import clone

from ..base import SingleAnnotatorPoolQueryStrategy
from ..classifier import MixtureModelClassifier
from ..utils import (
    rand_argmax,
    is_labeled,
    check_type,
    MISSING_LABEL,
    check_equal_missing_label,
    check_scalar,
)


class FourDs(SingleAnnotatorPoolQueryStrategy):
    """FourDs

    Implementation of the pool-based query strategy 4DS for training a
    MixtureModelClassifier [1].

    Parameters
    ----------
    lmbda : float between 0 and 1, optional
    (default=min((batch_size-1)*0.05, 0.5))
        For the selection of more than one sample within each query round,
        4DS uses a diversity measure to avoid the selection of redundant
        samples whose influence is regulated by the weighting factor 'lmbda'.
    missing_label : scalar or string or np.nan or None, optional
    (default=MISSING_LABEL)
        Value to represent a missing label.
    random_state : int or np.random.RandomState, optional (default=None)
        The random state to use.

    References
    ---------
    [1] Reitmaier, T., & Sick, B. (2013). Let us know your decision: Pool-based
        active training of a generative classifier with the selection strategy
        4DS. Information Sciences, 230, 106-131.
    """

    def __init__(
        self, lmbda=None, missing_label=MISSING_LABEL, random_state=None
    ):
        super().__init__(
            missing_label=missing_label, random_state=random_state
        )
        self.lmbda = lmbda

    @staticmethod
    def _distribution(weights, R_sum, n_samples):
        """Compute the distribution measure for each candidate.

        Parameters
        ----------
        weights : numpy.ndarray of shape (n_components)
            Mixing coefficients of the mixture model.
        R_sum : numpy.ndarray of shape (n_candidates, n_components)
            Per-candidate sum of the responsibilities of the candidate itself
            and of all samples that are already labeled or selected.
        n_samples : int
            Number of samples whose responsibilities were added up in each
            row of `R_sum`.

        Returns
        -------
        distribution : numpy.ndarray of shape (n_candidates)
            One minus the summed positive part of `weights - R_sum/n_samples`.
        """
        R_mean = R_sum / n_samples
        # Only components that are under-represented (weight larger than the
        # mean responsibility) contribute to the deficit.
        deficit = np.maximum(weights - R_mean, 0)
        return 1 - np.sum(deficit, axis=1)

    def query(
        self,
        X,
        y,
        clf,
        fit_clf=True,
        sample_weight=None,
        candidates=None,
        return_utilities=False,
        batch_size=1,
    ):
        """Determines for which candidate samples labels are to be queried.

        Parameters
        ----------
        X: array-like of shape (n_samples, n_features)
            Training data set, usually complete, i.e. including the labeled and
            unlabeled samples.
        y: array-like of shape (n_samples)
            Labels of the training data set (possibly including unlabeled ones
            indicated by self.MISSING_LABEL).
        clf : skactiveml.classifier.MixtureModelClassifier
            GMM-based classifier to be trained.
        fit_clf : bool, optional (default=True)
            Defines whether the classifier should be fitted on `X`, `y`, and
            `sample_weight`.
        sample_weight: array-like of shape (n_samples), optional (default=None)
            Weights of training samples in `X`.
        candidates : None or array-like of shape (n_candidates), dtype=int or
            array-like of shape (n_candidates, n_features),
            optional (default=None)
            If candidates is None, the unlabeled samples from (X,y) are
            considered as candidates.
            If candidates is of shape (n_candidates) and of type int,
            candidates is considered as the indices of the samples in (X,y).
            If candidates is of shape (n_candidates, n_features), the
            candidates are directly given in candidates (not necessarily
            contained in X). This is not supported by all query strategies.
        batch_size : int, optional (default=1)
            The number of samples to be selected in one AL cycle.
        return_utilities : bool, optional (default=False)
            If True, also return the utilities based on the query strategy.

        Returns
        -------
        query_indices : numpy.ndarray of shape (batch_size)
            The query_indices indicate for which candidate sample a label is
            to be queried, e.g., `query_indices[0]` indicates the first
            selected sample.
            If candidates is None or of shape (n_candidates), the indexing
            refers to samples in X.
            If candidates is of shape (n_candidates, n_features), the indexing
            refers to samples in candidates.
        utilities : numpy.ndarray of shape (batch_size, n_samples) or
            numpy.ndarray of shape (batch_size, n_candidates)
            The utilities of samples after each selected sample of the batch,
            e.g., `utilities[0]` indicates the utilities used for selecting
            the first sample (with index `query_indices[0]`) of the batch.
            Utilities for labeled samples will be set to np.nan.
            If candidates is None or of shape (n_candidates), the indexing
            refers to samples in X.
            If candidates is of shape (n_candidates, n_features), the indexing
            refers to samples in candidates.
        """
        # Check standard parameters.
        (
            X,
            y,
            candidates,
            batch_size,
            return_utilities,
        ) = super()._validate_data(
            X=X,
            y=y,
            candidates=candidates,
            batch_size=batch_size,
            return_utilities=return_utilities,
            reset=True,
        )

        # Check classifier type.
        check_type(clf, "clf", MixtureModelClassifier)
        check_type(fit_clf, "fit_clf", bool)
        check_equal_missing_label(clf.missing_label, self.missing_label_)

        # Check lmbda.
        lmbda = self.lmbda
        if lmbda is None:
            lmbda = np.min(((batch_size - 1) * 0.05, 0.5))
        check_scalar(
            lmbda, target_type=(float, int), name="lmbda", min_val=0, max_val=1
        )

        # Obtain candidates plus mapping.
        X_cand, mapping = self._transform_candidates(candidates, X, y)

        # Storage for query indices.
        query_indices_cand = np.full(batch_size, fill_value=-1, dtype=int)

        # Fit the classifier and get the probabilities.
        if fit_clf:
            clf = clone(clf).fit(X, y, sample_weight)
        P_cand = clf.predict_proba(X_cand)
        R_cand = clf.mixture_model_.predict_proba(X_cand)
        weights = clf.mixture_model_.weights_

        # Responsibilities of the labeled samples. The number of labeled
        # samples is tracked explicitly so that an empty labeled set is
        # counted as zero samples in the means below (instead of being
        # represented by a length-one placeholder array).
        is_lbld = is_labeled(y, missing_label=clf.missing_label)
        n_lbld = int(np.sum(is_lbld))
        if n_lbld >= 1:
            R_lbld = clf.mixture_model_.predict_proba(X[is_lbld])
            R_lbld_sum = np.sum(R_lbld, axis=0, keepdims=True)
            R_lbld_mean = np.mean(R_lbld, axis=0)
        else:
            R_lbld_mean = np.zeros_like(weights, dtype=float)
            R_lbld_sum = R_lbld_mean[np.newaxis, :]

        # Compute distance according to Eq. 9 in [1].
        P_cand_sorted = np.sort(P_cand, axis=1)
        distance_cand = np.log(
            (P_cand_sorted[:, -1] + 1.0e-5) / (P_cand_sorted[:, -2] + 1.0e-5)
        )
        distance_cand = (distance_cand - np.min(distance_cand) + 1.0e-5) / (
            np.max(distance_cand) - np.min(distance_cand) + 1.0e-5
        )

        # Compute densities according to Eq. 10 in [1].
        density_cand = clf.mixture_model_.score_samples(X_cand)
        density_cand = (density_cand - np.min(density_cand) + 1.0e-5) / (
            np.max(density_cand) - np.min(density_cand) + 1.0e-5
        )

        # Compute distributions according to Eq. 11 in [1]: each candidate is
        # averaged together with the `n_lbld` labeled samples.
        distribution_cand = self._distribution(
            weights, R_cand + R_lbld_sum, n_lbld + 1
        )

        # Compute rho according to Eq. 15 in [1].
        diff = np.sum(np.abs(weights - R_lbld_mean))
        rho = min(1, diff)

        # Compute e_dwus according to Eq. 13 in [1].
        e_dwus = np.mean((1 - P_cand_sorted[:, -1]) * density_cand)

        # Normalization such that alpha, beta, and rho sum up to one.
        alpha = (1 - rho) * e_dwus
        beta = 1 - rho - alpha

        # Compute utilities to select sample.
        utilities_cand = np.empty((batch_size, len(X_cand)), dtype=float)
        utilities_cand[0] = (
            alpha * (1 - distance_cand)
            + beta * density_cand
            + rho * distribution_cand
        )
        query_indices_cand[0] = rand_argmax(
            utilities_cand[0], self.random_state_
        )[0]
        is_selected = np.zeros(len(X_cand), dtype=bool)
        is_selected[query_indices_cand[0]] = True

        if batch_size > 1:
            # Compute e_us according to Eq. 14 in [1].
            e_us = np.mean(1 - P_cand_sorted[:, -1])

            # Normalization of the coefficients alpha, beta, and rho such
            # that these coefficients plus
            # lmbda sum up to one.
            rho = min(rho, 1 - lmbda)
            alpha = (1 - (rho + lmbda)) * (1 - e_us)
            beta = 1 - (rho + lmbda) - alpha

            for i in range(1, batch_size):
                # Number of samples selected so far in this batch. The
                # pre-allocated `query_indices_cand` always has `batch_size`
                # entries, so its length must not be used as this count.
                n_selected = int(np.sum(is_selected))

                # Update distributions according to Eq. 11 in [1].
                R_sum = (
                    R_cand
                    + np.sum(R_cand[is_selected], axis=0, keepdims=True)
                    + R_lbld_sum
                )
                distribution_cand = self._distribution(
                    weights, R_sum, n_lbld + n_selected + 1
                )

                # Compute diversity according to Eq. 12 in [1].
                diversity_cand = -np.log(
                    density_cand + np.sum(density_cand[is_selected])
                ) / (n_selected + 1)
                diversity_min = np.min(diversity_cand)
                diversity_range = np.max(diversity_cand) - diversity_min
                if diversity_range == 0:
                    # All candidates are equally diverse; avoid 0/0 = NaN,
                    # which would invalidate the utilities of all candidates.
                    diversity_cand = np.zeros_like(diversity_cand)
                else:
                    diversity_cand = (
                        diversity_cand - diversity_min
                    ) / diversity_range

                # Compute utilities to select sample.
                utilities_cand[i] = (
                    alpha * (1 - distance_cand)
                    + beta * density_cand
                    + lmbda * diversity_cand
                    + rho * distribution_cand
                )
                # Already selected candidates must not be selected again.
                utilities_cand[i, is_selected] = np.nan
                query_indices_cand[i] = rand_argmax(
                    utilities_cand[i], self.random_state_
                )[0]
                is_selected[query_indices_cand[i]] = True

        # Remapping of utilities and query indices if required.
        if mapping is None:
            utilities = utilities_cand
            query_indices = query_indices_cand
        else:
            utilities = np.full((batch_size, len(X)), np.nan)
            utilities[:, mapping] = utilities_cand
            query_indices = mapping[query_indices_cand]

        # Check whether utilities are to be returned.
        if return_utilities:
            return query_indices, utilities
        else:
            return query_indices