"""
Module implementing the core-set query strategy.
Core-set selection problem aims to find a small subset given a large labeled
dataset such that a model learned over the small subset is competitive over the
whole dataset.
"""
import numpy as np
from ..base import SingleAnnotatorPoolQueryStrategy
from ..utils import (
MISSING_LABEL,
labeled_indices,
unlabeled_indices,
rand_argmax,
)
from sklearn.utils.validation import (
check_array,
check_consistent_length,
check_random_state,
column_or_1d,
)
from sklearn.metrics import pairwise_distances_argmin_min
[docs]class CoreSet(SingleAnnotatorPoolQueryStrategy):
"""Core Set
This class implement a core-set based query strategies, i.e., the
standard greedy algorithm for the k-center problem [1].
Parameters
----------
missing_label : scalar or string or np.nan or None, default=np.nan
Value to represent a missing label
random_state : None or int or np.random.RandomState, default=None
The random state to use.
References
----------
[1] O. Sener und S. Savarese, "Active Learning for Convolutional Neural
Networks: A Core-Set Approach", ICLR, 2018.
"""
def __init__(self, missing_label=MISSING_LABEL, random_state=None):
super().__init__(
missing_label=missing_label, random_state=random_state
)
[docs] def query(
self,
X,
y,
candidates=None,
batch_size=1,
return_utilities=False,
):
"""Query the next samples to be labeled
Parameters
----------
X : array-like of shape (n_samples, n_features)
Training data set, usually complete, i.e. including the labeled and
unlabeled samples
y : array-like of shape (n_samples, )
Labels of the training data set (possibly including unlabeled ones
indicated by self.missing_label)
candidates : None or array-like of shape (n_candidates), dtype = int or
array-like of shape (n_candidates, n_features), optional (default=None)
If candidates is None, the unlabeled samples from (X,y) are
considered as candidates
If candidates is of shape (n_candidates) and of type int,
candidates is considered as a list of the indices of the samples in
(X, y).
If candidates is of shape (n_candidates, n_features), the
candidates are directly given in the input candidates (not
necessarily contained in X).
batch_size : int, optional(default=1)
The number of samples to be selects in one AL cycle.
return_utilities : bool, optional(default=False)
If True, also return the utilities based on the query strategy
Returns
----------
query_indices : np.ndarray of shape (batch_size)
The query_indices indicate for which candidate sample a label is
to queried, e.g., `query_indices[0]` indicates the first selected
sample.
If candidates in None or of shape (n_candidates), the indexing
refers to samples in X.
If candidates is of shape (n_candidates, n_features), the indexing
refers to samples in candidates.
utilities : np.ndarray of shape (batch_size, n_samples) or
numpy.ndarray of shape (batch_size, n_candidates)
The utilities of samples for selecting each sample of the batch.
Here, utilities means the distance between each data point and its
nearest center.
If candidates is None or of shape (n_candidates), the indexing
refers to samples in X.
If candidates is of shape (n_candidates, n_features), the indexing
refers to samples in candidates.
"""
X, y, candidates, batch_size, return_utilities = self._validate_data(
X, y, candidates, batch_size, return_utilities, reset=True
)
X_cand, mapping = self._transform_candidates(candidates, X, y)
if mapping is not None:
query_indices, utilities = k_greedy_center(
X,
y,
batch_size,
self.random_state_,
self.missing_label_,
mapping,
)
else:
selected_samples = labeled_indices(
y=y, missing_label=self.missing_label_
)
X_with_cand = np.concatenate((X_cand, X[selected_samples]), axis=0)
n_new_cand = X_cand.shape[0]
y_cand = np.full(shape=n_new_cand, fill_value=self.missing_label)
y_with_cand = np.concatenate(
(y_cand, y[selected_samples]), axis=None
)
mapping = np.arange(n_new_cand)
query_indices, utilities = k_greedy_center(
X_with_cand,
y_with_cand,
batch_size,
self.random_state_,
self.missing_label_,
mapping,
n_new_cand,
)
if return_utilities:
return query_indices, utilities
else:
return query_indices
[docs]def k_greedy_center(
X,
y,
batch_size=1,
random_state=None,
missing_label=MISSING_LABEL,
mapping=None,
n_new_cand=None,
):
"""
An active learning method that greedily forms a batch to minimize
the maximum distance to a cluster center among all unlabeled
datapoints.
Parameters:
----------
X : array-like of shape (n_samples, n_features)
Training data set, usually complete, i.e., including the labeled and
unlabeled samples.
y : np.ndarray of shape (n_selected_samples, )
Index of datapoints already selected.
batch_size : int, optional (default=1)
The number of samples to be selected in one AL cycle.
random_state : None or int or np.random.RandomState, default=None
Random state for candidate selection.
missing_label : scalar or string or np.nan or None, default=np.nan
Value to represent a missing label.
mapping : None or np.ndarray of shape (n_candidates,), default=None
Index array that maps `candidates` to `X` (`candidates = X[mapping]`).
n_new_cand : int or None, default=None
The number of new candidates that are additionally added to X.
Only used for the case, that in the query function with the
shape of candidates is (n_candidates, n_feature).
Return:
----------
query_indices : np.ndarray of shape (batch_size,)
The query_indices indicate for which candidate sample a label is
to queried from the candidates.
If candidates in None or of shape (n_candidates), the indexing
refers to samples in X.
If candidates is of shape (n_candidates, n_features), the indexing
refers to samples in candidates.
utilities : numpy.ndarray of shape (batch_size, n_samples) or
np.ndarray of shape (batch_size, n_new_cand)
The distance between each data point and its nearest center that used
for selecting the next sample.
If candidates is None or of shape (n_candidates), the indexing
refers to samples in X.
If candidates is of shape (n_candidates, n_features), the indexing
refers to samples in candidates.
"""
# valid the input shape whether is valid or not.
X = check_array(X, allow_nd=True)
y = check_array(
y, ensure_2d=False, force_all_finite="allow-nan", dtype=None
)
y = column_or_1d(y, warn=True)
check_consistent_length(X, y)
selected_samples = labeled_indices(y, missing_label=missing_label)
random_state_ = check_random_state(random_state)
if mapping is None:
mapping = unlabeled_indices(y, missing_label=missing_label)
else:
mapping = column_or_1d(mapping, dtype=int, warn=True)
if not isinstance(batch_size, int):
raise TypeError("batch_size must be a integer")
# initialize the utilities matrix with
if n_new_cand is None:
utilities = np.zeros(shape=(batch_size, X.shape[0]))
elif isinstance(n_new_cand, int):
if n_new_cand == len(mapping):
utilities = np.zeros(shape=(batch_size, n_new_cand))
else:
raise ValueError(
"n_new_cand must equal to the length of mapping array"
)
else:
raise TypeError("Only n_new_cand with type int is supported.")
query_indices = np.zeros(batch_size, dtype=int)
for i in range(batch_size):
if i == 0:
update_dist = _update_distances(X, selected_samples, mapping)
else:
latest_dist = utilities[i - 1]
update_dist = _update_distances(
X=X,
cluster_centers=[query_indices[i - 1]],
mapping=mapping,
latest_distance=latest_dist,
)
if n_new_cand is None:
utilities[i] = update_dist
else:
utilities[i] = update_dist[mapping]
# select index
query_indices[i] = rand_argmax(
utilities[i], random_state=random_state_
)[0]
return query_indices, utilities
def _update_distances(X, cluster_centers, mapping, latest_distance=None):
"""
Update minimum distances by given cluster centers.
Parameters:
----------
X : array-like of shape (n_samples, n_features)
Training data set, usually complete, i.e., including the labeled and
unlabeled samples.
cluster_centers : array-like of shape (n_cluster_centers)
Indices of cluster centers.
mapping : np.ndarray of shape (n_candidates, ) default None
Index array that maps `candidates` to `X` (`candidates = X[mapping]`).
latest_distance : array-like of shape (n_samples) default None
The distance between each data point and its nearest center
Using to facilitate the computation the later distances for the
coming selected sample.
Return:
---------
result-dist : np.ndarray of shape (1, n_samples)
If there aren't any cluster centers existing, the default distance
will be 0.
If there are some cluster center exist, the return will be the
distance between each data point and its nearest center after
each selected sample of the batch. In the case of cluster center the
value will be 'np.nan'.
For the case, that indices aren't in 'mapping', the corresponding value
in 'result-dist' will be also 'np.nan'.
"""
dist = np.zeros(shape=X.shape[0])
if len(cluster_centers) > 0:
cluster_center_feature = X[cluster_centers]
_, dist = pairwise_distances_argmin_min(X, cluster_center_feature)
if latest_distance is not None:
sum_dist = np.nansum(latest_distance)
latest_distance_tmp = latest_distance
if sum_dist == 0:
latest_distance_tmp = latest_distance.copy()
latest_distance_tmp[latest_distance_tmp == 0] = np.inf
l_distance = np.zeros(shape=X.shape[0])
l_distance[mapping] = latest_distance_tmp[mapping]
dist = np.minimum(l_distance, dist)
result_dist = np.full(X.shape[0], np.nan)
result_dist[mapping] = dist[mapping]
result_dist[cluster_centers] = np.nan
return result_dist