Source code for skactiveml.pool._greedy_sampling

import numpy as np
from sklearn import clone
from sklearn.metrics import pairwise_distances, pairwise

from skactiveml.base import (
    SingleAnnotatorPoolQueryStrategy,
    SkactivemlRegressor,
)
from skactiveml.utils import (
    rand_argmax,
    labeled_indices,
    MISSING_LABEL,
    is_labeled,
    check_type,
    check_scalar,
)


class GreedySamplingX(SingleAnnotatorPoolQueryStrategy):
    """Greedy Sampling in the Feature Space (GSx)

    Query strategy Greedy Sampling in the Feature Space (GSx) [1]_, which
    tries to select those samples that increase the diversity of the
    feature space the most.

    Parameters
    ----------
    metric : str, default=None
        Metric used for calculating the distances of the samples in the
        feature space. `None` is resolved to "euclidean" by the distance
        helper of this module; any other value must be a key of
        `sklearn.metrics.pairwise.PAIRWISE_DISTANCE_FUNCTIONS`.
    metric_dict : dict, default=None
        Further keyword arguments forwarded to
        `sklearn.metrics.pairwise_distances`. `None` means no extra
        arguments.
    missing_label : scalar or string or np.nan or None, default=np.nan
        Value to represent a missing label.
    random_state : int or np.random.RandomState, default=None
        Random state for candidate selection.

    References
    ----------
    .. [1] D. Wu, C.-T. Lin, and J. Huang. Active Learning for Regression
       using Greedy Sampling. Inf. Sci., 474:90–105, 2019.
    """

    def __init__(
        self,
        metric=None,
        metric_dict=None,
        missing_label=MISSING_LABEL,
        random_state=None,
    ):
        super().__init__(
            random_state=random_state, missing_label=missing_label
        )
        self.metric = metric
        self.metric_dict = metric_dict

    def query(
        self, X, y, candidates=None, batch_size=1, return_utilities=False
    ):
        """Query the next samples to be labeled.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training data set, usually complete, i.e., including the
            labeled and unlabeled samples.
        y : array-like of shape (n_samples,)
            Labels of the training data set (possibly including unlabeled
            ones indicated by `self.missing_label`).
        candidates : None or array-like of shape (n_candidates, ) of type \
                int, default=None
            - If `candidates` is `None`, the unlabeled samples from
              `(X,y)` are considered as `candidates`.
            - If `candidates` is of shape `(n_candidates,)` and of type
              `int`, `candidates` is considered as the indices of the
              samples in `(X,y)`.
            - If `candidates` is of shape `(n_candidates, *)`,
              `candidates` is considered as the candidate samples in
              `(X,y)`.
        batch_size : int, default=1
            The number of samples to be selected in one AL cycle.
        return_utilities : bool, default=False
            If true, also return the utilities based on the query strategy.

        Returns
        -------
        query_indices : numpy.ndarray of shape (batch_size)
            The query indices indicate for which candidate sample a label
            is to be queried, e.g., `query_indices[0]` indicates the first
            selected sample.

            - If `candidates` is `None` or of shape `(n_candidates,)`, the
              indexing refers to the samples in `X`.
            - If `candidates` is of shape `(n_candidates, n_features)`,
              the indexing refers to the samples in `candidates`.
        utilities : numpy.ndarray of shape (batch_size, n_samples)
            The utilities of samples after each selected sample of the
            batch, e.g., `utilities[0]` indicates the utilities used for
            selecting the first sample (with index `query_indices[0]`) of
            the batch. Utilities for labeled samples will be set to np.nan.

            - If `candidates` is `None`, the indexing refers to the
              samples in `X`.
            - If `candidates` is of shape `(n_candidates,)` and of type
              `int`, `utilities` refers to the samples in `X`.
            - If `candidates` is of shape `(n_candidates, *)`, `utilities`
              refers to the indexing in `candidates`.
        """
        X, y, candidates, batch_size, return_utilities = self._validate_data(
            X, y, candidates, batch_size, return_utilities, reset=True
        )
        X_cand, mapping = self._transform_candidates(candidates, X, y)

        sample_indices = np.arange(len(X), dtype=int)
        selected_indices = labeled_indices(y, missing_label=self.missing_label)

        # Without a mapping the candidates are not rows of `X`; they are
        # appended behind `X` so that every candidate owns a column index.
        if mapping is not None:
            X_all, candidate_indices = X, mapping
        else:
            X_all = np.append(X, X_cand, axis=0)
            candidate_indices = len(X) + np.arange(len(X_cand), dtype=int)

        picked_cand, cand_utilities = _greedy_sampling(
            X_cand,
            X_all,
            sample_indices,
            selected_indices,
            candidate_indices,
            batch_size,
            random_state=self.random_state_,
            method="x",
            metric_x=self.metric,
            metric_dict_x=self.metric_dict,
        )

        if mapping is None:
            query_indices, utilities = picked_cand, cand_utilities
        else:
            # Translate candidate-relative results back to the rows of `X`.
            utilities = np.full((batch_size, len(X)), np.nan)
            utilities[:, mapping] = cand_utilities
            query_indices = mapping[picked_cand]

        if not return_utilities:
            return query_indices
        return query_indices, utilities
