"""
Module implementing `UHerding`, a deep active learning strategy combining
uncertainty and coverage.
"""
import numpy as np
from scipy.special import softmax
from sklearn import clone
from sklearn.metrics import pairwise_distances, pairwise_kernels
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import normalize
from sklearn.utils.validation import column_or_1d
from ..base import SingleAnnotatorPoolQueryStrategy, SkactivemlClassifier
from ..utils import (
MISSING_LABEL,
check_equal_missing_label,
check_scalar,
check_type,
labeled_indices,
rand_argmax,
)
from ._uncertainty_sampling import uncertainty_scores
class UHerding(SingleAnnotatorPoolQueryStrategy):
    """Uncertainty Herding (UHerding)

    "Uncertainty Herding" (UHerding) is a query strategy [1]_ that
    greedily maximizes an uncertainty-weighted coverage objective in feature
    space. In addition to the greedy selection itself, the implementation
    follows the parameter adaptation scheme of the paper:

    - select a temperature based on calibration via train/validation splits
      of the currently labeled set,
    - adapt the Gaussian kernel radius to the current labeled feature space.

    Parameters
    ----------
    method : 'least_confident' or 'margin_sampling' or 'entropy', \
            default='margin_sampling'
        Uncertainty definition applied to temperature-scaled probabilities.
    predict_proba_dict : dict or None, default=None
        Optional keyword arguments forwarded to `clf.predict_proba` to
        request additional outputs such as logits and embeddings.
        If `predict_proba_parser is None`, optional outputs are interpreted
        by the default convention `(probas, logits, embeddings)`.
        Typical usage with `SkorchClassifier` is therefore::

            predict_proba_dict={"extra_outputs": ["logits", "emb"]}

        If logits are not returned by `predict_proba`, `decision_function`
        is used as a fallback when available, e.g. for scikit-learn logistic
        regression models wrapped by `SklearnClassifier`.
    predict_proba_parser : callable or None, default=None
        Optional parser applied to the raw return value of
        `clf.predict_proba(X, **predict_proba_dict)`.
        The parser must return either `(probas, logits)` or
        `(probas, logits, embeddings)`. `probas` may be `None`, in which
        case they are computed from `logits` via softmax. `embeddings` may
        be `None`, in which case the original samples are used.
        If `None`, the default convention is used:

        - array output: treated as `probas`,
        - tuple output: treated as `(probas, logits, embeddings)`.
    temperatures : float or array-like of shape (n_temperatures,) or None, \
            default=None
        Candidate temperatures used during the calibration search. If a
        single positive float or a length-one array is provided, that
        temperature is used directly without internal calibration refits.
        If `None`, `temperatures=np.logspace(-1, 1, 49)` is used.
    validation_size : float or int, default=0.2
        Validation size passed to the calibration train/validation split.
    n_ece_bins : int, default=15
        Number of bins used for the expected calibration error.
    normalize_samples : bool, default=True
        Flag whether to normalize feature vectors to unit length before
        computing pairwise distances and kernels.
    metric : str or callable, default='rbf'
        Kernel used for the coverage objective.
    metric_dict : dict or None, default=None
        Optional keyword arguments passed to `pairwise_kernels`.
    adaptive_sigma : bool, default=True
        Flag whether to adapt the radius according to the minimum non-zero
        labeled pairwise distance. This option requires `metric='rbf'`.
    missing_label : scalar or string or np.nan or None, default=np.nan
        Value to represent a missing label.
    random_state : None or int or np.random.RandomState, default=None
        The random state to use.

    References
    ----------
    .. [1] W. Bae, G. Oliveira, and D. J. Sutherland.
       "Uncertainty Herding: One Active Learning Method for All Label
       Budgets." In Int. Conf. Learn. Represent., 2025.
    """

    def __init__(
        self,
        method="margin_sampling",
        predict_proba_dict=None,
        predict_proba_parser=None,
        temperatures=None,
        validation_size=0.2,
        n_ece_bins=15,
        normalize_samples=True,
        metric="rbf",
        metric_dict=None,
        adaptive_sigma=True,
        missing_label=MISSING_LABEL,
        random_state=None,
    ):
        # `missing_label` and `random_state` are managed by the base class.
        super().__init__(
            missing_label=missing_label, random_state=random_state
        )
        # Remaining parameters are stored unvalidated; they are checked in
        # `query` following the scikit-learn convention.
        # Uncertainty-related parameters.
        self.method = method
        self.predict_proba_dict = predict_proba_dict
        self.predict_proba_parser = predict_proba_parser
        # Calibration-related parameters.
        self.temperatures = temperatures
        self.validation_size = validation_size
        self.n_ece_bins = n_ece_bins
        # Coverage-related parameters.
        self.normalize_samples = normalize_samples
        self.metric = metric
        self.metric_dict = metric_dict
        self.adaptive_sigma = adaptive_sigma
    def query(
        self,
        X,
        y,
        clf,
        fit_clf=True,
        sample_weight=None,
        candidates=None,
        batch_size=1,
        return_utilities=False,
    ):
        """Determines for which candidate samples labels are to be queried.

        Parameters
        ----------
        X : array-like of shape (n_samples, ...)
            Training data set, usually complete, i.e., including the labeled
            and unlabeled samples.
        y : array-like of shape (n_samples,)
            Labels of the training data set (possibly including unlabeled
            ones indicated by `self.missing_label`).
        clf : skactiveml.base.SkactivemlClassifier
            Classifier implementing `fit` and `predict_proba`. For
            temperature-scaled uncertainty estimation, the classifier should
            either provide logits via `predict_proba` extras or implement
            `decision_function`. Otherwise, the non-calibrated probabilities
            are used as fallback.
        fit_clf : bool, default=True
            Defines whether the classifier `clf` should be fitted on `X`,
            `y`, and `sample_weight` before evaluating the acquisition
            function. Independent of this flag, temporary cloned classifiers
            may still be fitted internally to select the temperature
            parameter.
        sample_weight : array-like of shape (n_samples,), default=None
            Weights of training samples in `X`.
        candidates : None or array-like of shape (n_candidates,), dtype=int \
                or array-like of shape (n_candidates, ...), default=None
            - If `candidates` is `None`, the unlabeled samples from `(X, y)`
              are considered as candidates.
            - If `candidates` is of shape `(n_candidates,)` and of type
              `int`, `candidates` is considered as the indices of the
              samples in `(X, y)`.
            - If `candidates` is of shape `(n_candidates, ...)`, the
              candidate samples are directly given in `candidates` (not
              necessarily contained in `X`).
        batch_size : int, default=1
            The number of samples to be selected in one AL cycle.
        return_utilities : bool, default=False
            If `True`, also return the utilities based on the query
            strategy.

        Returns
        -------
        query_indices : numpy.ndarray of shape (batch_size,)
            The query indices indicate for which candidate sample a label is
            to be queried, e.g., `query_indices[0]` indicates the first
            selected sample.

            - If `candidates` is `None` or of shape `(n_candidates,)`, the
              indexing refers to the samples in `X`.
            - If `candidates` is of shape `(n_candidates, ...)`, the
              indexing refers to the samples in `candidates`.
        utilities : numpy.ndarray of shape (batch_size, n_samples) or \
                numpy.ndarray of shape (batch_size, n_candidates)
            The utilities of samples after each selected sample of the
            batch, e.g., `utilities[0]` indicates the utilities used for
            selecting the first sample (with index `query_indices[0]`) of
            the batch. Utilities for labeled samples or already selected
            candidates are set to `np.nan`.

            - If `candidates` is `None`, the indexing refers to the samples
              in `X`.
            - If `candidates` is of shape `(n_candidates,)` and of type
              `int`, `utilities` refers to the samples in `X`.
            - If `candidates` is of shape `(n_candidates, ...)`,
              `utilities` refers to the indexing in `candidates`.
        """
        # Determine candidate samples and validate parameters.
        X, y, candidates, batch_size, return_utilities = self._validate_data(
            X, y, candidates, batch_size, return_utilities, reset=True
        )
        X_cand, mapping = self._transform_candidates(candidates, X, y)
        check_type(clf, "clf", SkactivemlClassifier)
        check_equal_missing_label(clf.missing_label, self.missing_label_)
        check_scalar(fit_clf, "fit_clf", bool)
        check_scalar(self.normalize_samples, "normalize_samples", bool)
        check_scalar(self.adaptive_sigma, "adaptive_sigma", bool)
        check_scalar(self.n_ece_bins, "n_ece_bins", int, min_val=1)
        check_type(
            self.predict_proba_dict, "predict_proba_dict", (dict, type(None))
        )
        check_type(
            self.predict_proba_parser,
            "predict_proba_parser",
            type(None),
            indicator_funcs=[callable],
        )
        check_type(self.metric_dict, "metric_dict", (dict, type(None)))
        # Copy the metric parameters so that `self.metric_dict` stays
        # untouched when an adaptive `gamma` is inserted later on.
        metric_dict = (
            {} if self.metric_dict is None else self.metric_dict.copy()
        )
        # The adaptive bandwidth is only defined for the RBF kernel and
        # would conflict with a user-provided `gamma`.
        if self.adaptive_sigma:
            if self.metric != "rbf":
                raise ValueError(
                    "`adaptive_sigma=True` is only supported with "
                    "`metric='rbf'`."
                )
            elif "gamma" in metric_dict:
                raise ValueError(
                    "`'gamma' cannot be part of the `metric_dict` "
                    "with `adaptive_sigma=True`."
                )
        # `validation_size` follows the `train_test_split` convention:
        # int for an absolute size, float in (0, 1) for a fraction.
        if isinstance(self.validation_size, int):
            check_scalar(
                self.validation_size, "validation_size", int, min_val=1
            )
        else:
            check_scalar(
                self.validation_size,
                "validation_size",
                (float, np.floating),
                min_inclusive=False,
                max_inclusive=False,
                min_val=0.0,
                max_val=1.0,
            )
        # Validate `temperatures`, which may be `None` (use the default
        # grid), a single positive scalar, or an array of positive values.
        if self.temperatures is None:
            temperatures = np.logspace(-1, 1, 49)
        elif np.isscalar(self.temperatures):
            temperatures = float(self.temperatures)
            if temperatures <= 0 or np.isnan(temperatures):
                raise ValueError(
                    "`temperatures` must contain only positive values."
                )
        else:
            temperatures = column_or_1d(self.temperatures, dtype=float)
            if len(temperatures) == 0:
                raise ValueError(
                    "`temperatures` must contain at least one entry."
                )
            if np.any(temperatures <= 0) or np.isnan(temperatures).any():
                raise ValueError(
                    "`temperatures` must contain only positive values."
                )
        # Calibrate classifier by selecting a corresponding temperature.
        tau = self._select_temperature(
            X=X,
            y=y,
            clf=clf,
            temperatures=temperatures,
            sample_weight=sample_weight,
        )
        # (Re-)fit classifier on full labeled data if requested.
        if fit_clf:
            if sample_weight is None:
                clf_eval = clone(clf).fit(X, y)
            else:
                clf_eval = clone(clf).fit(X, y, sample_weight)
        else:
            clf_eval = clf
        # Infer probabilities and if available logits as well as embeddings.
        probas_cand, logits_cand, X_cand_repr = self._predict_with_extras(
            clf_eval, X_cand
        )
        # Without embeddings, fall back to the raw candidate samples.
        if X_cand_repr is None:
            X_cand_repr = X_cand
        # If logits are available, recompute temperature-scaled
        # probabilities with the selected temperature `tau`.
        if logits_cand is not None:
            probas_cand = softmax(logits_cand / tau, axis=1)
        # Compute uncertainty scores by either using the original probability
        # scores or the calibrated ones, if logits were available.
        unc_cand = uncertainty_scores(probas=probas_cand, method=self.method)
        if not np.all(np.isfinite(unc_cand)) or np.allclose(unc_cand, 0.0):
            # Fall back to pure coverage if the uncertainty model carries no
            # information, e.g. when only one class has been observed so far.
            unc_cand = np.ones_like(unc_cand)
        # Get embeddings for the labeled samples.
        labeled_idx = labeled_indices(y=y, missing_label=self.missing_label_)
        X_labeled_repr = None
        if len(labeled_idx) > 0:
            _, _, X_labeled_repr = self._predict_with_extras(
                clf_eval, X[labeled_idx]
            )
            if X_labeled_repr is None:
                X_labeled_repr = X[labeled_idx]
        # Normalize candidate and labeled samples to unit length.
        if self.normalize_samples:
            X_cand_repr = normalize(X_cand_repr, copy=True)
            if X_labeled_repr is not None:
                X_labeled_repr = normalize(X_labeled_repr, copy=True)
        # Compute kernel similarities, where the bandwidth is automatically
        # tuned if an RBF kernel is employed.
        metric_dict = self._resolve_metric_dict(
            X_cand_repr=X_cand_repr,
            X_labeled_repr=X_labeled_repr,
            metric_dict=metric_dict,
        )
        K_cand = pairwise_kernels(
            X_cand_repr, metric=self.metric, **metric_dict
        )
        # `k_max[i]` tracks the best coverage of candidate `i` by any
        # already labeled (or already selected) sample.
        if X_labeled_repr is not None and len(X_labeled_repr) > 0:
            K_cand_labeled = pairwise_kernels(
                X_cand_repr, X_labeled_repr, metric=self.metric, **metric_dict
            )
            k_max = K_cand_labeled.max(axis=1)
        else:
            k_max = np.zeros(len(X_cand_repr), dtype=float)
        # Perform sequential batch selection.
        query_indices_cand = np.empty(batch_size, dtype=int)
        utilities_cand = np.empty((batch_size, len(X_cand_repr)), dtype=float)
        for b in range(batch_size):
            # Utility of candidate `j` is the mean uncertainty-weighted
            # coverage *gain* it would add on top of `k_max`.
            gains = np.maximum(K_cand - k_max[:, None], 0.0)
            utilities_cand[b] = np.mean(unc_cand[:, None] * gains, axis=0)
            # Mask already selected candidates to avoid re-selection.
            utilities_cand[b][query_indices_cand[:b]] = np.nan
            query_indices_cand[b] = rand_argmax(
                utilities_cand[b], random_state=self.random_state_
            )[0]
            # Update the coverage with the newly selected candidate.
            k_max = np.maximum(k_max, K_cand[:, query_indices_cand[b]])
        # Map queried indices and utilities back to the expected output.
        if mapping is None:
            query_indices = query_indices_cand
            utilities = utilities_cand
        else:
            query_indices = mapping[query_indices_cand]
            utilities = np.full((batch_size, len(X)), np.nan)
            utilities[:, mapping] = utilities_cand
        if return_utilities:
            return query_indices, utilities
        return query_indices
