Source code for skactiveml.pool._cost_embedding_al

"""
Implementation of Active Learning with Cost Embedding (CostEmbeddingAL), which
is modified of:

https://github.com/ntucllab/libact/blob/master/libact.
Copyright (c) 2014, National Taiwan University All rights reserved.
"""

import warnings

import numpy as np
from joblib import Parallel, delayed
from sklearn import clone
from sklearn.base import BaseEstimator, RegressorMixin
from sklearn.isotonic import IsotonicRegression
from sklearn.metrics import euclidean_distances
from sklearn.neighbors import NearestNeighbors
from sklearn.svm import SVR
from sklearn.utils import check_array, check_symmetric

from ..base import SingleAnnotatorPoolQueryStrategy
from ..utils import (
    simple_batch,
    check_classifier_params,
    MISSING_LABEL,
    check_scalar,
    check_random_state,
    check_X_y,
    is_labeled,
    ExtLabelEncoder,
)


[docs]class CostEmbeddingAL(SingleAnnotatorPoolQueryStrategy): """Active Learning with Cost Embedding (ALCE) The Active Learning with Cost Embeddings (ALCE) [1]_ query strategy uses a cost-sensitive uncertainty measure based on a distance measure embedding space reflecting a given cost matrix. This implementation is based on libact [2]_. Parameters ---------- classes: array-like of shape (n_classes,) List of all classes that can occur. base_regressor : sklearn.base.RegressorMixin, default=None An sklearn regression model implementing the methods `fit` and `predict`. cost_matrix: array-like of shape (n_classes, n_classes), default=None Cost matrix with `cost_matrix[i,j]` defining the cost of predicting class `j` for a sample with the actual class `i`. embed_dim : int, optional (default=None) If it is `None`, `embed_dim=n_classes` is used. mds_params : dict, default=None Parameters passed to `sklearn.manifold.MDS`. nn_params : dict, default=None Parameters passed to `sklearn.neighbors.NearestNeighbors`. missing_label : scalar or string or np.nan or None, default=np.nan Value to represent a missing label. random_state : int or np.random.RandomState, default=None Random state for annotator selection. References ---------- .. [1] K.-H. Huang and H.-T. Lin. A Novel Uncertainty Sampling Algorithm for Cost-Sensitive Multiclass Active Learning. In IEEE Int. Conf. Data Min., pages 925–930, 2016. .. [2] Y.-Y. Yang, S.-C. Lee, Y.-A. Chung, T.-E. Wu, S.-A. Chen, and H.-T. Lin. libact: Pool-based Active Learning in Python. arXiv:1710.00379, 2017. """ def __init__( self, classes, base_regressor=None, cost_matrix=None, embed_dim=None, mds_params=None, nn_params=None, missing_label=MISSING_LABEL, random_state=None, ): super().__init__( missing_label=missing_label, random_state=random_state ) self.classes = classes self.base_regressor = base_regressor self.cost_matrix = cost_matrix self.embed_dim = embed_dim self.missing_label = missing_label self.random_state = random_state self.mds_params = mds_params self.nn_params = nn_params
[docs] def query( self, X, y, sample_weight=None, candidates=None, batch_size=1, return_utilities=False, ): """Determines for which candidate samples labels are to be queried. Parameters ---------- X : array-like of shape (n_samples, n_features) Training data set, usually complete, i.e., including the labeled and unlabeled samples. y : array-like of shape (n_samples,) Labels of the training data set (possibly including unlabeled ones indicated by `self.missing_label`). sample_weight: array-like of shape (n_samples,), default=None Weights of training samples in `X`. candidates : None or array-like of shape (n_candidates), dtype=int or \ array-like of shape (n_candidates, n_features), default=None - If `candidates` is `None`, the unlabeled samples from `(X,y)` are considered as `candidates`. - If `candidates` is of shape `(n_candidates,)` and of type `int`, `candidates` is considered as the indices of the samples in `(X,y)`. - If `candidates` is of shape `(n_candidates, *)`, the candidate samples are directly given in `candidates` (not necessarily contained in `X`). This is not supported by all query strategies. batch_size : int, default=1 The number of samples to be selected in one AL cycle. return_utilities : bool, default=False If `True`, also return the utilities based on the query strategy. Returns ------- query_indices : numpy.ndarray of shape (batch_size,) The query indices indicate for which candidate sample a label is to be queried, e.g., `query_indices[0]` indicates the first selected sample. - If `candidates` is `None` or of shape `(n_candidates,)`, the indexing refers to the samples in `X`. - If `candidates` is of shape `(n_candidates, n_features)`, the indexing refers to the samples in `candidates`. utilities : numpy.ndarray of shape (batch_size, n_samples) or \ numpy.ndarray of shape (batch_size, n_candidates) The utilities of samples after each selected sample of the batch, e.g., `utilities[0]` indicates the utilities used for selecting the first sample (with index `query_indices[0]`) of the batch. Utilities for labeled samples will be set to np.nan. - If `candidates` is `None` or of shape `(n_candidates,)`, the indexing refers to the samples in `X`. - If `candidates` is of shape `(n_candidates, n_features)`, the indexing refers to the samples in `candidates`. """ # Check standard parameters. ( X, y, candidates, batch_size, return_utilities, ) = super()._validate_data( X=X, y=y, candidates=candidates, batch_size=batch_size, return_utilities=return_utilities, reset=True, ) # Obtain candidates plus mapping. X_cand, mapping = self._transform_candidates(candidates, X, y) util_cand = _alce( X_cand, X, y, self.base_regressor, self.cost_matrix, self.classes, self.embed_dim, sample_weight, self.missing_label, self.random_state_, self.mds_params, self.nn_params, ) if mapping is None: utilities = util_cand else: utilities = np.full(len(X), np.nan) utilities[mapping] = util_cand return simple_batch( utilities, self.random_state_, batch_size=batch_size, return_utilities=return_utilities, )
def _alce( X_cand, X, y, base_regressor, cost_matrix, classes, embed_dim, sample_weight, missing_label, random_state, mds_params, nn_params, ): """Compute the ALCE score for the candidate samples. Parameters ---------- X_cand : array-like of shape (n_candidates, n_features) Unlabeled candidate samples. X : array-like of shape (n_samples, n_features) Complete data set. y : array-like of shape (n_samples) Labels of the data set. base_regressor : RegressorMixin Regressor used for the embedding. cost_matrix : array-like of shape (n_classes, n_classes), default=None Cost matrix with `cost_matrix[i,j]` defining the cost of predicting class `j` for a sample with the actual class `i`. classes : array-like of shape (n_classes,) List of all classes that can occur. sample_weight : array-like, shape (n_samples) Weights for uncertain annotators. missing_label : scalar or string or np.nan or None, default=np.nan Value to represent a missing label. random_state : int or np.random.RandomState, default=None Random state for annotator selection. base_regressor : sklearn.base.RegressorMixin, default=None An sklearn regression model implementing the methods `fit` and `predict`. embed_dim : int, optional (default=None) If it is `None`, `embed_dim=n_classes` is used. mds_params : dict, default=None Parameters passed to `sklearn.manifold.MDS`. nn_params : dict, default=None Parameters passed to `sklearn.neighbors.NearestNeighbors`. Returns ------- utilities : np.ndarray of shape (n_candidates,) The utilities of all candidate samples. """ # Check base regressor if base_regressor is None: base_regressor = SVR() if not isinstance(base_regressor, RegressorMixin): raise TypeError("'base_regressor' must be an sklearn regressor") check_classifier_params(classes, missing_label, cost_matrix) if cost_matrix is None: cost_matrix = 1 - np.eye(len(classes)) if np.count_nonzero(cost_matrix) == 0: raise ValueError( "The cost matrix must contain at least one positive " "number." ) # Check the given data X, y, X_cand, sample_weight, sample_weight_cand = check_X_y( X, y, X_cand, sample_weight, ensure_all_finite=False, missing_label=missing_label, ) labeled = is_labeled(y, missing_label=missing_label) y = ExtLabelEncoder(classes, missing_label).fit_transform(y) X = X[labeled] y = y[labeled].astype(int) sample_weight = sample_weight[labeled] # If all samples are unlabeled, the strategy randomly selects a sample if len(X) == 0: warnings.warn( "There are no labeled samples. The strategy selects " "one random sample." ) return np.ones(len(X_cand)) # Check embedding dimension embed_dim = len(classes) if embed_dim is None else embed_dim check_scalar(embed_dim, "embed_dim", int, min_val=1) # Update mds parameters mds_params_default = { "metric": False, "n_components": embed_dim, "n_uq": len(classes), "max_iter": 300, "eps": 1e-6, "dissimilarity": "precomputed", "n_init": 8, "n_jobs": 1, "random_state": random_state, } if mds_params is not None: if type(mds_params) is not dict: raise TypeError("'mds_params' must be a dictionary or None") mds_params_default.update(mds_params) mds_params = mds_params_default # Update nearest neighbor parameters nn_params = {} if nn_params is None else nn_params if type(nn_params) is not dict: raise TypeError("'nn_params' must be a dictionary or None") regressors = [clone(base_regressor) for _ in range(embed_dim)] n_classes = len(classes) dissimilarities = np.zeros((2 * n_classes, 2 * n_classes)) dissimilarities[:n_classes, n_classes:] = cost_matrix dissimilarities[n_classes:, :n_classes] = cost_matrix.T W = np.zeros((2 * n_classes, 2 * n_classes)) W[:n_classes, n_classes:] = 1 W[n_classes:, :n_classes] = 1 mds = MDSP(**mds_params) embedding = mds.fit(dissimilarities).embedding_ class_embed = embedding[:n_classes, :] nn = NearestNeighbors(n_neighbors=1, **nn_params) nn.fit(embedding[n_classes:, :]) pred_embed = np.zeros((len(X_cand), embed_dim)) for i in range(embed_dim): regressors[i].fit(X, class_embed[y, i], sample_weight) pred_embed[:, i] = regressors[i].predict(X_cand) dist, _ = nn.kneighbors(pred_embed) utilities = dist[:, 0] return utilities """ Implementation of Multi-dimensional Scaling Partial (MDSP), which is a modification of: https://github.com/ntucllab/libact/blob/master/libact/query_strategies/multiclass/mdsp.py written by Kuan-Hao Huang (BSD license. Copyright (c) 2014, National Taiwan University All rights reserved.) and https://github.com/scikit-learn/scikit-learn/blob/14031f6/sklearn/manifold/mds.py. written by Nelle Varoquaux <nelle.varoquaux@gmail.com> (BSD license. Copyright (c) 2007–2016 The scikit-learn developers. All rights reserved.). """ def _smacof_single_p( similarities, n_uq, metric=True, n_components=2, init=None, max_iter=300, verbose=0, eps=1e-3, random_state=None, ): """Computes multidimensional scaling using the SMACOF algorithm. Parameters ---------- similarities : symmetric np.ndarray of shape (n * n,) Similarities between the points. n_uq : int Number of unique values. metric : boolean, default=True Compute metric or nonmetric SMACOF algorithm. n_components : int default=2 Number of dimension in which to immerse the similarities overwritten if initial array is provided. init : None or np.ndarray, default=None - If `init=None`, randomly chooses the initial configuration. - if `init` is np.ndarray, initialize SMACOF algorithm with this array. max_iter : int, default=300 Maximum number of iterations of the SMACOF algorithm for a single run. verbose : int, default=0 Level of verbosity. eps : float, default=1e-6 Relative tolerance w.r.t stress to declare converge. random_state : integer or np.RandomState, default=None The generator used to initialize the centers. If an integer is given, it fixes the seed. Defaults to the global numpy random number generator. Returns ------- X : np.ndarray of (n_samples, n_components) Coordinates of the n_samples points in a n_components-space. _stress : float The final value of the stress (sum of squared distance of the disparities and the distances for all constrained points). it : int Number of iterations run. """ similarities = check_symmetric(similarities, raise_exception=True) n_samples = similarities.shape[0] random_state = check_random_state(random_state) W = np.ones((n_samples, n_samples)) W[:n_uq, :n_uq] = 0.0 W[n_uq:, n_uq:] = 0.0 V = -W V[np.arange(len(V)), np.arange(len(V))] = W.sum(axis=1) e = np.ones((n_samples, 1)) Vp = ( np.linalg.inv(V + np.dot(e, e.T) / n_samples) - np.dot(e, e.T) / n_samples ) sim_flat = similarities.ravel() sim_flat_w = sim_flat[sim_flat != 0] if init is None: # Randomly choose initial configuration. X = random_state.rand(n_samples * n_components) X = X.reshape((n_samples, n_components)) else: # Overrides the parameter `p`. n_components = init.shape[1] if n_samples != init.shape[0]: raise ValueError( "init matrix should be of shape (%d, %d)" % (n_samples, n_components) ) X = init old_stress = None ir = IsotonicRegression() for it in range(max_iter): # Compute distance and monotonic regression. dis = euclidean_distances(X) if metric: disparities = similarities else: dis_flat = dis.ravel() # Similarities with 0 are considered as missing values. dis_flat_w = dis_flat[sim_flat != 0] # Compute the disparities using a monotonic regression. disparities_flat = ir.fit_transform(sim_flat_w, dis_flat_w) disparities = dis_flat.copy() disparities[sim_flat != 0] = disparities_flat disparities = disparities.reshape((n_samples, n_samples)) disparities *= np.sqrt( (n_samples * (n_samples - 1) / 2) / (disparities**2).sum() ) disparities[similarities == 0] = 0 # Compute stress. _stress = ( W.ravel() * ((dis.ravel() - disparities.ravel()) ** 2) ).sum() _stress /= 2 # Update X using the Guttman transform. dis[dis == 0] = 1e-5 ratio = disparities / dis _B = -W * ratio _B[np.arange(len(_B)), np.arange(len(_B))] += (W * ratio).sum(axis=1) X = np.dot(Vp, np.dot(_B, X)) dis = np.sqrt((X**2).sum(axis=1)).sum() if verbose >= 2: print("it: %d, stress %s" % (it, _stress)) if old_stress is not None: if (old_stress - _stress / dis) < eps: if verbose: print(f"breaking at iteration {it} with stress {_stress}") break old_stress = _stress / dis return X, _stress, it + 1 def smacof_p( similarities, n_uq, metric=True, n_components=2, init=None, n_init=8, n_jobs=1, max_iter=300, verbose=0, eps=1e-3, random_state=None, return_n_iter=False, ): """Computes multidimensional scaling using SMACOF (Scaling by Majorizing a Complicated Function) algorithm. The SMACOF algorithm is a multidimensional scaling algorithm: it minimizes a objective function, the *stress*, using a majorization technique. The Stress Majorization, also known as the Guttman Transform, guarantees a monotone convergence of Stress, and is more powerful than traditional techniques such as gradient descent. The SMACOF algorithm for metric MDS can summarized by the following steps: 1. Set an initial start configuration, randomly or not. 2. Compute the stress 3. Compute the Guttman Transform 4. Iterate 2 and 3 until convergence. The nonmetric algorithm adds a monotonic regression steps before computing the stress. Parameters ---------- similarities : symmetric np.ndarray of shape (n_samples, n_samples) Similarities between the points. n_uq : int Number of unique values. metric : boolean, default=True Compute metric or nonmetric SMACOF algorithm. n_components : int, default=2 Number of dimension in which to immerse the similarities overridden if initial array is provided. init : None or ndarray of shape (n_samples, n_components), default=None - If `init=None`, randomly chooses the initial configuration. - if `init` is np.ndarray, initialize SMACOF algorithm with this array. n_init : int, default=8 Number of times the smacof_p algorithm will be run with different initialisations. The final results will be the best output of the n_init consecutive runs in terms of stress. n_jobs : int, default=1 The number of jobs to use for the computation. This works by breaking down the pairwise matrix into n_jobs even slices and computing them in parallel. - If -1 all CPUs are used. - If 1 is given, no parallel computing code is used at all, which is useful for debugging. - For `n_jobs` below -1, `(n_cpus + 1 + n_jobs)` are used. Thus, for `n_jobs=-2, all CPUs but one are used. max_iter : int, default=300 Maximum number of iterations of the SMACOF algorithm for a single run. verbose : int, default=0 Level of verbosity. eps : float, default=1e-6 Relative tolerance w.r.t stress to declare converge. random_state : integer or numpy.RandomState, default=None The generator used to initialize the centers. If an integer is given, it fixes the seed. Defaults to the global numpy random number generator. return_n_iter : bool Whether or not to return the number of iterations. Returns ------- X : np.ndarray of shape (n_samples, n_components) Coordinates of the n_samples points in a n_components-space. stress : float The final value of the stress (sum of squared distance of the disparities and the distances for all constrained points). n_iter : int The number of iterations corresponding to the best stress. Returned only if `return_n_iter` is set to True. References ---------- .. [1] "Modern Multidimensional Scaling - Theory and Applications" Borg, I.; Groenen P. Springer Series in Statistics (1997) .. [2] "Nonmetric multidimensional scaling: a numerical method" Kruskal, J. Psychometrika, 29 (1964) .. [3] "Multidimensional scaling by optimizing goodness of fit to a nonmetric hypothesis" Kruskal, J. Psychometrika, 29, (1964) """ similarities = check_array(similarities) random_state = check_random_state(random_state) if hasattr(init, "__array__"): init = np.asarray(init).copy() if not n_init == 1: warnings.warn( "Explicit initial positions passed: " "performing only one init of the MDS instead of %d" % n_init ) n_init = 1 best_pos, best_stress = None, None if n_jobs == 1: for it in range(n_init): pos, stress, n_iter_ = _smacof_single_p( similarities, n_uq, metric=metric, n_components=n_components, init=init, max_iter=max_iter, verbose=verbose, eps=eps, random_state=random_state, ) if best_stress is None or stress < best_stress: best_stress = stress best_pos = pos.copy() best_iter = n_iter_ else: seeds = random_state.randint(np.iinfo(np.int32).max, size=n_init) results = Parallel(n_jobs=n_jobs, verbose=max(verbose - 1, 0))( delayed(_smacof_single_p)( similarities, n_uq, metric=metric, n_components=n_components, init=init, max_iter=max_iter, verbose=verbose, eps=eps, random_state=seed, ) for seed in seeds ) positions, stress, n_iters = zip(*results) best = np.argmin(stress) best_stress = stress[best] best_pos = positions[best] best_iter = n_iters[best] if return_n_iter: return best_pos, best_stress, best_iter else: return best_pos, best_stress class MDSP(BaseEstimator): """Multidimensional Scaling Partial (MDSP) Parameters ---------- metric : boolean, default=True compute metric or nonmetric SMACOF (Scaling by Majorizing a Complicated Function) algorithm. n_uq : int, default=1 Number of unique values. n_components : int, default=2 Number of dimension in which to immerse the similarities overridden if initial array is provided. n_init : int, default=4 Number of time the smacof_p algorithm will be run with different initialisation. The final results will be the best output of the n_init consecutive runs in terms of stress. max_iter : int, default=300 Maximum number of iterations of the SMACOF algorithm for a single run. verbose : int, default=0 Level of verbosity. eps : float, default=1e-6 Relative tolerance w.r.t stress to declare converge. n_jobs : int, default=1 The number of jobs to use for the computation. This works by breaking down the pairwise matrix into n_jobs even slices and computing them in parallel. - If -1 all CPUs are used. - If 1 is given, no parallel computing code is used at all, which is useful for debugging. - For `n_jobs` below -1, `(n_cpus + 1 + n_jobs)` are used. Thus, for `n_jobs=-2, all CPUs but one are used. random_state : integer or numpy.RandomState, default=None The generator used to initialize the centers. If an integer is given, it fixes the seed. Defaults to the global numpy random number generator. dissimilarity : string Which dissimilarity measure to use. Supported are 'euclidean' and 'precomputed'. Attributes ---------- embedding_ : array-like of shape (n_components, n_samples) Stores the position of the dataset in the embedding space stress_ : float The final value of the stress (sum of squared distance of the disparities and the distances for all constrained points). References ---------- .. [1] "Modern Multidimensional Scaling - Theory and Applications" Borg, I.; Groenen P. Springer Series in Statistics (1997) .. [2] "Nonmetric multidimensional scaling: a numerical method" Kruskal, J. Psychometrika, 29 (1964) .. [3] "Multidimensional scaling by optimizing goodness of fit to a nonmetric hypothesis" Kruskal, J. Psychometrika, 29, (1964) """ def __init__( self, n_components=2, n_uq=1, metric=True, n_init=4, max_iter=300, verbose=0, eps=1e-3, n_jobs=1, random_state=None, dissimilarity="euclidean", ): self.n_components = n_components self.n_uq = n_uq self.dissimilarity = dissimilarity self.metric = metric self.n_init = n_init self.max_iter = max_iter self.eps = eps self.verbose = verbose self.n_jobs = n_jobs self.random_state = random_state def fit(self, X, y=None, init=None): """Compute the position of the points in the embedding space. Parameters ---------- X : array-like of shape (n_samples, n_features), or \ (n_samples, n_samples) if dissimilarity='precomputed' Input data. init : None or ndarray of shape (n_samples,), default=None - If `None`, randomly chooses the initial configuration. - If `np.ndarray`, initialize the SMACOF algorithm with this array. """ self.fit_transform(X, init=init) return self def fit_transform(self, X, y=None, init=None): """Fit the data from X, and returns the embedded coordinates. Parameters ---------- X : array-like of shape (n_samples, n_features), or \ (n_samples, n_samples) if dissimilarity='precomputed' Input data. init : None or ndarray of shape (n_samples,), default=None - If `None`, randomly chooses the initial configuration. - If `np.ndarray`, initialize the SMACOF algorithm with this array. """ X = check_array(X) if X.shape[0] == X.shape[1] and self.dissimilarity != "precomputed": warnings.warn( "The MDS API has changed. `fit` now constructs an" " dissimilarity matrix from data. To use a custom " "dissimilarity matrix, set " "`dissimilarity=precomputed`." ) if self.dissimilarity == "precomputed": self.dissimilarity_matrix_ = X elif self.dissimilarity == "euclidean": self.dissimilarity_matrix_ = euclidean_distances(X) else: raise ValueError( "Proximity must be 'precomputed' or 'euclidean'." " Got %s instead" % str(self.dissimilarity) ) self.embedding_, self.stress_, self.n_iter_ = smacof_p( self.dissimilarity_matrix_, self.n_uq, metric=self.metric, n_components=self.n_components, init=init, n_init=self.n_init, n_jobs=self.n_jobs, max_iter=self.max_iter, verbose=self.verbose, eps=self.eps, random_state=self.random_state, return_n_iter=True, ) return self.embedding_