import numpy as np
from sklearn import clone
from sklearn.metrics import pairwise_distances, pairwise
from skactiveml.base import (
SingleAnnotatorPoolQueryStrategy,
SkactivemlRegressor,
)
from skactiveml.utils import (
rand_argmax,
labeled_indices,
MISSING_LABEL,
is_labeled,
check_type,
check_scalar,
)
[docs]class GreedySamplingX(SingleAnnotatorPoolQueryStrategy):
"""Greedy Sampling on the feature space.
This class implements greedy sampling on the feature space. A query
strategy that tries to select those samples that increase the diversity of
the feature space the most.
Parameters
----------
metric : str, optional (default="euclidean")
Metric used for calculating the distances of the samples in the feature
space. It must be a valid argument for
`sklearn.metrics.pairwise_distances` argument `metric`.
metric_dict : dict, optional (default=None)
Any further parameters are passed directly to the pairwise_distances
function.
missing_label : scalar or string or np.nan or None,
(default=skactiveml.utils.MISSING_LABEL)
Value to represent a missing label.
random_state : int | np.random.RandomState, optional
Random state for candidate selection.
References
----------
[1] Wu, Dongrui, Chin-Teng Lin, and Jian Huang. Active Learning for
Regression Using Greedy Sampling, Information Sciences, pages 90--105,
2019.
"""
def __init__(
self,
metric=None,
metric_dict=None,
missing_label=MISSING_LABEL,
random_state=None,
):
super().__init__(
random_state=random_state, missing_label=missing_label
)
self.metric = metric
self.metric_dict = metric_dict
[docs] def query(
self, X, y, candidates=None, batch_size=1, return_utilities=False
):
"""Determines for which candidate samples labels are to be queried.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Training data set, usually complete, i.e. including the labeled and
unlabeled samples.
y : array-like of shape (n_samples)
Labels of the training data set (possibly including unlabeled ones
indicated by self.MISSING_LABEL).
candidates : None or array-like of shape (n_candidates), dtype=int or
array-like of shape (n_candidates, n_features),
optional (default=None)
If candidates is None, the unlabeled samples from (X,y) are
considered as candidates.
If candidates is of shape (n_candidates) and of type int,
candidates is considered as the indices of the samples in (X,y).
If candidates is of shape (n_candidates, n_features), the
candidates are directly given in candidates (not necessarily
contained in X).
batch_size : int, optional (default=1)
The number of samples to be selected in one AL cycle.
return_utilities : bool, optional (default=False)
If true, also return the utilities based on the query strategy.
Returns
-------
query_indices : numpy.ndarray of shape (batch_size)
The query_indices indicate for which candidate sample a label is
to queried, e.g., `query_indices[0]` indicates the first selected
sample.
If candidates is None or of shape (n_candidates), the indexing
refers to samples in X.
If candidates is of shape (n_candidates, n_features), the indexing
refers to samples in candidates.
utilities : numpy.ndarray of shape (batch_size, n_samples) or
numpy.ndarray of shape (batch_size, n_candidates)
The utilities of samples after each selected sample of the batch,
e.g., `utilities[0]` indicates the utilities used for selecting
the first sample (with index `query_indices[0]`) of the batch.
Utilities for labeled samples will be set to np.nan.
If candidates is None or of shape (n_candidates), the indexing
refers to samples in X.
If candidates is of shape (n_candidates, n_features), the indexing
refers to samples in candidates.
"""
X, y, candidates, batch_size, return_utilities = self._validate_data(
X, y, candidates, batch_size, return_utilities, reset=True
)
X_cand, mapping = self._transform_candidates(candidates, X, y)
sample_indices = np.arange(len(X), dtype=int)
selected_indices = labeled_indices(y, missing_label=self.missing_label)
if mapping is None:
X_all = np.append(X, X_cand, axis=0)
candidate_indices = len(X) + np.arange(len(X_cand), dtype=int)
else:
X_all = X
candidate_indices = mapping
query_indices_cand, utilities_cand = _greedy_sampling(
X_cand,
X_all,
sample_indices,
selected_indices,
candidate_indices,
batch_size,
random_state=self.random_state_,
method="x",
metric_x=self.metric,
metric_dict_x=self.metric_dict,
)
if mapping is not None:
utilities = np.full((batch_size, len(X)), np.nan)
utilities[:, mapping] = utilities_cand
query_indices = mapping[query_indices_cand]
else:
utilities, query_indices = utilities_cand, query_indices_cand
if return_utilities:
return query_indices, utilities
else:
return query_indices
[docs]class GreedySamplingTarget(SingleAnnotatorPoolQueryStrategy):
"""Greedy Sampling on the target space.
This class implements greedy sampling on the target space. A query strategy
that at first selects samples to maximize the diversity in the
feature space and than selects samples to maximize the diversity in the
feature and the target space (GSi), optionally only the diversity in the
target space can be maximized (GSy).
Parameters
----------
x_metric : str, optional (default=None)
Metric used for calculating the distances of the samples in the feature
space. It must be a valid argument for
`sklearn.metrics.pairwise_distances` argument `metric`.
y_metric : str, optional (default=None)
Metric used for calculating the distances of the samples in the target
space. It must be a valid argument for
`sklearn.metrics.pairwise_distances` argument `metric`.
x_metric_dict : dict, optional (default=None)
Any further parameters for computing the distances of the samples in
the feature space are passed directly to the pairwise_distances
function.
y_metric_dict : dict, optional (default=None)
Any further parameters for computing the distances of the samples in
the target space are passed directly to the pairwise_distances
function.
n_GSx_samples : int, optional (default=1)
Indicates the number of selected samples required till the query
strategy switches from GSx to the strategy specified by `method`.
method : "GSy" or "GSi", optional (default="GSi")
Specifies whether only the diversity in the target space (`GSy`) or the
diversity in the feature and the target space (`GSi`) should be
maximized, when the number of selected samples exceeds `n_GSx_samples`.
missing_label : scalar or string or np.nan or None,
(default=skactiveml.utils.MISSING_LABEL)
Value to represent a missing label.
random_state : int | np.random.RandomState, optional
Random state for candidate selection.
References
----------
[1] Wu, Dongrui, Chin-Teng Lin, and Jian Huang. Active Learning for
Regression using Greedy Sampling, Information Sciences, pages 90--105,
2019.
"""
def __init__(
self,
x_metric=None,
y_metric=None,
x_metric_dict=None,
y_metric_dict=None,
method=None,
n_GSx_samples=1,
missing_label=MISSING_LABEL,
random_state=None,
):
super().__init__(
random_state=random_state, missing_label=missing_label
)
self.method = method
self.x_metric = x_metric
self.y_metric = y_metric
self.x_metric_dict = x_metric_dict
self.y_metric_dict = y_metric_dict
self.n_GSx_samples = n_GSx_samples
[docs] def query(
self,
X,
y,
reg,
fit_reg=True,
sample_weight=None,
candidates=None,
batch_size=1,
return_utilities=False,
):
"""Determines for which candidate samples labels are to be queried.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Training data set, usually complete, i.e. including the labeled and
unlabeled samples.
y : array-like of shape (n_samples)
Labels of the training data set (possibly including unlabeled ones
indicated by `self.missing_label`).
reg: SkactivemlRegressor
Regressor to predict the data.
fit_reg : bool, optional (default=True)
Defines whether the regressor should be fitted on `X`, `y`, and
`sample_weight`.
sample_weight: array-like of shape (n_samples), optional (default=None)
Weights of training samples in `X`.
candidates : None or array-like of shape (n_candidates), dtype=int or
array-like of shape (n_candidates, n_features),
optional (default=None)
If candidates is None, the unlabeled samples from (X,y) are
considered as candidates.
If candidates is of shape (n_candidates) and of type int,
candidates is considered as the indices of the samples in (X,y).
If candidates is of shape (n_candidates, n_features), the
candidates are directly given in candidates (not necessarily
contained in X).
batch_size : int, optional (default=1)
The number of samples to be selected in one AL cycle.
return_utilities : bool, optional (default=False)
If true, also return the utilities based on the query strategy.
Returns
-------
query_indices : numpy.ndarray of shape (batch_size)
The query_indices indicate for which candidate sample a label is
to queried, e.g., `query_indices[0]` indicates the first selected
sample.
If candidates is None or of shape (n_candidates), the indexing
refers to samples in X.
If candidates is of shape (n_candidates, n_features), the indexing
refers to samples in candidates.
utilities : numpy.ndarray of shape (batch_size, n_samples) or
numpy.ndarray of shape (batch_size, n_candidates)
The utilities of samples after each selected sample of the batch,
e.g., `utilities[0]` indicates the utilities used for selecting
the first sample (with index `query_indices[0]`) of the batch.
Utilities for labeled samples will be set to np.nan.
If candidates is None or of shape (n_candidates), the indexing
refers to samples in X.
If candidates is of shape (n_candidates, n_features), the indexing
refers to samples in candidates.
"""
X, y, candidates, batch_size, return_utilities = self._validate_data(
X, y, candidates, batch_size, return_utilities, reset=True
)
check_type(reg, "reg", SkactivemlRegressor)
check_type(fit_reg, "fit_reg", bool)
if self.method is None:
self.method = "GSi"
check_type(self.method, "self.method", target_vals=["GSy", "GSi"])
check_scalar(self.n_GSx_samples, "self.k_0", int, min_val=0)
X_cand, mapping = self._transform_candidates(candidates, X, y)
n_labeled = np.sum(is_labeled(y, missing_label=self.missing_label_))
batch_size_x = max(0, min(self.n_GSx_samples - n_labeled, batch_size))
batch_size_y = batch_size - batch_size_x
if fit_reg:
if sample_weight is None:
reg = clone(reg).fit(X, y)
else:
reg = clone(reg).fit(X, y, sample_weight)
sample_indices = np.arange(len(X), dtype=int)
selected_indices = labeled_indices(y)
y_cand = reg.predict(X_cand)
if mapping is None:
X_all = np.append(X, X_cand, axis=0)
y_all = np.append(y, reg.predict(X_cand))
candidate_indices = len(X) + np.arange(len(X_cand), dtype=int)
else:
X_all = X
y_all = y.copy()
y_all[mapping] = y_cand
candidate_indices = mapping
query_indices = np.zeros(batch_size, dtype=int)
utilities = np.full((batch_size, len(X_cand)), np.nan)
if batch_size_x > 0:
query_indices_x, utilities_x = _greedy_sampling(
X_cand=X_cand,
y_cand=y_cand,
X=X_all,
y=y_all,
sample_indices=sample_indices,
selected_indices=selected_indices,
candidate_indices=candidate_indices,
batch_size=batch_size_x,
random_state=None,
metric_x=self.x_metric,
metric_dict_x=self.x_metric_dict,
method="x",
)
query_indices[0:batch_size_x] = query_indices_x
utilities[0:batch_size_x, :] = utilities_x
else:
query_indices_x = np.array([], dtype=int)
selected_indices = np.append(
selected_indices, candidate_indices[query_indices_x]
)
candidate_indices = np.delete(candidate_indices, query_indices_x)
is_queried = np.full(len(X_cand), False)
is_queried[query_indices_x] = True
unselected_cands = np.argwhere(~is_queried).flatten()
X_cand = np.delete(X_cand, query_indices_x, axis=0)
y_cand = np.delete(y_cand, query_indices_x)
if batch_size_y > 0:
query_indices_y, utilities_y = _greedy_sampling(
X_cand=X_cand,
y_cand=y_cand,
X=X_all,
y=y_all,
sample_indices=sample_indices,
selected_indices=selected_indices,
candidate_indices=candidate_indices,
batch_size=batch_size_y,
random_state=None,
metric_x=self.x_metric,
metric_dict_x=self.x_metric_dict,
metric_y=self.y_metric,
metric_dict_y=self.y_metric_dict,
method="xy" if self.method == "GSi" else "y",
)
query_indices[batch_size_x:] = unselected_cands[query_indices_y]
utilities[batch_size_x:][:, unselected_cands] = utilities_y
if mapping is not None:
utilities_cand, query_indices_cand = utilities, query_indices
utilities = np.full((batch_size, len(X)), np.nan)
utilities[:, mapping] = utilities_cand
query_indices = mapping[query_indices_cand]
if return_utilities:
return query_indices, utilities
else:
return query_indices
def _greedy_sampling(
X_cand,
X,
sample_indices,
selected_indices,
candidate_indices,
batch_size,
y_cand=None,
y=None,
random_state=None,
method=None,
**kwargs,
):
dist_dict = dict(
X_cand=X_cand, y_cand=y_cand, X=X, y=y, method=method, **kwargs
)
query_indices = np.zeros(batch_size, dtype=int)
utilities = np.full((batch_size, len(X_cand)), np.nan)
distances = np.full((len(X_cand), len(X)), np.nan)
if len(selected_indices) == 0:
distances[:, sample_indices] = _measure_distance(
sample_indices, **dist_dict
)
else:
distances[:, selected_indices] = _measure_distance(
selected_indices, **dist_dict
)
not_selected_candidates = np.arange(len(X_cand), dtype=int)
for i in range(batch_size):
if len(selected_indices) == 0:
dist = distances[not_selected_candidates][:, sample_indices]
util = -np.sum(dist, axis=1)
else:
dist = distances[not_selected_candidates][:, selected_indices]
util = np.min(dist, axis=1)
utilities[i, not_selected_candidates] = util
idx = rand_argmax(util, random_state=random_state)
query_indices[i] = not_selected_candidates[idx][0]
distances[:, candidate_indices[idx]] = _measure_distance(
candidate_indices[idx], **dist_dict
)
selected_indices = np.append(
selected_indices, candidate_indices[idx], axis=0
)
candidate_indices = np.delete(candidate_indices, idx, axis=0)
not_selected_candidates = np.delete(not_selected_candidates, idx)
return query_indices, utilities
def _measure_distance(
indices,
X_cand,
y_cand,
X,
y,
metric_dict_x=None,
metric_x=None,
metric_dict_y=None,
metric_y=None,
method=None,
):
metric_x = metric_x if metric_x is not None else "euclidean"
metric_y = metric_y if metric_y is not None else "euclidean"
for metric, name in zip([metric_x, metric_y], ["metric_x", "metric_y"]):
check_type(
metric,
name,
target_vals=pairwise.PAIRWISE_DISTANCE_FUNCTIONS.keys(),
)
metric_dict_x = metric_dict_x if metric_dict_x is not None else {}
metric_dict_y = metric_dict_y if metric_dict_y is not None else {}
for metric_dict, name in zip(
[metric_dict_x, metric_dict_y], ["metric_dict_x", "metric_dict_y"]
):
check_type(metric_dict, name, dict)
dist = np.ones((len(X_cand), len(indices)))
if "x" in method:
dist *= pairwise_distances(
X_cand, X[indices], metric=metric_x, **metric_dict_x
)
if "y" in method:
dist *= pairwise_distances(
y_cand.reshape(-1, 1),
y[indices].reshape(-1, 1),
metric=metric_y,
**metric_dict_y,
)
return dist