Source code for skactiveml.pool._wrapper

from ..base import SingleAnnotatorPoolQueryStrategy
from ..utils import (
    MISSING_LABEL,
    check_random_state,
    is_labeled,
    labeled_indices,
    unlabeled_indices,
    check_scalar,
    simple_batch,
    match_signature,
)
from math import ceil
import numpy as np
from joblib import Parallel, delayed, cpu_count
import warnings


class SubSamplingWrapper(SingleAnnotatorPoolQueryStrategy):
    """Sub-sampling Wrapper

    Wraps a single-annotator pool-based query strategy so that only a
    randomly drawn sub-sample of the candidates is handed to the wrapped
    strategy for utility computation.

    Parameters
    ----------
    query_strategy : skactiveml.base.SingleAnnotatorPoolQueryStrategy
        The strategy used for computing the utilities of the candidate
        sub-sample.
    max_candidates : int or float, default=0.1
        Determines the size of the sub-sample.

        - If `max_candidates` is an integer (`>= 1`), it is the maximum
          number of candidates whose utilities are computed.
        - If `max_candidates` is a float in `(0, 1]`, it is the fraction of
          the original number of candidates (rounded up).
    exclude_non_subsample : bool, default=False
        - If `True`, unlabeled samples in `X` and `y` that are not part of
          the sub-sample are removed before calling the wrapped strategy.
          If `candidates` is an array-like of shape
          `(n_candidates, n_features)`, all unlabeled data is removed from
          `X` and `y`.
        - If `False`, `X` and `y` are passed on unchanged.
    missing_label : scalar or string or np.nan or None, default=np.nan
        Value to represent a missing label.
    random_state : int or np.random.RandomState, default=None
        The random state to use.
    """

    def __init__(
        self,
        query_strategy=None,
        max_candidates=0.1,
        exclude_non_subsample=False,
        missing_label=MISSING_LABEL,
        random_state=None,
    ):
        super().__init__(
            missing_label=missing_label, random_state=random_state
        )
        self.query_strategy = query_strategy
        self.max_candidates = max_candidates
        self.exclude_non_subsample = exclude_non_subsample

    @match_signature("query_strategy", "query")
    def query(
        self,
        X,
        y,
        candidates=None,
        batch_size=1,
        return_utilities=False,
        **query_kwargs,
    ):
        """Determines for which candidate samples labels are to be queried.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training data set, usually complete, i.e., including the labeled
            and unlabeled samples.
        y : array-like of shape (n_samples)
            Labels of the training data set (possibly including unlabeled
            ones indicated by self.MISSING_LABEL).
        candidates : None or array-like of shape (n_candidates), dtype=int or
            array-like of shape (n_candidates, n_features), default=None
            - If `candidates` is `None`, the unlabeled samples from `(X,y)`
              are considered as `candidates`.
            - If `candidates` is of shape `(n_candidates,)` and of type
              `int`, `candidates` is considered as the indices of the
              samples in `(X,y)`.
            - If `candidates` is of shape `(n_candidates, *)`, the candidate
              samples are directly given in `candidates` (not necessarily
              contained in `X`). This is not supported by all query
              strategies.
        batch_size : int, default=1
            The number of samples to be selected in one AL cycle.
        return_utilities : bool, default=False
            If `True`, also return the utilities based on the query strategy.
        **query_kwargs : dict-like
            Further keyword arguments are passed to the `query` method of the
            `query_strategy` object.

        Returns
        -------
        query_indices : numpy.ndarray of shape (batch_size,)
            The query indices indicate for which candidate sample a label is
            to be queried, e.g., `query_indices[0]` indicates the first
            selected sample.

            - If `candidates` is `None` or of shape `(n_candidates,)`, the
              indexing refers to the samples in `X`.
            - If `candidates` is of shape `(n_candidates, n_features)`, the
              indexing refers to the samples in `candidates`.
        utilities : numpy.ndarray of shape (batch_size, n_samples) or
            numpy.ndarray of shape (batch_size, n_candidates)
            The utilities of samples after each selected sample of the
            batch, e.g., `utilities[0]` indicates the utilities used for
            selecting the first sample (with index `query_indices[0]`) of
            the batch. Entries that are not candidates are `np.nan`;
            candidates that were not part of the sub-sample are `-np.inf`.

            - If `candidates` is `None` or of shape `(n_candidates,)`, the
              indexing refers to the samples in `X`.
            - If `candidates` is of shape `(n_candidates, n_features)`, the
              indexing refers to the samples in `candidates`.

        Raises
        ------
        TypeError
            If `query_strategy` is not a `SingleAnnotatorPoolQueryStrategy`
            or `max_candidates` is neither an `int` nor a `float`.
        """
        X, y, candidates, batch_size, return_utilities = self._validate_data(
            X, y, candidates, batch_size, return_utilities, reset=True
        )

        if not isinstance(
            self.query_strategy, SingleAnnotatorPoolQueryStrategy
        ):
            raise TypeError(
                f"`query_strategy` is of type `{type(self.query_strategy)}` "
                f"but must be of type `SingleAnnotatorPoolQueryStrategy`."
            )
        check_scalar(self.exclude_non_subsample, "exclude_non_subsample", bool)

        # The seed depends on the number of labels so that successive
        # active learning cycles draw different sub-samples.
        n_labeled = int(
            is_labeled(y, missing_label=self.missing_label_).sum()
        )
        seed_multiplier = n_labeled + 1

        n_subsample = self.max_candidates
        if isinstance(self.max_candidates, int):
            check_scalar(
                self.max_candidates,
                name="max_candidates",
                target_type=int,
                min_inclusive=True,
                min_val=1,
            )
        elif isinstance(self.max_candidates, float):
            check_scalar(
                self.max_candidates,
                name="max_candidates",
                target_type=float,
                min_inclusive=False,
                max_inclusive=True,
                min_val=0.0,
                max_val=1.0,
            )
        else:
            raise TypeError(
                f"`max_candidates` is of type `{type(self.max_candidates)}`"
                f" but must be in `[int, float]`."
            )

        rng = check_random_state(self.random_state, seed_multiplier)

        # Determine how many candidates exist before sub-sampling.
        if candidates is None:
            candidate_indices = unlabeled_indices(
                y=y, missing_label=self.missing_label_
            )
            n_pool = len(candidate_indices)
        else:
            n_pool = len(candidates)

        # A ratio is converted into an absolute number; the result never
        # exceeds the number of available candidates.
        if isinstance(n_subsample, float):
            n_subsample = ceil(n_pool * self.max_candidates)
        n_subsample = min(n_subsample, n_pool)

        # Draw the sub-sample.
        if candidates is None:
            # no explicitly provided candidates: sample unlabeled indices
            new_candidates = rng.choice(
                a=candidate_indices, size=n_subsample, replace=False
            )
        elif candidates.ndim == 1:
            # explicitly provided candidate indices
            candidate_indices = candidates
            new_candidates = rng.choice(
                a=candidates, size=n_subsample, replace=False
            )
        else:
            # explicitly provided candidate samples: sample row positions
            candidate_indices = range(len(candidates))
            new_candidate_indices = rng.choice(
                a=candidate_indices, size=n_subsample, replace=False
            )
            new_candidates = candidates[new_candidate_indices]

        indexes_into_X = candidates is None or candidates.ndim == 1
        given_as_samples = candidates is not None and candidates.ndim > 1

        # Optionally drop unlabeled training data outside the sub-sample.
        if self.exclude_non_subsample:
            all_labeled = labeled_indices(
                y=y, missing_label=self.missing_label_
            )
            if given_as_samples:
                kept_indices = all_labeled
            else:
                kept_indices = np.concatenate([all_labeled, new_candidates])
            kept_indices = np.sort(kept_indices)
            new_X = X[kept_indices]
            new_y = y[kept_indices]

            # Index-based candidates must refer to positions in the reduced
            # data set that is passed to the wrapped query strategy.
            if indexes_into_X:
                new_candidates = unlabeled_indices(
                    y=new_y, missing_label=self.missing_label_
                )
        else:
            new_X, new_y = X, y

        qs_output = self.query_strategy.query(
            X=new_X,
            y=new_y,
            candidates=new_candidates,
            batch_size=batch_size,
            return_utilities=return_utilities,
            **query_kwargs,
        )

        # Unpack the result of the wrapped query strategy.
        if return_utilities:
            queried_indices, utilities = qs_output
        else:
            queried_indices, utilities = qs_output, None

        # Map indices and utilities back as if no training data had been
        # removed.
        if self.exclude_non_subsample and indexes_into_X:
            queried_indices = kept_indices[queried_indices]
            if utilities is not None:
                restored = np.full(
                    shape=(batch_size, len(X)), fill_value=np.nan
                )
                original_positions = kept_indices[new_candidates]
                restored[:, original_positions] = utilities[
                    :, new_candidates
                ]
                utilities = restored
                new_candidates = original_positions

        # Candidates given as samples are reported by their position in
        # `candidates`.
        if given_as_samples:
            new_queried_indices = new_candidate_indices[queried_indices]
        else:
            new_queried_indices = queried_indices

        if not return_utilities:
            return new_queried_indices

        # Expand the utilities from the sub-sample to the original shape:
        # non-candidates stay nan, non-sampled candidates become -inf.
        if indexes_into_X:
            new_utilities = np.full(
                shape=(batch_size, len(X)), fill_value=np.nan
            )
            new_utilities[:, candidate_indices] = -np.inf
            new_utilities[:, new_candidates] = utilities[:, new_candidates]
        else:
            new_utilities = np.full(
                shape=(batch_size, len(candidates)), fill_value=np.nan
            )
            new_utilities[:, candidate_indices] = -np.inf
            new_utilities[:, new_candidate_indices] = utilities

        return new_queried_indices, new_utilities
