# Source code for skactiveml.pool._wrapper

from ..base import SingleAnnotatorPoolQueryStrategy
from ..utils import (
    MISSING_LABEL,
    check_random_state,
    is_labeled,
    labeled_indices,
    unlabeled_indices,
    check_scalar,
    simple_batch,
    match_signature,
)
from math import ceil
import numpy as np
from joblib import Parallel, delayed, cpu_count
import warnings


class SubSamplingWrapper(SingleAnnotatorPoolQueryStrategy):
    """Sub-sampling Wrapper.

    Wraps a single-annotator pool-based query strategy and randomly
    sub-samples the candidate set before the wrapped strategy computes
    utilities. This is useful when the full candidate pool is too large and a
    small subset suffices to select a good batch for labeling. The subsample
    size is controlled via `max_candidates` (absolute count or fraction).
    With `exclude_non_subsample`, unlabeled samples outside the subsample can
    additionally be masked from `X` and `y`, which can further speed up
    strategies that use all unlabeled data internally.

    Parameters
    ----------
    query_strategy : skactiveml.base.SingleAnnotatorPoolQueryStrategy
        The strategy used for computing the utilities of the candidate
        sub-sample.
    max_candidates : int or float, default=0.1
        Determines the number of candidates. If `max_candidates` is an
        integer, it is the maximum number of candidates whose utilities are
        computed. If `max_candidates` is a float, it is the fraction of the
        original number of candidates.
    exclude_non_subsample : bool, default=False
        - If `True`, unlabeled candidates in `X` and `y` are excluded which
          are not part of the subsample. If `candidates` is an array-like of
          shape `(n_candidates, n_features)`, all unlabeled data will be
          removed from `X` and `y`.
        - If `False`, `X` and `y` stay the same.
    embed_samples_func : Callable or None, default=None
        - If `embed_samples_func` is a `Callable`, it must accept the samples
          `X` as input and return the sample-wise embeddings.
        - If `embed_samples_func` is None, no action is performed.
    missing_label : scalar or string or np.nan or None, default=np.nan
        Value to represent a missing label.
    random_state : int or np.random.RandomState, default=None
        The random state to use.
    """

    def __init__(
        self,
        query_strategy=None,
        max_candidates=0.1,
        exclude_non_subsample=False,
        embed_samples_func=None,
        missing_label=MISSING_LABEL,
        random_state=None,
    ):
        super().__init__(
            missing_label=missing_label, random_state=random_state
        )
        self.query_strategy = query_strategy
        self.max_candidates = max_candidates
        self.exclude_non_subsample = exclude_non_subsample
        self.embed_samples_func = embed_samples_func

    @match_signature("query_strategy", "query")
    def query(
        self,
        X,
        y,
        candidates=None,
        batch_size=1,
        return_utilities=False,
        **query_kwargs,
    ):
        """Determines for which candidate samples labels are to be queried.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training data set, usually complete, i.e., including the labeled
            and unlabeled samples.
        y : array-like of shape (n_samples,)
            Labels of the training data set (possibly including unlabeled
            ones indicated by self.MISSING_LABEL).
        candidates : None or array-like of shape (n_candidates), dtype=int or\
                array-like of shape (n_candidates, n_features), default=None
            - If `candidates` is `None`, the unlabeled samples from `(X,y)`
              are considered as `candidates`.
            - If `candidates` is of shape `(n_candidates,)` and of type
              `int`, `candidates` is considered as the indices of the
              samples in `(X,y)`.
            - If `candidates` is of shape `(n_candidates, ...)`, the
              candidate samples are directly given in `candidates` (not
              necessarily contained in `X`). This is not supported by all
              query strategies.
        batch_size : int, default=1
            The number of samples to be selected in one AL cycle.
        return_utilities : bool, default=False
            If `True`, also return the utilities based on the query strategy.
        **query_kwargs : dict-like
            Further keyword arguments are passed to the `query` method of the
            `query_strategy` object.

        Returns
        -------
        query_indices : numpy.ndarray of shape (batch_size,)
            The query indices indicate for which candidate sample a label is
            to be queried, e.g., `query_indices[0]` indicates the first
            selected sample.

            - If `candidates` is `None` or of shape `(n_candidates,)`, the
              indexing refers to the samples in `X`.
            - If `candidates` is of shape `(n_candidates, n_features)`, the
              indexing refers to the samples in `candidates`.
        utilities : numpy.ndarray of shape (batch_size, n_samples) or \
                numpy.ndarray of shape (batch_size, n_candidates)
            The utilities of samples after each selected sample of the batch,
            e.g., `utilities[0]` indicates the utilities used for selecting
            the first sample (with index `query_indices[0]`) of the batch.
            Utilities for labeled samples will be set to np.nan.

            - If `candidates` is `None` or of shape `(n_candidates,)`, the
              indexing refers to the samples in `X`.
            - If `candidates` is of shape `(n_candidates, n_features)`, the
              indexing refers to the samples in `candidates`.
        """
        X, y, candidates, batch_size, return_utilities = self._validate_data(
            X, y, candidates, batch_size, return_utilities, reset=True
        )

        # --- Validate wrapper-specific parameters. ---
        if not isinstance(
            self.query_strategy, SingleAnnotatorPoolQueryStrategy
        ):
            raise TypeError(
                f"`query_strategy` is of type `{type(self.query_strategy)}` "
                f"but must be of type `SingleAnnotatorPoolQueryStrategy`."
            )
        check_scalar(self.exclude_non_subsample, "exclude_non_subsample", bool)

        # Seed depends on the number of labeled samples so that a different
        # subsample is drawn once new labels have been acquired.
        seed_multiplier = (
            int(is_labeled(y, missing_label=self.missing_label_).sum()) + 1
        )

        max_candidates = self.max_candidates
        if isinstance(self.max_candidates, int):
            check_scalar(
                self.max_candidates,
                name="max_candidates",
                target_type=int,
                min_inclusive=True,
                min_val=1,
            )
        elif isinstance(self.max_candidates, float):
            check_scalar(
                self.max_candidates,
                name="max_candidates",
                target_type=float,
                min_inclusive=False,
                max_inclusive=True,
                min_val=0.0,
                max_val=1.0,
            )
        else:
            raise TypeError(
                f"`max_candidates` is of type `{type(self.max_candidates)}`"
                f" but must be in `[int, float]`."
            )

        if self.embed_samples_func is not None and not callable(
            self.embed_samples_func
        ):
            raise TypeError(
                "`embed_samples_func` must be either a `Callable` or `None`."
            )

        random_state = check_random_state(self.random_state, seed_multiplier)

        # --- Draw the candidate subsample. ---
        if candidates is None:
            # Implicit candidates: all unlabeled samples in `(X, y)`.
            candidate_indices = unlabeled_indices(
                y=y, missing_label=self.missing_label_
            )
            # Transform `max_candidates` to int if a ratio is given.
            if isinstance(max_candidates, float):
                max_candidates = ceil(
                    len(candidate_indices) * self.max_candidates
                )
            max_candidates = min(max_candidates, len(candidate_indices))
            new_candidates = random_state.choice(
                a=candidate_indices, size=max_candidates, replace=False
            )
        else:
            # Explicitly provided candidates (indices or feature rows).
            if isinstance(max_candidates, float):
                max_candidates = ceil(len(candidates) * self.max_candidates)
            max_candidates = min(max_candidates, len(candidates))
            if candidates.ndim == 1:
                candidate_indices = candidates
                new_candidates = random_state.choice(
                    a=candidates, size=max_candidates, replace=False
                )
            else:
                # Feature-based candidates: subsample row positions.
                candidate_indices = range(len(candidates))
                new_candidate_indices = random_state.choice(
                    a=candidate_indices, size=max_candidates, replace=False
                )
                new_candidates = candidates[new_candidate_indices]

        # --- Optionally drop unlabeled samples outside the subsample. ---
        if self.exclude_non_subsample:
            all_labeled = labeled_indices(
                y=y, missing_label=self.missing_label_
            )
            if candidates is not None and candidates.ndim > 1:
                # Feature-based candidates: keep only labeled samples.
                subset_and_labeled_indices = all_labeled
            else:
                # Ignore labeled candidates to avoid duplicate samples.
                all_labeled = np.setdiff1d(all_labeled, new_candidates)
                subset_and_labeled_indices = np.concatenate(
                    [all_labeled, new_candidates]
                )
            sorted_idx = np.argsort(subset_and_labeled_indices)
            subset_and_labeled_indices = subset_and_labeled_indices[sorted_idx]
            new_X = X[subset_and_labeled_indices]
            new_y = y[subset_and_labeled_indices]
            # For index-based candidates, recompute the candidate indices
            # relative to the reduced `(new_X, new_y)` that is passed on.
            if candidates is None or candidates.ndim == 1:
                new_candidates = np.flatnonzero(sorted_idx >= len(all_labeled))
        else:
            new_X = X
            new_y = y

        if self.embed_samples_func:
            new_X = self.embed_samples_func(new_X)

        # --- Delegate to the wrapped strategy on the subsample. ---
        qs_output = self.query_strategy.query(
            X=new_X,
            y=new_y,
            candidates=new_candidates,
            batch_size=batch_size,
            return_utilities=return_utilities,
            **query_kwargs,
        )

        # Unpack the wrapped strategy's result if utilities were requested.
        queried_indices = qs_output
        utilities = None
        if return_utilities:
            queried_indices, utilities = qs_output

        # --- Map results back as if no training data had been removed. ---
        if self.exclude_non_subsample and (
            candidates is None or candidates.ndim == 1
        ):
            # Transform to original indices w.r.t. the full `X`.
            queried_indices = subset_and_labeled_indices[queried_indices]
            if utilities is not None:
                # Expand utilities back to the original shape.
                new_utilities = np.full(
                    shape=(batch_size, len(X)), fill_value=np.nan
                )
                transformed_new_candidates = subset_and_labeled_indices[
                    new_candidates
                ]
                new_utilities[:, transformed_new_candidates] = utilities[
                    :, new_candidates
                ]
                utilities = new_utilities
                new_candidates = transformed_new_candidates

        # For feature-based candidates, map queried row positions back to the
        # rows of the originally provided `candidates` array.
        if candidates is not None and candidates.ndim > 1:
            new_queried_indices = new_candidate_indices[queried_indices]
        else:
            new_queried_indices = queried_indices

        # Expand utilities from subsampled shape to original shape:
        # NaN outside the candidate pool, -inf for non-subsampled candidates.
        if return_utilities:
            if candidates is None or candidates.ndim == 1:
                new_utilities = np.full(
                    shape=(batch_size, len(X)), fill_value=np.nan
                )
                new_utilities[:, candidate_indices] = -np.inf
                new_utilities[:, new_candidates] = utilities[:, new_candidates]
            else:
                new_utilities = np.full(
                    shape=(batch_size, len(candidates)), fill_value=np.nan
                )
                new_utilities[:, candidate_indices] = -np.inf
                new_utilities[:, new_candidate_indices] = utilities

        if return_utilities:
            return new_queried_indices, new_utilities
        else:
            return new_queried_indices