class GreedySamplingTarget(SingleAnnotatorPoolQueryStrategy):
    """Greedy Sampling in the Target Space (GSi or GSy)

    This class implements the query strategy Greedy Sampling in the Target
    Space (GSi or GSy) [1]_ that at first selects samples to maximize the
    diversity in the feature space and then selects samples to maximize the
    diversity in the feature and the target space (GSi), optionally only
    the diversity in the target space can be maximized (GSy).

    Parameters
    ----------
    x_metric : str, default=None
        Metric used for calculating the distances of the samples in the
        feature space. `None` is resolved to "euclidean" by the distance
        helper of this module; any other value must be a key of
        `sklearn.metrics.pairwise.PAIRWISE_DISTANCE_FUNCTIONS`.
    y_metric : str, default=None
        Metric used for calculating the distances of the samples in the
        target space. Same conventions as `x_metric`.
    x_metric_dict : dict, default=None
        Any further parameters for computing the distances of the samples
        in the feature space are passed directly to the pairwise_distances
        function.
    y_metric_dict : dict, default=None
        Any further parameters for computing the distances of the samples
        in the target space are passed directly to the pairwise_distances
        function.
    method : "GSy" or "GSi" or None, default=None
        Specifies whether only the diversity in the target space ("GSy") or
        the diversity in the feature and the target space ("GSi") should be
        maximized, when the number of selected samples exceeds
        `n_GSx_samples`. `None` is treated as "GSi".
    n_GSx_samples : int, default=1
        Indicates the number of selected samples required till the query
        strategy switches from GSx to the strategy specified by `method`.
    missing_label : scalar or string or np.nan or None, default=np.nan
        Value to represent a missing label.
    random_state : int or np.random.RandomState, default=None
        Random state for candidate selection.

    References
    ----------
    .. [1] D. Wu, C.-T. Lin, and J. Huang. Active Learning for Regression
       using Greedy Sampling. Inf. Sci., 474:90–105, 2019.
    """

    def __init__(
        self,
        x_metric=None,
        y_metric=None,
        x_metric_dict=None,
        y_metric_dict=None,
        method=None,
        n_GSx_samples=1,
        missing_label=MISSING_LABEL,
        random_state=None,
    ):
        super().__init__(
            random_state=random_state, missing_label=missing_label
        )
        self.method = method
        self.x_metric = x_metric
        self.y_metric = y_metric
        self.x_metric_dict = x_metric_dict
        self.y_metric_dict = y_metric_dict
        self.n_GSx_samples = n_GSx_samples

    def query(
        self,
        X,
        y,
        reg,
        fit_reg=True,
        sample_weight=None,
        candidates=None,
        batch_size=1,
        return_utilities=False,
    ):
        """Query the next samples to be labeled.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training data set, usually complete, i.e., including the
            labeled and unlabeled samples.
        y : array-like of shape (n_samples,)
            Labels of the training data set (possibly including unlabeled
            ones indicated by `self.missing_label`).
        reg : SkactivemlRegressor
            Regressor whose `predict` supplies the target values of the
            candidates.
        fit_reg : bool, default=True
            If `True`, a clone of `reg` is fitted on `(X, y)` before
            predicting; the passed `reg` itself is left untouched. If
            `False`, `reg` is used as given.
        sample_weight : array-like or None, default=None
            Forwarded to `fit` of the cloned regressor when `fit_reg` is
            `True` and `sample_weight` is not `None`; unused otherwise.
        candidates : None or array-like of shape (n_candidates, ) of type \
                int, default=None
            - If `candidates` is `None`, the unlabeled samples from
              `(X,y)` are considered as `candidates`.
            - If `candidates` is of shape `(n_candidates,)` and of type
              `int`, `candidates` is considered as the indices of the
              samples in `(X,y)`.
            - If `candidates` is of shape `(n_candidates, *)`,
              `candidates` is considered as the candidate samples in
              `(X,y)`.
        batch_size : int, default=1
            The number of samples to be selected in one AL cycle.
        return_utilities : bool, default=False
            If true, also return the utilities based on the query strategy.

        Returns
        -------
        query_indices : numpy.ndarray of shape (batch_size)
            The query indices indicate for which candidate sample a label
            is to be queried, e.g., `query_indices[0]` indicates the first
            selected sample.

            - If `candidates` is `None` or of shape `(n_candidates,)`, the
              indexing refers to the samples in `X`.
            - If `candidates` is of shape `(n_candidates, n_features)`,
              the indexing refers to the samples in `candidates`.
        utilities : numpy.ndarray of shape (batch_size, n_samples)
            The utilities of samples after each selected sample of the
            batch, e.g., `utilities[0]` indicates the utilities used for
            selecting the first sample (with index `query_indices[0]`) of
            the batch. Utilities for labeled samples will be set to np.nan.

            - If `candidates` is `None`, the indexing refers to the
              samples in `X`.
            - If `candidates` is of shape `(n_candidates,)` and of type
              `int`, `utilities` refers to the samples in `X`.
            - If `candidates` is of shape `(n_candidates, *)`, `utilities`
              refers to the indexing in `candidates`.
        """
        X, y, candidates, batch_size, return_utilities = self._validate_data(
            X, y, candidates, batch_size, return_utilities, reset=True
        )

        check_type(reg, "reg", SkactivemlRegressor)
        check_type(fit_reg, "fit_reg", bool)
        # Resolve the default locally: constructor parameters must stay
        # untouched so that `get_params` / `clone` keep reporting what the
        # user actually passed.
        method = "GSi" if self.method is None else self.method
        check_type(method, "self.method", target_vals=["GSy", "GSi"])
        check_scalar(
            self.n_GSx_samples, "self.n_GSx_samples", int, min_val=0
        )

        X_cand, mapping = self._transform_candidates(candidates, X, y)

        # The first `n_GSx_samples` labels are acquired with GSx; only the
        # remainder of the batch uses the target-aware strategy.
        n_labeled = np.sum(is_labeled(y, missing_label=self.missing_label_))
        batch_size_x = max(0, min(self.n_GSx_samples - n_labeled, batch_size))
        batch_size_y = batch_size - batch_size_x

        if fit_reg:
            if sample_weight is None:
                reg = clone(reg).fit(X, y)
            else:
                reg = clone(reg).fit(X, y, sample_weight)

        sample_indices = np.arange(len(X), dtype=int)
        # Honor the configured missing label when locating labeled samples.
        selected_indices = labeled_indices(
            y, missing_label=self.missing_label_
        )

        y_cand = reg.predict(X_cand)

        if mapping is None:
            # Candidates are not rows of `X`: append them (and their
            # predictions) so that every candidate owns a column index.
            X_all = np.append(X, X_cand, axis=0)
            y_all = np.append(y, y_cand)
            candidate_indices = len(X) + np.arange(len(X_cand), dtype=int)
        else:
            X_all = X
            y_all = y.copy()
            y_all[mapping] = y_cand
            candidate_indices = mapping

        query_indices = np.zeros(batch_size, dtype=int)
        utilities = np.full((batch_size, len(X_cand)), np.nan)

        if batch_size_x > 0:
            query_indices_x, utilities_x = _greedy_sampling(
                X_cand=X_cand,
                y_cand=y_cand,
                X=X_all,
                y=y_all,
                sample_indices=sample_indices,
                selected_indices=selected_indices,
                candidate_indices=candidate_indices,
                batch_size=batch_size_x,
                # Use the validated random state, as GreedySamplingX does,
                # so that the `random_state` parameter is honored.
                random_state=self.random_state_,
                metric_x=self.x_metric,
                metric_dict_x=self.x_metric_dict,
                method="x",
            )
            query_indices[0:batch_size_x] = query_indices_x
            utilities[0:batch_size_x, :] = utilities_x
        else:
            query_indices_x = np.array([], dtype=int)

        # Treat the GSx picks as already selected and drop them from the
        # candidate pool before the target-aware phase.
        selected_indices = np.append(
            selected_indices, candidate_indices[query_indices_x]
        )
        candidate_indices = np.delete(candidate_indices, query_indices_x)
        is_queried = np.full(len(X_cand), False)
        is_queried[query_indices_x] = True
        # Positions (w.r.t. the original candidates) that are still free;
        # used to translate the second phase's results back.
        unselected_cands = np.argwhere(~is_queried).flatten()
        X_cand = np.delete(X_cand, query_indices_x, axis=0)
        y_cand = np.delete(y_cand, query_indices_x)

        if batch_size_y > 0:
            query_indices_y, utilities_y = _greedy_sampling(
                X_cand=X_cand,
                y_cand=y_cand,
                X=X_all,
                y=y_all,
                sample_indices=sample_indices,
                selected_indices=selected_indices,
                candidate_indices=candidate_indices,
                batch_size=batch_size_y,
                random_state=self.random_state_,
                metric_x=self.x_metric,
                metric_dict_x=self.x_metric_dict,
                metric_y=self.y_metric,
                metric_dict_y=self.y_metric_dict,
                method="xy" if method == "GSi" else "y",
            )
            query_indices[batch_size_x:] = unselected_cands[query_indices_y]
            utilities[batch_size_x:, unselected_cands] = utilities_y

        if mapping is not None:
            # Translate candidate-relative results back to the rows of `X`.
            utilities_cand, query_indices_cand = utilities, query_indices
            utilities = np.full((batch_size, len(X)), np.nan)
            utilities[:, mapping] = utilities_cand
            query_indices = mapping[query_indices_cand]

        if return_utilities:
            return query_indices, utilities
        else:
            return query_indices
