Source code for skactiveml.pool._regression_tree_based_al

import warnings

import numpy as np

from copy import copy
from sklearn import clone
from sklearn.cluster import KMeans
from sklearn.metrics import (
    pairwise_distances_argmin_min,
    pairwise_distances_argmin,
    pairwise_distances,
)
from sklearn.tree import DecisionTreeRegressor

from skactiveml.base import SingleAnnotatorPoolQueryStrategy
from skactiveml.regressor import SklearnRegressor
from skactiveml.utils import (
    MISSING_LABEL,
    check_type,
    check_equal_missing_label,
    is_labeled,
    simple_batch,
    rand_argmax,
    check_scalar,
    labeled_indices,
)


class RegressionTreeBasedAL(SingleAnnotatorPoolQueryStrategy):
    """Regression Tree-based Active Learning (RT-AL)

    This class implements the query strategy Regression Tree-based Active
    Learning (RT-AL) [1]_, which is based on a regression tree and determines
    the number `n_k` of samples to be selected from each leaf `k` given a
    certain batch size. It then uses one of the three methods 'random',
    'diversity', or 'representativity' to select `n_k` samples from each
    leaf `k`.

    Parameters
    ----------
    method : str, default='random'
        Possible values are 'random', 'diversity', and 'representativity'.
    missing_label : scalar or string or np.nan or None, \
            default=skactiveml.utils.MISSING_LABEL
        Value to represent a missing label.
    random_state : int or np.random.RandomState, default=None
        The random state to use.
    max_iter_representativity : int, default=5
        Maximum number of optimisation iterations. Only used if
        `method='representativity'`.

    References
    ----------
    .. [1] A. Jose, J. P. A. de Mendonça, E. Devijver, N. Jakse, V. Monbet,
       and R. Poloni. Regression Tree-based Active Learning. Data Min.
       Knowl. Discov., pages 420–460, 2023.
    """

    def __init__(
        self,
        method="random",
        missing_label=MISSING_LABEL,
        random_state=None,
        max_iter_representativity=5,
    ):
        super().__init__(
            random_state=random_state, missing_label=missing_label
        )
        self.method = method
        self.max_iter_representativity = max_iter_representativity
    def query(
        self,
        X,
        y,
        reg,
        fit_reg=True,
        sample_weight=None,
        candidates=None,
        batch_size=1,
        return_utilities=False,
    ):
        """Determines for which candidate samples labels are to be queried.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training data set, usually complete, i.e., including the labeled
            and unlabeled samples.
        y : array-like of shape (n_samples,)
            Labels of the training data set (possibly including unlabeled
            ones indicated by `self.missing_label`).
        reg : SkactivemlRegressor
            Regressor to predict the data; its estimator must be a
            `sklearn.tree.DecisionTreeRegressor`. Ensure that each leaf
            contains more than one sample, e.g., by setting
            `min_samples_leaf >= 2` or by restricting the tree's depth.
        fit_reg : bool, default=True
            Defines whether the regressor should be fitted on `X`, `y`, and
            `sample_weight`.
        sample_weight : array-like of shape (n_samples,), default=None
            Weights of training samples in `X`.
        candidates : None or array-like of shape (n_candidates,), dtype=int or \
                array-like of shape (n_candidates, n_features), default=None
            - If `candidates` is `None`, the unlabeled samples from `(X, y)`
              are considered as `candidates`.
            - If `candidates` is of shape `(n_candidates,)` and of type
              `int`, `candidates` is considered as the indices of the
              samples in `(X, y)`.
            - If `candidates` is of shape `(n_candidates, *)`, the candidate
              samples are directly given in `candidates` (not necessarily
              contained in `X`). This is not supported by all query
              strategies.
        batch_size : int, default=1
            The number of samples to be selected in one AL cycle.
            Originally, this query strategy was developed for
            `batch_size > 1`.
        return_utilities : bool, default=False
            If true, also return the utilities based on the query strategy.

        Returns
        -------
        query_indices : numpy.ndarray of shape (batch_size,)
            The query indices indicate for which candidate sample a label is
            to be queried, e.g., `query_indices[0]` indicates the first
            selected sample.

            - If `candidates` is `None` or of shape `(n_candidates,)`, the
              indexing refers to the samples in `X`.
            - If `candidates` is of shape `(n_candidates, n_features)`, the
              indexing refers to the samples in `candidates`.
        utilities : numpy.ndarray of shape (batch_size, n_samples) or \
                numpy.ndarray of shape (batch_size, n_candidates)
            The utilities of samples after each selected sample of the
            batch, e.g., `utilities[0]` indicates the utilities used for
            selecting the first sample (with index `query_indices[0]`) of
            the batch. Utilities for labeled samples will be set to np.nan.

            - If `candidates` is `None` or of shape `(n_candidates,)`, the
              indexing refers to the samples in `X`.
            - If `candidates` is of shape `(n_candidates, n_features)`, the
              indexing refers to the samples in `candidates`.
        """
        # Validate input parameters.
        X, y, candidates, batch_size, return_utilities = self._validate_data(
            X, y, candidates, batch_size, return_utilities, reset=True
        )
        if batch_size == 1:
            warnings.warn(
                "This query strategy was originally developed for "
                "`batch_size > 1`."
            )
        X_cand, mapping = self._transform_candidates(candidates, X, y)
        labeled_idxs = labeled_indices(y, self.missing_label_)

        # Validate the regressor type.
        check_type(reg, "reg", SklearnRegressor)
        check_type(reg.estimator, "reg.estimator", DecisionTreeRegressor)
        check_equal_missing_label(reg.missing_label, self.missing_label_)

        # Validate the boolean flag.
        check_type(fit_reg, "fit_reg", bool)

        # Validate the method type.
        check_type(self.method, "method", str)

        # Validate the `max_iter_representativity` type.
        check_scalar(
            self.max_iter_representativity,
            "max_iter_representativity",
            int,
            min_val=1,
        )

        # Fall back to random sampling if at most one sample is labeled.
        if len(labeled_idxs) <= 1:
            warnings.warn(
                "Fewer than two samples are labeled. Falling back to random "
                "sampling."
            )
            if mapping is None:
                utilities = np.ones(len(X_cand))
            else:
                utilities = np.full(len(X), np.nan)
                utilities[mapping] = np.ones(len(mapping))
            return simple_batch(
                utilities,
                self.random_state_,
                batch_size=batch_size,
                return_utilities=return_utilities,
                method="proportional",
            )

        # Fit the regressor.
        if fit_reg:
            if sample_weight is None:
                reg = clone(reg).fit(X, y)
            else:
                reg = clone(reg).fit(X, y, sample_weight)

        # Calculate the number of samples to be selected from each leaf k.
        n_k = _calc_acquisitions_per_leaf(
            X, y, reg, missing_label=self.missing_label_, batch_size=batch_size
        )

        # Discretize the number of acquisitions per leaf.
        n_k_discrete = _discretize_acquisitions_per_leaf(
            n_k, batch_size, self.random_state_
        )

        # Determine the leaf index of each candidate sample.
        leaf_indices_cand = reg.apply(X_cand)

        if self.method == "random":
            batch_utilities_cand = np.full((batch_size, len(X_cand)), -np.inf)
            query_indices = []
            for leaf_idx in np.unique(leaf_indices_cand):
                for _ in range(n_k_discrete[leaf_idx]):
                    batch_utilities_cand[
                        len(query_indices), leaf_indices_cand == leaf_idx
                    ] = 1
                    batch_utilities_cand[
                        len(query_indices), query_indices
                    ] = np.nan
                    query_indices.append(
                        rand_argmax(
                            batch_utilities_cand[len(query_indices)],
                            random_state=self.random_state_,
                        )[0]
                    )
        elif self.method == "diversity":
            batch_utilities_cand = np.full((batch_size, len(X_cand)), -np.inf)
            query_indices = []
            X_labeled = X[labeled_idxs]
            leaf_indices_labeled = reg.apply(X_labeled)
            for leaf_idx in np.unique(leaf_indices_cand):
                X_cand_leaf = X_cand[leaf_indices_cand == leaf_idx]
                X_labeled_leaf = X_labeled[leaf_indices_labeled == leaf_idx]
                # Compute the shortest L2 distance from each unlabeled
                # sample x_j in leaf k to the labeled samples using
                # equations (6) and (7).
                _, d_min = pairwise_distances_argmin_min(
                    X_cand_leaf, X_labeled_leaf, axis=1
                )
                for _ in range(n_k_discrete[leaf_idx]):
                    batch_utilities_cand[
                        len(query_indices), leaf_indices_cand == leaf_idx
                    ] = d_min
                    batch_utilities_cand[
                        len(query_indices), query_indices
                    ] = np.nan
                    query_indices.append(
                        rand_argmax(
                            batch_utilities_cand[len(query_indices)],
                            random_state=self.random_state_,
                        )[0]
                    )
        elif self.method == "representativity":
            # Perform a k-means clustering in each leaf k with n_k clusters.
            query_indices = np.empty(shape=batch_size, dtype=int)
            l_cand = np.full(len(X_cand), fill_value=-1, dtype=int)
            for leaf in np.argwhere(n_k_discrete != 0).flatten():
                X_cand_leaf = X_cand[leaf_indices_cand == leaf]
                kmeans = KMeans(
                    n_k_discrete[leaf], random_state=self.random_state_
                ).fit(X_cand_leaf)
                # Offset the cluster labels so that they are unique across
                # leaves.
                l_cand[leaf_indices_cand == leaf] = kmeans.predict(
                    X_cand_leaf
                ) + np.sum(n_k_discrete[:leaf])
                centroids = kmeans.cluster_centers_
                query_indices[
                    np.sum(n_k_discrete[:leaf])
                    + np.arange(n_k_discrete[leaf])
                ] = pairwise_distances_argmin(centroids, X_cand, axis=1)
            # Calculate R using Eq. (9).
            R_cand = np.zeros(len(X_cand))
            for l_idx in np.unique(l_cand):
                C_l = X_cand[l_cand == l_idx]
                if l_idx != -1 and len(C_l) > 1:
                    R_cand[l_cand == l_idx] = pairwise_distances(
                        C_l, C_l
                    ).sum(axis=1) / (len(C_l) - 1)

            batch_utilities_cand = np.full((batch_size, len(X_cand)), -np.inf)
            for i in range(self.max_iter_representativity):
                prev_best_indices = copy(query_indices)
                for l_idx in range(batch_size):
                    # Update DELTA using the current centroids.
                    X_M = X[labeled_idxs]
                    X_M = np.append(
                        X_M, X_cand[query_indices[:l_idx]], axis=0
                    )
                    X_M = np.append(
                        X_M, X_cand[query_indices[l_idx + 1:]], axis=0
                    )
                    X_cand_l = X_cand[l_cand == l_idx]
                    # Equation (10).
                    _, delta_l = pairwise_distances_argmin_min(
                        X_cand_l, X_M, axis=1
                    )
                    # Use Eq. (8) to find the next sample to be labeled.
                    R_l = R_cand[l_cand == l_idx]
                    batch_utilities_cand[l_idx, l_cand == l_idx] = (
                        delta_l - R_l
                    )
                    query_indices[l_idx] = rand_argmax(
                        batch_utilities_cand[l_idx],
                        random_state=self.random_state_,
                    )[0]
                if np.all(prev_best_indices == query_indices):
                    break
            for l_idx in range(batch_size):
                batch_utilities_cand[l_idx, query_indices[:l_idx]] = np.nan
        else:
            raise ValueError(
                f'The given method "{self.method}" is not valid. Supported '
                f'methods are "random", "diversity", and "representativity".'
            )

        if mapping is None:
            batch_utilities = batch_utilities_cand
        else:
            batch_utilities = np.full((batch_size, len(X)), np.nan)
            batch_utilities[:, mapping] = batch_utilities_cand
            query_indices = mapping[query_indices]

        # Check whether the utilities are to be returned.
        if return_utilities:
            return query_indices, batch_utilities
        else:
            return query_indices
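

# A minimal usage sketch of `RegressionTreeBasedAL` with a
# `DecisionTreeRegressor` wrapped in `SklearnRegressor`. The toy data, tree
# settings, and batch size below are illustrative assumptions, not part of
# the original module.
def _demo_regression_tree_based_al(random_state=0):
    rng = np.random.RandomState(random_state)
    # Toy 1D regression problem: 100 samples, 10 of which are labeled.
    X = rng.uniform(-3, 3, size=(100, 1))
    y_true = np.sin(X.ravel()) + rng.normal(scale=0.1, size=100)
    y = np.full(100, MISSING_LABEL)
    labeled = rng.choice(100, size=10, replace=False)
    y[labeled] = y_true[labeled]
    # `min_samples_leaf=2` follows the docstring's recommendation to avoid
    # leaves with fewer than two labeled samples (zero variance).
    reg = SklearnRegressor(
        DecisionTreeRegressor(min_samples_leaf=2, random_state=random_state)
    )
    qs = RegressionTreeBasedAL(method="random", random_state=random_state)
    # Query a batch of five labels; RT-AL is designed for batch sizes > 1.
    # 'diversity' or 'representativity' can be used instead of 'random'.
    query_indices = qs.query(X, y, reg, batch_size=5)
    # With `candidates=None`, the returned indices refer to samples in `X`.
    return query_indices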

def _discretize_acquisitions_per_leaf(n_k, batch_size, random_state):
    """Discretizes a given array of non-negative floats corresponding to the
    number of acquisitions per leaf of the regression tree. Guarantees that
    at least the floored number of acquisitions is acquired per leaf.

    Parameters
    ----------
    n_k : numpy.ndarray of shape (n_leafs,)
        Float acquisitions per leaf of the regression tree.
    batch_size : int
        Number of acquisitions.
    random_state : np.random.RandomState
        Random state for reproducibility.

    Returns
    -------
    n_k_discrete : numpy.ndarray of shape (n_leafs,)
        Integer acquisitions per leaf of the regression tree.
    """
    n_k_rest, n_k_discrete = np.modf(n_k)
    rest_size = n_k_rest.sum()
    leaf_indices = np.arange(len(n_k))
    if rest_size > 0:
        # Distribute the remaining acquisitions among the leaves with
        # probabilities proportional to the fractional remainders.
        sampled_leaf_indices = random_state.choice(
            leaf_indices,
            p=n_k_rest / rest_size,
            size=batch_size - np.sum(np.round(n_k_discrete)).astype(int),
            replace=False,
        )
        add_leaf_indices, add_leaf_counts = np.unique(
            sampled_leaf_indices, return_counts=True
        )
        n_k_discrete[add_leaf_indices] += add_leaf_counts
    return n_k_discrete.astype(int)


def _calc_acquisitions_per_leaf(X, y, reg, missing_label, batch_size=1):
    """Computes the number of samples to be selected from each leaf of the
    regression tree.

    Parameters
    ----------
    X : array-like of shape (n_samples, n_features)
        Training data set, usually complete, i.e., including the labeled and
        unlabeled samples.
    y : array-like of shape (n_samples,)
        Labels of the training data set (possibly including unlabeled ones
        indicated by `missing_label`).
    reg : SkactivemlRegressor
        Fitted regressor to predict the data.
    missing_label : scalar or string or np.nan or None
        Value to represent a missing label.
    batch_size : int, default=1
        The number of samples to be selected in one AL cycle.

    Returns
    -------
    n_samples_per_leaf : numpy.ndarray of shape (n_leafs,)
        Number of samples per leaf.
    """
    is_lbld = is_labeled(y, missing_label=missing_label)

    # Compute the variance v_k of the labeled samples in leaf k.
    leaf_labeled = reg.apply(X[is_lbld])
    y_labeled = y[is_lbld]
    v_k = np.zeros(reg.tree_.node_count)
    for leaf in range(len(v_k)):
        y_labeled_leaf = y_labeled[leaf_labeled == leaf]
        if len(y_labeled_leaf) > 1:
            v_k[leaf] = np.var(y_labeled_leaf, ddof=1)
    v_k[np.isnan(v_k)] = 0
    if 0 in v_k[np.unique(leaf_labeled)]:
        warnings.warn(
            "There are leaves with less than two labeled samples, "
            "which causes a variance of zero. To avoid this, set "
            "parameter `min_samples_leaf` of `reg` to >= 2."
        )

    # Compute the probability p_k that an unlabeled sample belongs to leaf k.
    leaf_unlabeled = reg.apply(X[~is_lbld])
    samples_per_leaf = np.bincount(leaf_unlabeled, minlength=len(v_k))
    p_k = samples_per_leaf / sum(~is_lbld)

    # Compute the number of samples to be selected from each leaf of the
    # regression tree.
    n_k = np.sqrt(p_k * v_k)
    if np.sum(n_k) == 0:
        n_k = np.full_like(n_k, fill_value=batch_size / reg.tree_.node_count)
    else:
        n_k = batch_size * n_k / np.sum(n_k)

    return n_k
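

# Worked sketch of the per-leaf budget discretization. The fractional budgets
# follow n_k = batch_size * sqrt(p_k * v_k) / sum_k' sqrt(p_k' * v_k') (see
# `_calc_acquisitions_per_leaf`); `_discretize_acquisitions_per_leaf` keeps
# each leaf's floored share and draws the remaining acquisitions according to
# the fractional remainders. The numbers below are illustrative assumptions.
def _demo_discretize_acquisitions_per_leaf(random_state=0):
    rng = np.random.RandomState(random_state)
    # Fractional budgets for three leaves summing to a batch size of 5.
    n_k = np.array([2.3, 1.9, 0.8])
    n_k_discrete = _discretize_acquisitions_per_leaf(
        n_k, batch_size=5, random_state=rng
    )
    # The floors [2, 1, 0] are guaranteed; the remaining two acquisitions
    # are drawn without replacement with probabilities [0.3, 0.9, 0.8] / 2.
    assert n_k_discrete.sum() == 5
    return n_k_discrete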