class ParallelUtilityEstimationWrapper(SingleAnnotatorPoolQueryStrategy):
    """Parallel Utility Estimation Wrapper.

    Wraps a single-annotator pool-based query strategy such that the
    utilities of the candidates can be calculated in parallel. The main
    assumption is that utility computations are independent of one another;
    therefore, only `batch_size=1` is supported.

    Parameters
    ----------
    query_strategy : skactiveml.base.SingleAnnotatorPoolQueryStrategy
        The strategy used for computing the utilities of the candidates.
    n_jobs : int, default=-1
        Determines the number of maximum number of parallel utility
        computations. If `n_jobs` is set to -1 (default), the number of
        parallel computations is set to the number of available CPU cores.
        For further details refer to `n_jobs` in `joblib.Parallel`.
    parallel_dict : dict-like, default=None
        Further arguments that will be passed to `joblib.Parallel`. Note
        that `n_jobs` should not be set in `parallel_dict`.
    missing_label : scalar or string or np.nan or None, default=np.nan
        Value to represent a missing label.
    random_state : int or np.random.RandomState, default=None
        The random state to use.
    """

    def __init__(
        self,
        query_strategy=None,
        n_jobs=-1,
        parallel_dict=None,
        missing_label=MISSING_LABEL,
        random_state=None,
    ):
        super().__init__(
            missing_label=missing_label, random_state=random_state
        )
        self.query_strategy = query_strategy
        self.n_jobs = n_jobs
        self.parallel_dict = parallel_dict

    @match_signature("query_strategy", "query")
    def query(
        self,
        X,
        y,
        candidates=None,
        batch_size=1,
        return_utilities=False,
        **query_kwargs,
    ):
        """Determines for which candidate samples labels are to be queried.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training data set, usually complete, i.e., including the labeled
            and unlabeled samples.
        y : array-like of shape (n_samples)
            Labels of the training data set (possibly including unlabeled
            ones indicated by self.MISSING_LABEL).
        candidates : None or array-like of shape (n_candidates), dtype=int or\
                array-like of shape (n_candidates, n_features), default=None
            - If `candidates` is `None`, the unlabeled samples from `(X,y)`
              are considered as `candidates`.
            - If `candidates` is of shape `(n_candidates,)` and of type
              `int`, `candidates` is considered as the indices of the
              samples in `(X,y)`.
            - If `candidates` is of shape `(n_candidates, ...)`, the
              candidate samples are directly given in `candidates` (not
              necessarily contained in `X`). This is not supported by all
              query strategies.
        batch_size : int, default=1
            The number of samples to be selected in one AL cycle. For this
            wrapper, only `batch_size=1` is supported.
        return_utilities : bool, default=False
            If `True`, also return the utilities based on the query strategy.
        **query_kwargs : dict-like
            Further keyword arguments are passed to the `query` method of the
            `query_strategy` object.

        Returns
        -------
        query_indices : numpy.ndarray of shape (batch_size,)
            The query indices indicate for which candidate sample a label is
            to be queried, e.g., `query_indices[0]` indicates the first
            selected sample.

            - If `candidates` is `None` or of shape `(n_candidates,)`, the
              indexing refers to the samples in `X`.
            - If `candidates` is of shape `(n_candidates, n_features)`, the
              indexing refers to the samples in `candidates`.
        utilities : numpy.ndarray of shape (batch_size, n_samples) or \
                numpy.ndarray of shape (batch_size, n_candidates)
            The utilities of samples after each selected sample of the batch,
            e.g., `utilities[0]` indicates the utilities used for selecting
            the first sample (with index `query_indices[0]`) of the batch.
            Utilities for labeled samples will be set to np.nan.

            - If `candidates` is `None` or of shape `(n_candidates,)`, the
              indexing refers to the samples in `X`.
            - If `candidates` is of shape `(n_candidates, n_features)`, the
              indexing refers to the samples in `candidates`.
        """
        X, y, candidates, batch_size, return_utilities = self._validate_data(
            X, y, candidates, batch_size, return_utilities, reset=True
        )

        # Only single-sample batches are supported because the per-candidate
        # utility computations are assumed to be independent.
        if batch_size != 1:
            raise ValueError("`batch_size` must be set to 1.")
        if not isinstance(
            self.query_strategy, SingleAnnotatorPoolQueryStrategy
        ):
            raise TypeError(
                f"`query_strategy` is of type `{type(self.query_strategy)}` "
                f"but must be of type `SingleAnnotatorPoolQueryStrategy`."
            )

        X_cand, mapping = self._transform_candidates(candidates, X, y)

        # --- Assemble keyword arguments for `joblib.Parallel`. ---
        if self.parallel_dict is None:
            parallel_dict = {}
        elif isinstance(self.parallel_dict, dict):
            parallel_dict = self.parallel_dict.copy()
            if "n_jobs" in parallel_dict.keys():
                # A user-supplied `n_jobs` is overridden by `self.n_jobs`.
                warnings.warn(
                    f"`n_jobs` ({parallel_dict['n_jobs']}) "
                    "is specified in `parallel_dict`. "
                    f"This will be replaced with `n_jobs={self.n_jobs}`."
                )
        else:
            raise TypeError(
                f"`parallel_dict` is of type `{type(self.parallel_dict)}` "
                f"but must be a dictionary or None."
            )
        # Never spawn more workers than there are candidates.
        parallel_dict["n_jobs"] = min(self.n_jobs, len(X_cand))
        parallel_pool = Parallel(**parallel_dict)

        def query_lambda_func(candidate):
            # Query the wrapped strategy on one chunk of candidate samples,
            # always requesting utilities so they can be merged afterwards.
            return self.query_strategy.query(
                X=X,
                y=y,
                candidates=np.array(candidate),
                batch_size=1,
                return_utilities=True,
                **query_kwargs,
            )

        # Split the candidates into one chunk per worker.
        if parallel_dict["n_jobs"] < 0:
            chunks = np.array_split(X_cand, cpu_count())
        else:
            chunks = np.array_split(X_cand, parallel_dict["n_jobs"])

        qs_outputs = parallel_pool(
            delayed(query_lambda_func)(c) for c in chunks
        )

        # Concatenate the per-chunk utilities (each output is a tuple of
        # `(query_indices, utilities)`; take the first utilities row).
        utilities_cand = np.concatenate(
            [qs_output[1][0] for qs_output in qs_outputs], axis=0
        )

        # Map candidate utilities back onto `X` if candidates were indices.
        if mapping is None:
            utilities = utilities_cand
        else:
            utilities = np.full(len(X), np.nan)
            utilities[mapping] = utilities_cand

        return simple_batch(
            utilities,
            self.random_state_,
            batch_size=batch_size,
            return_utilities=return_utilities,
        )