def _select_temperature(self, X, y, clf, temperatures, sample_weight=None):
# Fallback if there is only one temperature candidate.
if np.isscalar(temperatures):
return float(temperatures)
if len(temperatures) == 1:
return float(temperatures[0])
# Try to perform train-test split. If it not possilbe, return 1.0 as
# temperature.
labeled_idx = labeled_indices(y=y, missing_label=self.missing_label_)
if len(labeled_idx) < 2:
return 1.0
y_labeled = y[labeled_idx]
split_kwargs = {
"test_size": self.validation_size,
"random_state": self.random_state_,
"shuffle": True,
}
if len(np.unique(y_labeled)) > 1:
split_kwargs["stratify"] = y_labeled
try:
train_idx, val_idx = train_test_split(labeled_idx, **split_kwargs)
except ValueError:
split_kwargs.pop("stratify", None)
try:
train_idx, val_idx = train_test_split(
labeled_idx, **split_kwargs
)
except ValueError:
return 1.0
if len(train_idx) == 0 or len(val_idx) == 0:
return 1.0
X_train = X[train_idx]
y_train = y[train_idx]
X_val = X[val_idx]
y_val = y[val_idx]
sw_train = None if sample_weight is None else sample_weight[train_idx]
try:
if sw_train is None:
clf_cal = clone(clf).fit(X_train, y_train)
else:
clf_cal = clone(clf).fit(X_train, y_train, sw_train)
except Exception:
return 1.0
_, logits_val, _ = self._predict_with_extras(clf_cal, X_val)
if logits_val is None:
return 1.0
# Select temperature by iterating over all candidates and selecting
# the one with the lowest expected calibration error.
best_tau = float(temperatures[0])
best_ece = np.inf
for tau in temperatures:
probas = softmax(logits_val / tau, axis=1)
ece = self._expected_calibration_error(
probas=probas, y_true=y_val, classes=clf_cal.classes_
)
if ece < best_ece:
best_tau = float(tau)
best_ece = ece
return best_tau
def _resolve_metric_dict(self, X_cand_repr, X_labeled_repr, metric_dict):
"""
Computes adaptive sigma if required.
"""
# Keep the metric paramters unchanged if no adaptive sigma is requried.
metric_dict = metric_dict.copy()
if not self.adaptive_sigma:
return metric_dict
if X_labeled_repr is not None:
# If there are labeled samples compute minimum distance as sigma.
distances = self._nonzero_distances(X_labeled_repr)
sigma = np.min(distances)
else:
# If there are labeled samples compute median distance between
# candidate samples as sigma.
distances = self._nonzero_distances(X_cand_repr)
sigma = np.median(distances)
if sigma is None or sigma <= 0 or np.isnan(sigma):
# Fallback if no valid sigma could be computed.
sigma = 1.0
# Transform sigma to the gamma parameter expected by the RBF kernel
# implementation in sklearn.
metric_dict["gamma"] = 1.0 / (sigma**2)
return metric_dict
def _predict_with_extras(self, clf, X):
"""
Helper function to streamline required predictions.
"""
predict_proba_dict = (
{}
if self.predict_proba_dict is None
else self.predict_proba_dict.copy()
)
out = clf.predict_proba(X, **predict_proba_dict)
probas, logits, emb = self._parse_predict_output(out)
if logits is None:
logits = self._decision_function_logits(clf, X)
if probas is None and logits is not None:
probas = softmax(logits, axis=1)
return probas, logits, emb
def _parse_predict_output(self, out):
"""
Helper function to streamline required predictions according to
user information.
"""
if self.predict_proba_parser is not None:
parsed = self.predict_proba_parser(out)
if not isinstance(parsed, (tuple, list)):
raise TypeError(
"`predict_proba_parser` must return a tuple or list."
)
if len(parsed) == 2:
probas, logits = parsed
emb = None
elif len(parsed) == 3:
probas, logits, emb = parsed
else:
raise ValueError(
"`predict_proba_parser` must return "
"`(probas, logits)` or `(probas, logits, embeddings)`."
)
return probas, logits, emb
if not isinstance(out, tuple):
return out, None, None
if len(out) == 0:
raise ValueError("`predict_proba` returned an empty tuple.")
if len(out) > 3:
raise ValueError(
"`predict_proba` returned more than three outputs. Pass "
"`predict_proba_parser` to disambiguate them."
)
probas = out[0]
logits = out[1] if len(out) >= 2 else None
emb = out[2] if len(out) >= 3 else None
return probas, logits, emb
def _expected_calibration_error(self, probas, y_true, classes):
"""
Computes expected calibration error for determining the temperature.
"""
confidences = np.max(probas, axis=1)
pred_labels = classes[np.argmax(probas, axis=1)]
accuracies = pred_labels == y_true
bins = np.linspace(0.0, 1.0, self.n_ece_bins + 1)
ece = 0.0
for left, right in zip(bins[:-1], bins[1:]):
if right == 1.0:
mask = (confidences >= left) & (confidences <= right)
else:
mask = (confidences >= left) & (confidences < right)
if not np.any(mask):
continue
bin_weight = np.mean(mask)
bin_acc = np.mean(accuracies[mask])
bin_conf = np.mean(confidences[mask])
ece += bin_weight * np.abs(bin_acc - bin_conf)
return ece
@staticmethod
def _decision_function_logits(clf, X):
"""
Helper function to compute logits from the decision function as a
common method in sklearn.
"""
if not hasattr(clf, "decision_function"):
return None
try:
logits = clf.decision_function(X)
except Exception:
return None
logits = np.asarray(logits)
if logits.ndim == 1:
logits = np.column_stack([np.zeros_like(logits), logits])
return logits
@staticmethod
def _nonzero_distances(X):
"""
Helper function for computing non-zero distances.
"""
if X is None or len(X) < 2:
return None
distances = pairwise_distances(X)
distances = distances[np.triu_indices_from(distances, k=1)]
distances = distances[distances > 0]
return distances