# Source code for skactiveml.pool._greedy_sampling

import numpy as np
from sklearn import clone
from sklearn.metrics import pairwise_distances, pairwise

from skactiveml.base import (
    SingleAnnotatorPoolQueryStrategy,
    SkactivemlRegressor,
)
from skactiveml.utils import (
    rand_argmax,
    labeled_indices,
    MISSING_LABEL,
    is_labeled,
    check_type,
    check_scalar,
)


class GreedySamplingX(SingleAnnotatorPoolQueryStrategy):
    """Greedy Sampling on the feature space.

    This class implements greedy sampling on the feature space. This query
    strategy selects the samples that increase the diversity of the feature
    space the most.

    Parameters
    ----------
    metric : str, optional (default="euclidean")
        Metric used for calculating the distances of the samples in the
        feature space. It must be a valid argument for
        `sklearn.metrics.pairwise_distances` argument `metric`.
    metric_dict : dict, optional (default=None)
        Any further parameters are passed directly to the
        pairwise_distances function.
    missing_label : scalar or string or np.nan or None, optional
    (default=skactiveml.utils.MISSING_LABEL)
        Value to represent a missing label.
    random_state : int | np.random.RandomState, optional
        Random state for candidate selection.

    References
    ----------
    [1] Wu, Dongrui, Chin-Teng Lin, and Jian Huang. Active Learning for
        Regression Using Greedy Sampling, Information Sciences,
        pages 90--105, 2019.
    """

    def __init__(
        self,
        metric=None,
        metric_dict=None,
        missing_label=MISSING_LABEL,
        random_state=None,
    ):
        super().__init__(
            random_state=random_state, missing_label=missing_label
        )
        self.metric = metric
        self.metric_dict = metric_dict
    def query(
        self, X, y, candidates=None, batch_size=1, return_utilities=False
    ):
        """Determines for which candidate samples labels are to be queried.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training data set, usually complete, i.e. including the labeled
            and unlabeled samples.
        y : array-like of shape (n_samples)
            Labels of the training data set (possibly including unlabeled
            ones indicated by `self.missing_label`).
        candidates : None or array-like of shape (n_candidates), dtype=int or
            array-like of shape (n_candidates, n_features),
            optional (default=None)
            If candidates is None, the unlabeled samples from (X,y) are
            considered as candidates.
            If candidates is of shape (n_candidates) and of type int,
            candidates is considered as the indices of the samples in (X,y).
            If candidates is of shape (n_candidates, n_features), the
            candidates are directly given in candidates (not necessarily
            contained in X).
        batch_size : int, optional (default=1)
            The number of samples to be selected in one AL cycle.
        return_utilities : bool, optional (default=False)
            If true, also return the utilities based on the query strategy.

        Returns
        -------
        query_indices : numpy.ndarray of shape (batch_size)
            The query_indices indicate for which candidate sample a label is
            to be queried, e.g., `query_indices[0]` indicates the first
            selected sample.
            If candidates is None or of shape (n_candidates), the indexing
            refers to samples in X.
            If candidates is of shape (n_candidates, n_features), the
            indexing refers to samples in candidates.
        utilities : numpy.ndarray of shape (batch_size, n_samples) or
            numpy.ndarray of shape (batch_size, n_candidates)
            The utilities of samples after each selected sample of the batch,
            e.g., `utilities[0]` indicates the utilities used for selecting
            the first sample (with index `query_indices[0]`) of the batch.
            Utilities for labeled samples will be set to np.nan.
            If candidates is None or of shape (n_candidates), the indexing
            refers to samples in X.
            If candidates is of shape (n_candidates, n_features), the
            indexing refers to samples in candidates.
        """
        X, y, candidates, batch_size, return_utilities = self._validate_data(
            X, y, candidates, batch_size, return_utilities, reset=True
        )

        X_cand, mapping = self._transform_candidates(candidates, X, y)

        sample_indices = np.arange(len(X), dtype=int)
        selected_indices = labeled_indices(
            y, missing_label=self.missing_label
        )

        if mapping is None:
            X_all = np.append(X, X_cand, axis=0)
            candidate_indices = len(X) + np.arange(len(X_cand), dtype=int)
        else:
            X_all = X
            candidate_indices = mapping

        query_indices_cand, utilities_cand = _greedy_sampling(
            X_cand,
            X_all,
            sample_indices,
            selected_indices,
            candidate_indices,
            batch_size,
            random_state=self.random_state_,
            method="x",
            metric_x=self.metric,
            metric_dict_x=self.metric_dict,
        )

        if mapping is not None:
            utilities = np.full((batch_size, len(X)), np.nan)
            utilities[:, mapping] = utilities_cand
            query_indices = mapping[query_indices_cand]
        else:
            utilities, query_indices = utilities_cand, query_indices_cand

        if return_utilities:
            return query_indices, utilities
        else:
            return query_indices
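
# --- Illustrative usage sketch (editor's addition, not part of the original
# module). A minimal example of `GreedySamplingX` on a small synthetic pool:
# all samples are unlabeled, and the strategy greedily queries the samples
# that maximize feature-space diversity. Data and batch size are made up for
# illustration only.
def _example_greedy_sampling_x():
    rng = np.random.RandomState(0)
    X = rng.rand(20, 2)                     # 20 unlabeled 2D samples
    y = np.full(20, MISSING_LABEL)          # no labels acquired yet
    qs = GreedySamplingX(metric="euclidean", random_state=0)
    # Indices (into X) of the 3 samples selected for labeling.
    return qs.query(X, y, batch_size=3)
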
class GreedySamplingTarget(SingleAnnotatorPoolQueryStrategy):
    """Greedy Sampling on the target space.

    This class implements greedy sampling on the target space. This query
    strategy first selects samples to maximize the diversity in the feature
    space and then selects samples to maximize the diversity in the feature
    and the target space (GSi); optionally, only the diversity in the target
    space can be maximized (GSy).

    Parameters
    ----------
    x_metric : str, optional (default=None)
        Metric used for calculating the distances of the samples in the
        feature space. It must be a valid argument for
        `sklearn.metrics.pairwise_distances` argument `metric`.
    y_metric : str, optional (default=None)
        Metric used for calculating the distances of the samples in the
        target space. It must be a valid argument for
        `sklearn.metrics.pairwise_distances` argument `metric`.
    x_metric_dict : dict, optional (default=None)
        Any further parameters for computing the distances of the samples in
        the feature space are passed directly to the pairwise_distances
        function.
    y_metric_dict : dict, optional (default=None)
        Any further parameters for computing the distances of the samples in
        the target space are passed directly to the pairwise_distances
        function.
    n_GSx_samples : int, optional (default=1)
        Indicates the number of selected samples required until the query
        strategy switches from GSx to the strategy specified by `method`.
    method : "GSy" or "GSi", optional (default="GSi")
        Specifies whether only the diversity in the target space (`GSy`) or
        the diversity in the feature and the target space (`GSi`) should be
        maximized, when the number of selected samples exceeds
        `n_GSx_samples`.
    missing_label : scalar or string or np.nan or None, optional
    (default=skactiveml.utils.MISSING_LABEL)
        Value to represent a missing label.
    random_state : int | np.random.RandomState, optional
        Random state for candidate selection.

    References
    ----------
    [1] Wu, Dongrui, Chin-Teng Lin, and Jian Huang. Active Learning for
        Regression Using Greedy Sampling, Information Sciences,
        pages 90--105, 2019.
    """

    def __init__(
        self,
        x_metric=None,
        y_metric=None,
        x_metric_dict=None,
        y_metric_dict=None,
        method=None,
        n_GSx_samples=1,
        missing_label=MISSING_LABEL,
        random_state=None,
    ):
        super().__init__(
            random_state=random_state, missing_label=missing_label
        )
        self.method = method
        self.x_metric = x_metric
        self.y_metric = y_metric
        self.x_metric_dict = x_metric_dict
        self.y_metric_dict = y_metric_dict
        self.n_GSx_samples = n_GSx_samples
    def query(
        self,
        X,
        y,
        reg,
        fit_reg=True,
        sample_weight=None,
        candidates=None,
        batch_size=1,
        return_utilities=False,
    ):
        """Determines for which candidate samples labels are to be queried.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training data set, usually complete, i.e. including the labeled
            and unlabeled samples.
        y : array-like of shape (n_samples)
            Labels of the training data set (possibly including unlabeled
            ones indicated by `self.missing_label`).
        reg : SkactivemlRegressor
            Regressor to predict the data.
        fit_reg : bool, optional (default=True)
            Defines whether the regressor should be fitted on `X`, `y`, and
            `sample_weight`.
        sample_weight : array-like of shape (n_samples), optional
            (default=None)
            Weights of training samples in `X`.
        candidates : None or array-like of shape (n_candidates), dtype=int or
            array-like of shape (n_candidates, n_features),
            optional (default=None)
            If candidates is None, the unlabeled samples from (X,y) are
            considered as candidates.
            If candidates is of shape (n_candidates) and of type int,
            candidates is considered as the indices of the samples in (X,y).
            If candidates is of shape (n_candidates, n_features), the
            candidates are directly given in candidates (not necessarily
            contained in X).
        batch_size : int, optional (default=1)
            The number of samples to be selected in one AL cycle.
        return_utilities : bool, optional (default=False)
            If true, also return the utilities based on the query strategy.

        Returns
        -------
        query_indices : numpy.ndarray of shape (batch_size)
            The query_indices indicate for which candidate sample a label is
            to be queried, e.g., `query_indices[0]` indicates the first
            selected sample.
            If candidates is None or of shape (n_candidates), the indexing
            refers to samples in X.
            If candidates is of shape (n_candidates, n_features), the
            indexing refers to samples in candidates.
        utilities : numpy.ndarray of shape (batch_size, n_samples) or
            numpy.ndarray of shape (batch_size, n_candidates)
            The utilities of samples after each selected sample of the batch,
            e.g., `utilities[0]` indicates the utilities used for selecting
            the first sample (with index `query_indices[0]`) of the batch.
            Utilities for labeled samples will be set to np.nan.
            If candidates is None or of shape (n_candidates), the indexing
            refers to samples in X.
            If candidates is of shape (n_candidates, n_features), the
            indexing refers to samples in candidates.
        """
        X, y, candidates, batch_size, return_utilities = self._validate_data(
            X, y, candidates, batch_size, return_utilities, reset=True
        )

        check_type(reg, "reg", SkactivemlRegressor)
        check_type(fit_reg, "fit_reg", bool)
        if self.method is None:
            self.method = "GSi"
        check_type(self.method, "self.method", target_vals=["GSy", "GSi"])
        check_scalar(self.n_GSx_samples, "self.n_GSx_samples", int, min_val=0)

        X_cand, mapping = self._transform_candidates(candidates, X, y)

        n_labeled = np.sum(is_labeled(y, missing_label=self.missing_label_))
        batch_size_x = max(0, min(self.n_GSx_samples - n_labeled, batch_size))
        batch_size_y = batch_size - batch_size_x

        if fit_reg:
            if sample_weight is None:
                reg = clone(reg).fit(X, y)
            else:
                reg = clone(reg).fit(X, y, sample_weight)

        sample_indices = np.arange(len(X), dtype=int)
        selected_indices = labeled_indices(y)
        y_cand = reg.predict(X_cand)

        if mapping is None:
            X_all = np.append(X, X_cand, axis=0)
            y_all = np.append(y, reg.predict(X_cand))
            candidate_indices = len(X) + np.arange(len(X_cand), dtype=int)
        else:
            X_all = X
            y_all = y.copy()
            y_all[mapping] = y_cand
            candidate_indices = mapping

        query_indices = np.zeros(batch_size, dtype=int)
        utilities = np.full((batch_size, len(X_cand)), np.nan)

        # Phase 1 (GSx): select samples purely by feature-space diversity
        # until `n_GSx_samples` samples have been selected.
        if batch_size_x > 0:
            query_indices_x, utilities_x = _greedy_sampling(
                X_cand=X_cand,
                y_cand=y_cand,
                X=X_all,
                y=y_all,
                sample_indices=sample_indices,
                selected_indices=selected_indices,
                candidate_indices=candidate_indices,
                batch_size=batch_size_x,
                random_state=None,
                metric_x=self.x_metric,
                metric_dict_x=self.x_metric_dict,
                method="x",
            )

            query_indices[0:batch_size_x] = query_indices_x
            utilities[0:batch_size_x, :] = utilities_x
        else:
            query_indices_x = np.array([], dtype=int)

        selected_indices = np.append(
            selected_indices, candidate_indices[query_indices_x]
        )
        candidate_indices = np.delete(candidate_indices, query_indices_x)

        is_queried = np.full(len(X_cand), False)
        is_queried[query_indices_x] = True
        unselected_cands = np.argwhere(~is_queried).flatten()

        X_cand = np.delete(X_cand, query_indices_x, axis=0)
        y_cand = np.delete(y_cand, query_indices_x)

        # Phase 2 (GSy/GSi): select the remaining samples by target-space
        # diversity (GSy) or combined feature/target-space diversity (GSi).
        if batch_size_y > 0:
            query_indices_y, utilities_y = _greedy_sampling(
                X_cand=X_cand,
                y_cand=y_cand,
                X=X_all,
                y=y_all,
                sample_indices=sample_indices,
                selected_indices=selected_indices,
                candidate_indices=candidate_indices,
                batch_size=batch_size_y,
                random_state=None,
                metric_x=self.x_metric,
                metric_dict_x=self.x_metric_dict,
                metric_y=self.y_metric,
                metric_dict_y=self.y_metric_dict,
                method="xy" if self.method == "GSi" else "y",
            )

            query_indices[batch_size_x:] = unselected_cands[query_indices_y]
            utilities[batch_size_x:][:, unselected_cands] = utilities_y

        if mapping is not None:
            utilities_cand, query_indices_cand = utilities, query_indices
            utilities = np.full((batch_size, len(X)), np.nan)
            utilities[:, mapping] = utilities_cand
            query_indices = mapping[query_indices_cand]

        if return_utilities:
            return query_indices, utilities
        else:
            return query_indices
def _greedy_sampling(
    X_cand,
    X,
    sample_indices,
    selected_indices,
    candidate_indices,
    batch_size,
    y_cand=None,
    y=None,
    random_state=None,
    method=None,
    **kwargs,
):
    # Greedily selects `batch_size` candidates: while no sample has been
    # selected yet, the candidate with the smallest summed distance to all
    # samples is chosen; afterwards, the candidate with the largest minimum
    # distance to the already selected samples is chosen.
    dist_dict = dict(
        X_cand=X_cand, y_cand=y_cand, X=X, y=y, method=method, **kwargs
    )

    query_indices = np.zeros(batch_size, dtype=int)
    utilities = np.full((batch_size, len(X_cand)), np.nan)

    distances = np.full((len(X_cand), len(X)), np.nan)

    if len(selected_indices) == 0:
        distances[:, sample_indices] = _measure_distance(
            sample_indices, **dist_dict
        )
    else:
        distances[:, selected_indices] = _measure_distance(
            selected_indices, **dist_dict
        )

    not_selected_candidates = np.arange(len(X_cand), dtype=int)

    for i in range(batch_size):
        if len(selected_indices) == 0:
            dist = distances[not_selected_candidates][:, sample_indices]
            util = -np.sum(dist, axis=1)
        else:
            dist = distances[not_selected_candidates][:, selected_indices]
            util = np.min(dist, axis=1)
        utilities[i, not_selected_candidates] = util

        idx = rand_argmax(util, random_state=random_state)
        query_indices[i] = not_selected_candidates[idx][0]

        distances[:, candidate_indices[idx]] = _measure_distance(
            candidate_indices[idx], **dist_dict
        )

        selected_indices = np.append(
            selected_indices, candidate_indices[idx], axis=0
        )
        candidate_indices = np.delete(candidate_indices, idx, axis=0)
        not_selected_candidates = np.delete(not_selected_candidates, idx)

    return query_indices, utilities


def _measure_distance(
    indices,
    X_cand,
    y_cand,
    X,
    y,
    metric_dict_x=None,
    metric_x=None,
    metric_dict_y=None,
    metric_y=None,
    method=None,
):
    # Computes the pairwise distances between the candidates and the samples
    # given by `indices`; for method "xy", the feature- and target-space
    # distances are multiplied element-wise.
    metric_x = metric_x if metric_x is not None else "euclidean"
    metric_y = metric_y if metric_y is not None else "euclidean"
    for metric, name in zip([metric_x, metric_y], ["metric_x", "metric_y"]):
        check_type(
            metric,
            name,
            target_vals=pairwise.PAIRWISE_DISTANCE_FUNCTIONS.keys(),
        )

    metric_dict_x = metric_dict_x if metric_dict_x is not None else {}
    metric_dict_y = metric_dict_y if metric_dict_y is not None else {}
    for metric_dict, name in zip(
        [metric_dict_x, metric_dict_y], ["metric_dict_x", "metric_dict_y"]
    ):
        check_type(metric_dict, name, dict)

    dist = np.ones((len(X_cand), len(indices)))
    if "x" in method:
        dist *= pairwise_distances(
            X_cand, X[indices], metric=metric_x, **metric_dict_x
        )
    if "y" in method:
        dist *= pairwise_distances(
            y_cand.reshape(-1, 1),
            y[indices].reshape(-1, 1),
            metric=metric_y,
            **metric_dict_y,
        )
    return dist
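
# --- Illustrative sketch (editor's addition, not part of the original
# module). Shows how `_measure_distance` combines the two spaces for the GSi
# case: with method="xy", the feature-space and (predicted) target-space
# pairwise distances are multiplied element-wise, so a candidate only counts
# as "far away" if it differs in both spaces. The arrays below are synthetic
# and only for illustration.
def _example_measure_distance():
    X = np.array([[0.0], [1.0], [2.0]])     # pool samples
    y = np.array([0.0, 1.0, 4.0])           # (predicted) targets of the pool
    X_cand = np.array([[0.5], [3.0]])       # candidate samples
    y_cand = np.array([0.25, 9.0])          # predicted candidate targets
    selected = np.array([0, 2])             # indices of already labeled samples
    # Combined GSi distance of each candidate to each selected sample,
    # shape (n_candidates, n_selected).
    return _measure_distance(selected, X_cand, y_cand, X, y, method="xy")
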