def _greedy_sampling(
    X_cand,
    X,
    sample_indices,
    selected_indices,
    candidate_indices,
    batch_size,
    y_cand=None,
    y=None,
    random_state=None,
    method=None,
    **kwargs,
):
    """Greedily pick `batch_size` candidates, one after another.

    Parameters
    ----------
    X_cand : array-like
        Candidate samples; one utility is computed per row.
    X : array-like
        Samples that `sample_indices`, `selected_indices` and
        `candidate_indices` index into.
    sample_indices : array-like of int
        Columns used as reference while `selected_indices` is empty.
    selected_indices : array-like of int
        Indices (w.r.t. `X`) of the samples regarded as already selected.
    candidate_indices : array-like of int
        Index (w.r.t. `X`) of each row of `X_cand`.
    batch_size : int
        Number of candidates to pick.
    y_cand, y : array-like or None, default=None
        Target values belonging to `X_cand` and `X`; forwarded to
        `_measure_distance`.
    random_state : default=None
        Forwarded to `rand_argmax`.
    method : str, default=None
        Forwarded to `_measure_distance`.
    **kwargs
        Further keyword arguments for `_measure_distance` (metrics and
        metric dictionaries).

    Returns
    -------
    query_indices : numpy.ndarray of shape (batch_size,)
        Row positions in `X_cand` of the picked candidates, in order.
    utilities : numpy.ndarray of shape (batch_size, len(X_cand))
        Utilities used in each step; entries of candidates picked in an
        earlier step stay `np.nan`.
    """
    dist_kwargs = {
        "X_cand": X_cand,
        "y_cand": y_cand,
        "X": X,
        "y": y,
        "method": method,
        **kwargs,
    }
    n_cand = len(X_cand)

    query_indices = np.zeros(batch_size, dtype=int)
    utilities = np.full((batch_size, n_cand), np.nan)
    distances = np.full((n_cand, len(X)), np.nan)

    # Only the columns that can act as reference are filled up front:
    # every sample if nothing is selected yet, else the selected ones.
    if len(selected_indices) == 0:
        reference = sample_indices
    else:
        reference = selected_indices
    distances[:, reference] = _measure_distance(reference, **dist_kwargs)

    remaining = np.arange(n_cand, dtype=int)
    for step in range(batch_size):
        remaining_rows = distances[remaining]
        if len(selected_indices) == 0:
            # Nothing selected yet: prefer the smallest summed distance
            # to all samples.
            util = -np.sum(remaining_rows[:, sample_indices], axis=1)
        else:
            # Otherwise prefer the largest distance to the nearest
            # selected sample.
            util = np.min(remaining_rows[:, selected_indices], axis=1)
        utilities[step, remaining] = util

        idx = rand_argmax(util, random_state=random_state)
        query_indices[step] = remaining[idx][0]

        # The pick becomes a reference for the following steps.
        chosen = candidate_indices[idx]
        distances[:, chosen] = _measure_distance(chosen, **dist_kwargs)
        selected_indices = np.append(selected_indices, chosen, axis=0)
        candidate_indices = np.delete(candidate_indices, idx, axis=0)
        remaining = np.delete(remaining, idx)

    return query_indices, utilities