class ParallelUtilityEstimationWrapper(SingleAnnotatorPoolQueryStrategy):
    """Parallel Utility Estimation Wrapper

    This class implements a wrapper for single-annotator pool-based
    strategies such that utilities for candidates can be calculated in
    parallel. The main assumption for this is that the utility computations
    are independent of one another. Therefore, only `batch_size=1` is
    supported.

    Parameters
    ----------
    query_strategy : skactiveml.base.SingleAnnotatorPoolQueryStrategy
        The strategy used for computing the utilities of the candidates.
    n_jobs : int, default=-1
        Determines the maximum number of parallel utility computations. It
        is capped at the number of candidates. If `n_jobs` is negative
        (default), the candidates are split into at most as many chunks as
        CPU cores are available. For further details refer to `n_jobs` in
        `joblib.Parallel`.
    parallel_dict : dict-like, default=None
        Further arguments that will be passed to `joblib.Parallel`. Note
        that `n_jobs` should not be set in `parallel_dict`; if it is, a
        warning is issued and the value is overwritten.
    missing_label : scalar or string or np.nan or None, default=np.nan
        Value to represent a missing label.
    random_state : int or np.random.RandomState, default=None
        The random state to use.
    """

    def __init__(
        self,
        query_strategy=None,
        n_jobs=-1,
        parallel_dict=None,
        missing_label=MISSING_LABEL,
        random_state=None,
    ):
        super().__init__(
            missing_label=missing_label, random_state=random_state
        )
        self.query_strategy = query_strategy
        self.n_jobs = n_jobs
        self.parallel_dict = parallel_dict

    @match_signature("query_strategy", "query")
    def query(
        self,
        X,
        y,
        candidates=None,
        batch_size=1,
        return_utilities=False,
        **query_kwargs,
    ):
        """Determines for which candidate samples labels are to be queried.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training data set, usually complete, i.e., including the labeled
            and unlabeled samples.
        y : array-like of shape (n_samples)
            Labels of the training data set (possibly including unlabeled
            ones indicated by self.MISSING_LABEL).
        candidates : None or array-like of shape (n_candidates), dtype=int or
            array-like of shape (n_candidates, n_features), default=None
            - If `candidates` is `None`, the unlabeled samples from `(X,y)`
              are considered as `candidates`.
            - If `candidates` is of shape `(n_candidates,)` and of type
              `int`, `candidates` is considered as the indices of the
              samples in `(X,y)`.
            - If `candidates` is of shape `(n_candidates, *)`, the candidate
              samples are directly given in `candidates` (not necessarily
              contained in `X`). This is not supported by all query
              strategies.
        batch_size : int, default=1
            The number of samples to be selected in one AL cycle. For this
            wrapper, only `batch_size=1` is supported.
        return_utilities : bool, default=False
            If `True`, also return the utilities based on the query strategy.
        **query_kwargs : dict-like
            Further keyword arguments are passed to the `query` method of the
            `query_strategy` object.

        Returns
        -------
        query_indices : numpy.ndarray of shape (batch_size,)
            The query indices indicate for which candidate sample a label is
            to be queried, e.g., `query_indices[0]` indicates the first
            selected sample.

            - If `candidates` is `None` or of shape `(n_candidates,)`, the
              indexing refers to the samples in `X`.
            - If `candidates` is of shape `(n_candidates, n_features)`, the
              indexing refers to the samples in `candidates`.
        utilities : numpy.ndarray of shape (batch_size, n_samples) or
            numpy.ndarray of shape (batch_size, n_candidates)
            The utilities of samples after each selected sample of the
            batch, e.g., `utilities[0]` indicates the utilities used for
            selecting the first sample (with index `query_indices[0]`) of
            the batch. Utilities for labeled samples will be set to np.nan.

            - If `candidates` is `None` or of shape `(n_candidates,)`, the
              indexing refers to the samples in `X`.
            - If `candidates` is of shape `(n_candidates, n_features)`, the
              indexing refers to the samples in `candidates`.

        Raises
        ------
        ValueError
            If `batch_size` is not 1.
        TypeError
            If `query_strategy` is not a `SingleAnnotatorPoolQueryStrategy`
            or `parallel_dict` is neither a dictionary nor `None`.
        """
        X, y, candidates, batch_size, return_utilities = self._validate_data(
            X, y, candidates, batch_size, return_utilities, reset=True
        )

        if batch_size != 1:
            raise ValueError("`batch_size` must be set to 1.")

        if not isinstance(
            self.query_strategy, SingleAnnotatorPoolQueryStrategy
        ):
            raise TypeError(
                f"`query_strategy` is of type `{type(self.query_strategy)}` "
                f"but must be of type `SingleAnnotatorPoolQueryStrategy`."
            )

        # `X_cand` holds the candidate samples; `mapping` is used below to
        # scatter the candidate utilities back into an array of length
        # `len(X)` and is `None` when no such mapping applies.
        X_cand, mapping = self._transform_candidates(candidates, X, y)
        n_cand = len(X_cand)

        if self.parallel_dict is None:
            parallel_dict = {}
        elif isinstance(self.parallel_dict, dict):
            # Copy to avoid mutating the user-provided parameter.
            parallel_dict = self.parallel_dict.copy()
            if "n_jobs" in parallel_dict:
                warnings.warn(
                    f"`n_jobs` ({parallel_dict['n_jobs']}) "
                    "is specified in `parallel_dict`. "
                    f"This will be replaced with `n_jobs={self.n_jobs}`."
                )
        else:
            raise TypeError(
                f"`parallel_dict` is of type `{type(self.parallel_dict)}` "
                f"but must be a dictionary or None."
            )

        # Never request more workers than there are candidates.
        parallel_dict["n_jobs"] = min(self.n_jobs, n_cand)
        parallel_pool = Parallel(**parallel_dict)

        def query_lambda_func(candidate):
            # Compute the utilities for one chunk of candidate samples.
            return self.query_strategy.query(
                X=X,
                y=y,
                candidates=np.array(candidate),
                batch_size=1,
                return_utilities=True,
                **query_kwargs,
            )

        # A negative `n_jobs` is resolved by joblib relative to the CPU
        # count, so the CPU count is used as the number of chunks. The
        # number of chunks is capped at the number of candidates: otherwise
        # `np.array_split` produces empty chunks, and the wrapped strategy
        # would be queried with an empty candidate set. At least one chunk
        # is kept so that `np.array_split` receives a valid section count.
        n_jobs = parallel_dict["n_jobs"]
        n_chunks = cpu_count() if n_jobs < 0 else n_jobs
        n_chunks = max(1, min(n_chunks, n_cand))
        chunks = np.array_split(X_cand, n_chunks)

        qs_outputs = parallel_pool(
            delayed(query_lambda_func)(c) for c in chunks
        )

        # Each output is `(query_indices, utilities)`; the first utilities
        # row of every chunk is concatenated in chunk order.
        utilities_cand = np.concatenate(
            [qs_output[1][0] for qs_output in qs_outputs], axis=0
        )

        if mapping is None:
            utilities = utilities_cand
        else:
            utilities = np.full(len(X), np.nan)
            utilities[mapping] = utilities_cand

        return simple_batch(
            utilities,
            self.random_state_,
            batch_size=batch_size,
            return_utilities=return_utilities,
        )