Source code for skactiveml.pool._expected_model_change_maximization

import math

import numpy as np
from sklearn import clone

from skactiveml.base import (
    SkactivemlRegressor,
    SingleAnnotatorPoolQueryStrategy,
    SkactivemlClassifier,
)
from skactiveml.utils import (
    check_type,
    simple_batch,
    check_scalar,
    MISSING_LABEL,
    check_X_y,
    check_random_state,
    _check_callable,
)


class ExpectedModelChangeMaximization(SingleAnnotatorPoolQueryStrategy):
    """Expected Model Change (EMC)

    This class implements "Expected Model Change" (EMC) [1]_, an active
    learning query strategy for linear regression.

    Parameters
    ----------
    bootstrap_size : int, default=3
        The number of bootstraps used to estimate the true model.
    n_train : int or float, default=0.5
        The size of a bootstrap compared to the training data if of type
        `float`. Must lie in the range of (0, 1]. The total size of a
        bootstrap if of type `int`. Must be greater or equal to 1.
    ord : int or string, default=2
        The norm to measure the gradient length. Argument will be passed
        to `np.linalg.norm`.
    feature_map : callable, default=None
        The feature map of the linear regressor. Takes in the feature data.
        Must output a 2-dimensional `np.ndarray`. The default value is the
        identity function. An example feature map is
        `sklearn.preprocessing.PolynomialFeatures().fit_transform`.
    missing_label : scalar or string or np.nan or None, default=np.nan
        Value to represent a missing label.
    random_state : int or np.random.RandomState or None, default=None
        Random state for candidate selection.

    References
    ----------
    .. [1] Cai, Wenbin, Ya Zhang, and Jun Zhou. Maximizing expected model
       change for active learning in regression, IEEE International
       Conference on Data Mining, pages 51--60, 2013.
    """

    def __init__(
        self,
        bootstrap_size=3,
        n_train=0.5,
        ord=2,
        feature_map=None,
        missing_label=MISSING_LABEL,
        random_state=None,
    ):
        super().__init__(
            random_state=random_state, missing_label=missing_label
        )
        self.bootstrap_size = bootstrap_size
        self.n_train = n_train
        self.ord = ord
        self.feature_map = feature_map

    def query(
        self,
        X,
        y,
        reg,
        fit_reg=True,
        sample_weight=None,
        candidates=None,
        batch_size=1,
        return_utilities=False,
    ):
        """Determines for which candidate samples labels are to be queried.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training data set, usually complete, i.e., including the
            labeled and unlabeled samples.
        y : array-like of shape (n_samples,)
            Labels of the training data set (possibly including unlabeled
            ones indicated by `self.missing_label`).
        reg : SkactivemlRegressor
            Regressor to predict the data. Assumes a linear regressor with
            respect to the parameters.
        fit_reg : bool, default=True
            Defines whether the regressor should be fitted on `X`, `y`,
            and `sample_weight`.
        sample_weight : array-like of shape (n_samples,), default=None
            Weights of training samples in `X`.
        candidates : None or array-like of shape (n_candidates,), dtype=int \
                or array-like of shape (n_candidates, n_features), \
                default=None
            - If `candidates` is `None`, the unlabeled samples from
              `(X, y)` are considered as `candidates`.
            - If `candidates` is of shape `(n_candidates,)` and of type
              `int`, `candidates` is considered as the indices of the
              samples in `(X, y)`.
            - If `candidates` is of shape `(n_candidates, n_features)`,
              the candidate samples are directly given in `candidates`
              (not necessarily contained in `X`).
        batch_size : int, default=1
            The number of samples to be selected in one AL cycle.
        return_utilities : bool, default=False
            If true, also return the utilities based on the query strategy.

        Returns
        -------
        query_indices : numpy.ndarray of shape (batch_size,)
            The query indices indicate for which candidate sample a label
            is to be queried, e.g., `query_indices[0]` indicates the first
            selected sample.
            - If `candidates` is `None` or of shape `(n_candidates,)`, the
              indexing refers to the samples in `X`.
            - If `candidates` is of shape `(n_candidates, n_features)`, the
              indexing refers to the samples in `candidates`.
        utilities : numpy.ndarray of shape (batch_size, n_samples) or \
                numpy.ndarray of shape (batch_size, n_candidates)
            The utilities of samples after each selected sample of the
            batch, e.g., `utilities[0]` indicates the utilities used for
            selecting the first sample (with index `query_indices[0]`) of
            the batch. Utilities for labeled samples will be set to
            `np.nan`.
            - If `candidates` is `None`, the indexing refers to the samples
              in `X`.
            - If `candidates` is of shape `(n_candidates,)` and of type
              `int`, `utilities` refers to the samples in `X`.
            - If `candidates` is of shape `(n_candidates, n_features)`,
              `utilities` refers to the indexing in `candidates`.
""" X, y, candidates, batch_size, return_utilities = self._validate_data( X, y, candidates, batch_size, return_utilities, reset=True ) check_type(reg, "reg", SkactivemlRegressor) check_type(fit_reg, "fit_reg", bool) if self.feature_map is None: self.feature_map = lambda x: x _check_callable(self.feature_map, "self.feature_map") if fit_reg: if sample_weight is None: reg = clone(reg).fit(X, y) else: reg = clone(reg).fit(X, y, sample_weight) X_cand, mapping = self._transform_candidates(candidates, X, y) learners = _bootstrap_estimators( reg, X, y, bootstrap_size=self.bootstrap_size, n_train=self.n_train, sample_weight=sample_weight, random_state=self.random_state_, ) results_learner = np.array( [learner.predict(X_cand) for learner in learners] ) pred = reg.predict(X_cand).reshape(1, -1) scalars = np.average(np.abs(results_learner - pred), axis=0) X_cand_mapped_features = self.feature_map(X_cand) norms = np.linalg.norm(X_cand_mapped_features, ord=self.ord, axis=1) utilities_cand = scalars * norms if mapping is None: utilities = utilities_cand else: utilities = np.full(len(X), np.nan) utilities[mapping] = utilities_cand return simple_batch( utilities, self.random_state_, batch_size=batch_size, return_utilities=return_utilities, )
def _bootstrap_estimators(
    est,
    X,
    y,
    bootstrap_size=5,
    n_train=0.5,
    sample_weight=None,
    random_state=None,
):
    """Train the estimator on bootstraps of `X` and `y`.

    Parameters
    ----------
    est : SkactivemlClassifier or SkactivemlRegressor
        The estimator to be trained.
    X : array-like of shape (n_samples, n_features)
        Training data set, usually complete, i.e., including the labeled
        and unlabeled samples.
    y : array-like of shape (n_samples,)
        Labels of the training data set.
    bootstrap_size : int, default=5
        The number of trained bootstraps.
    n_train : int or float, default=0.5
        The size of each bootstrap training data set.
    sample_weight : array-like of shape (n_samples,), default=None
        Weights of training samples in `X`.
    random_state : int or np.random.RandomState or None, default=None
        The random state to use.

    Returns
    -------
    bootstrap_est : list of SkactivemlClassifier or SkactivemlRegressor
        The estimators trained on different bootstraps.
    """
    check_X_y(X=X, y=y, sample_weight=sample_weight)
    check_scalar(bootstrap_size, "bootstrap_size", int, min_val=1)
    check_type(n_train, "n_train", int, float)
    if isinstance(n_train, int) and n_train < 1:
        raise ValueError(
            f"`n_train` has value `{n_train}`, but must have a value "
            f"greater or equal to one, if of type `int`."
        )
    elif isinstance(n_train, float) and (n_train <= 0 or n_train > 1):
        raise ValueError(
            f"`n_train` has value `{n_train}`, but must have a value "
            f"between zero and one, excluding zero, if of type `float`."
        )
    if isinstance(n_train, float):
        # Convert the fraction into an absolute bootstrap size.
        n_train = math.ceil(n_train * len(X))
    check_type(est, "est", SkactivemlClassifier, SkactivemlRegressor)
    random_state = check_random_state(random_state)

    bootstrap_est = [clone(est) for _ in range(bootstrap_size)]
    sample_indices = np.arange(len(X))
    # Draw `bootstrap_size` index sets of size `n_train` with replacement.
    subsets_indices = [
        random_state.choice(sample_indices, size=n_train)
        for _ in range(bootstrap_size)
    ]

    for est_b, subset_indices in zip(bootstrap_est, subsets_indices):
        X_for_learner = X[subset_indices]
        y_for_learner = y[subset_indices]
        if sample_weight is None:
            est_b.fit(X_for_learner, y_for_learner)
        else:
            weight_for_learner = sample_weight[subset_indices]
            est_b.fit(X_for_learner, y_for_learner, weight_for_learner)

    return bootstrap_est
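

if __name__ == "__main__":
    # Usage sketch (illustrative, not part of the library): a toy 1D
    # regression task in which some labels are missing (`np.nan`, the
    # default `missing_label`). The choice of `SklearnRegressor` wrapping
    # `LinearRegression` is an assumption; any `SkactivemlRegressor` that
    # is linear in its parameters fits EMC's model assumption.
    from sklearn.linear_model import LinearRegression

    from skactiveml.regressor import SklearnRegressor

    X_demo = np.arange(10, dtype=float).reshape(-1, 1)
    y_demo = 2.0 * X_demo.ravel() + 1.0
    y_demo[[2, 5, 7, 8]] = np.nan  # mark these samples as unlabeled

    reg_demo = SklearnRegressor(LinearRegression())
    qs = ExpectedModelChangeMaximization(bootstrap_size=3, random_state=0)

    # By default, the unlabeled samples of `(X_demo, y_demo)` serve as
    # candidates; `query` returns the index of the sample to label next.
    query_idx, utilities = qs.query(
        X_demo, y_demo, reg_demo, return_utilities=True
    )
    print("next query:", query_idx, "utilities:", utilities)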