def _measure_distance(
    indices,
    X_cand,
    y_cand,
    X,
    y,
    metric_dict_x=None,
    metric_x=None,
    metric_dict_y=None,
    metric_y=None,
    method=None,
):
    """Distances between all candidates and the samples at `indices`.

    Parameters
    ----------
    indices : array-like of int
        Indices into `X` (and `y`) of the reference samples.
    X_cand, y_cand : array-like
        Candidate samples and their target values.
    X, y : array-like
        Samples and target values that `indices` refers to.
    metric_dict_x, metric_dict_y : dict or None, default=None
        Extra keyword arguments for `pairwise_distances` in the feature
        and the target space; `None` means no extra arguments.
    metric_x, metric_y : str or None, default=None
        Metrics for the feature and the target space; `None` means
        "euclidean". Both are validated against
        `pairwise.PAIRWISE_DISTANCE_FUNCTIONS`.
    method : str
        The feature distance is used if it contains "x", the target
        distance if it contains "y"; if both apply, the two are multiplied.

    Returns
    -------
    dist : numpy.ndarray of shape (len(X_cand), len(indices))
        The (combined) distances.
    """
    if metric_x is None:
        metric_x = "euclidean"
    if metric_y is None:
        metric_y = "euclidean"
    check_type(
        metric_x,
        "metric_x",
        target_vals=pairwise.PAIRWISE_DISTANCE_FUNCTIONS.keys(),
    )
    check_type(
        metric_y,
        "metric_y",
        target_vals=pairwise.PAIRWISE_DISTANCE_FUNCTIONS.keys(),
    )

    if metric_dict_x is None:
        metric_dict_x = {}
    if metric_dict_y is None:
        metric_dict_y = {}
    check_type(metric_dict_x, "metric_dict_x", dict)
    check_type(metric_dict_y, "metric_dict_y", dict)

    # Start from ones so each requested space multiplies its distances in.
    dist = np.ones((len(X_cand), len(indices)))
    if "x" in method:
        feature_dist = pairwise_distances(
            X_cand, X[indices], metric=metric_x, **metric_dict_x
        )
        dist *= feature_dist
    if "y" in method:
        target_dist = pairwise_distances(
            y_cand.reshape(-1, 1),
            y[indices].reshape(-1, 1),
            metric=metric_y,
            **metric_dict_y,
        )
        dist *= target_dist
    return dist