"""Utilities for selection."""
import operator
import warnings
from functools import reduce
import numpy as np
from scipy.stats import rankdata
from sklearn.utils import check_array
from ._validation import check_random_state, check_scalar, check_type
def rand_argmin(a, random_state=None, **argmin_kwargs):
    """Return the index of the minimum of `a`, breaking ties at random.

    Parameters
    ----------
    a : array-like
        Data whose minimum element's index is to be determined.
    random_state : int, RandomState instance or None, optional (default=None)
        Source of the random numbers that decide which of several tied
        minima is returned. Pass an int for reproducible results across
        multiple function calls.
    argmin_kwargs : dict-like
        Keyword arguments (e.g., `axis`) forwarded to both `np.nanmin` and
        `np.argmax`.

    Returns
    -------
    index_array : ndarray of ints
        The selected index, always at least one-dimensional. If the selected
        index is a scalar and `a` has more than one dimension, it is converted
        into one coordinate per dimension of `a`.
    """
    rng = check_random_state(random_state)
    values = np.asarray(a)

    # One random score per entry; multiplying by the boolean mask zeroes the
    # score of every entry that is not a minimum, so the arg-maximum of the
    # product is a randomly chosen minimum.
    scores = rng.random(values.shape)
    is_minimum = values == np.nanmin(values, keepdims=True, **argmin_kwargs)
    chosen = np.argmax(scores * is_minimum, **argmin_kwargs)

    # A scalar result on multi-dimensional data is a flat index: translate it
    # into per-dimension coordinates.
    if values.ndim > 1 and np.isscalar(chosen):
        chosen = np.unravel_index(chosen, values.shape)
    return np.atleast_1d(chosen)
def rand_argmax(a, random_state=None, **argmax_kwargs):
    """Return the index of the maximum of `a`, breaking ties at random.

    Parameters
    ----------
    a : array-like
        Data whose maximum element's index is to be determined.
    random_state : int, RandomState instance or None, optional (default=None)
        Source of the random numbers that decide which of several tied
        maxima is returned. Pass an int for reproducible results across
        multiple function calls.
    argmax_kwargs : dict-like
        Keyword arguments (e.g., `axis`) forwarded to both `np.nanmax` and
        `np.argmax`.

    Returns
    -------
    index_array : ndarray of ints
        The selected index, always at least one-dimensional. If the selected
        index is a scalar and `a` has more than one dimension, it is converted
        into one coordinate per dimension of `a`.
    """
    rng = check_random_state(random_state)
    values = np.asarray(a)

    # One random score per entry; multiplying by the boolean mask zeroes the
    # score of every entry that is not a maximum, so the arg-maximum of the
    # product is a randomly chosen maximum.
    scores = rng.random(values.shape)
    is_maximum = values == np.nanmax(values, keepdims=True, **argmax_kwargs)
    chosen = np.argmax(scores * is_maximum, **argmax_kwargs)

    # A scalar result on multi-dimensional data is a flat index: translate it
    # into per-dimension coordinates.
    if values.ndim > 1 and np.isscalar(chosen):
        chosen = np.unravel_index(chosen, values.shape)
    return np.atleast_1d(chosen)
def simple_batch(
    utilities,
    random_state=None,
    batch_size=1,
    return_utilities=False,
    method="max",
):
    """Generate a batch by selecting entries of `utilities`.

    If `utilities` is an ND-array, the returned utilities form an
    (N+1)D-array of shape `(batch_size, *utilities.shape)`. Row `n` holds the
    given utilities with the entries selected in the previous `n` steps set
    to `np.nan`.

    Parameters
    ----------
    utilities : np.ndarray
        The utilities to be used to create the batch. NaN entries are not
        counted as candidates. The passed array is never modified.
    random_state : int | np.random.RandomState (default=None)
        The random state to use. If `random_state is None` random
        `random_state` is used.
    batch_size : int, default=1
        The number of samples to be selected in one AL cycle. If it exceeds
        the number of non-NaN entries in `utilities`, a warning is issued and
        the number of non-NaN entries is used instead.
    return_utilities : bool, default=False
        If True, the utilities are returned.
    method : str, default='max'
        Determines how to select 'best_indices'. 'max' selects the indices with
        the maximum utilities (ties are broken randomly). 'proportional'
        randomly chooses the 'best_indices' without replacement with
        probabilities proportional to 'utilities'.

    Returns
    -------
    best_indices : np.ndarray of shape (batch_size,) if utilities.ndim == 1
        else (batch_size, utilities.ndim)
        The indices of the batch instances.
    batch_utilities : np.ndarray of shape (batch_size, *utilities.shape)
        The utilities of the batch (if return_utilities=True).

    Raises
    ------
    ValueError
        If `method` is neither "max" nor "proportional".
    """
    # Validation. `copy=True` guarantees a private array: the "max" branch
    # overwrites selected entries with NaN and must not alter the caller's
    # data.
    utilities = check_array(
        utilities,
        ensure_2d=False,
        dtype=float,
        force_all_finite="allow-nan",
        allow_nd=True,
        copy=True,
    )
    check_scalar(batch_size, target_type=int, name="batch_size", min_val=1)
    max_batch_size = np.sum(~np.isnan(utilities), dtype=int)
    if max_batch_size < batch_size:
        warnings.warn(
            "'batch_size={}' is larger than number of candidate samples "
            "in 'utilities'. Instead, 'batch_size={}' was set.".format(
                batch_size, max_batch_size
            )
        )
        batch_size = max_batch_size
    check_type(method, "method", str)

    # Generate batch.
    if method == "max":
        best_indices = np.empty((batch_size, utilities.ndim), dtype=int)
        batch_utilities = np.empty((batch_size,) + utilities.shape)
        for i in range(batch_size):
            best_indices[i] = rand_argmax(utilities, random_state=random_state)
            # Store the utilities as seen at step i, then remove the selected
            # entry so it cannot be selected again.
            batch_utilities[i] = utilities
            utilities[tuple(best_indices[i])] = np.nan
    elif method == "proportional":
        random_state = check_random_state(random_state)
        p = utilities / np.nansum(utilities)
        p[np.isnan(p)] = 0
        # Sample positions of the flattened array so that utilities of any
        # dimensionality are supported; for 1-D input this is the plain index.
        flat_indices = random_state.choice(
            utilities.size,
            size=batch_size,
            p=p.ravel(),
            replace=False,
        )
        batch_utilities = np.repeat([utilities], batch_size, axis=0)
        # View with one flattened row per step; writes go to `batch_utilities`.
        flat_rows = batch_utilities.reshape(batch_size, utilities.size)
        for i in range(batch_size):
            flat_rows[i, flat_indices[:i]] = np.nan
        best_indices = np.column_stack(
            np.unravel_index(flat_indices, utilities.shape)
        )
    else:
        raise ValueError(
            f'"method" has to be either "max" or "proportional" '
            f"but {method} was given."
        )

    if utilities.ndim == 1:
        best_indices = best_indices.flatten()

    # Check whether utilities are to be returned.
    if return_utilities:
        return best_indices, batch_utilities
    else:
        return best_indices
def combine_ranking(*iter_ranking, rank_method=None, rank_per_batch=False):
    """Combine different rankings hierarchically to one ranking assignment.

    A ranking index i is ranked higher than index j iff ranking[i]>ranking[j].
    For the combined ranking it will hold that the first ranking of
    `iter_ranking` always determines the ranking position at an index, and only
    when two ranking assignments are equal the second ranking will determine
    the ranking position and so forth.

    Parameters
    ----------
    iter_ranking : iterable of array-like
        The different rankings. They must share a common shape in the sense
        that they have the same number of dimensions and are broadcastable by
        numpy.
    rank_method : string, optional (default = None)
        The method by which the utilities are ranked. See
        `scipy.stats.rankdata`'s argument `method` for details. If None,
        "dense" is used.
    rank_per_batch : bool, optional (default = False)
        Whether the first index determines the batch and is not used for
        ranking.

    Returns
    -------
    combined_ranking : np.ndarray
        The combined ranking.

    Raises
    ------
    ValueError
        If the rankings differ in their number of dimensions or their shapes
        are not broadcastable.
    """
    if rank_method is None:
        rank_method = "dense"
    check_type(rank_method, "rank_method", str)
    check_type(rank_per_batch, "rank_per_batch", bool)

    iter_ranking = list(iter_ranking)
    for idx, ranking in enumerate(iter_ranking):
        # `astype(float)` returns a new array, so the in-place NaN handling
        # below never touches the caller's data.
        ranking = check_array(
            ranking, allow_nd=True, ensure_2d=False, force_all_finite=False
        ).astype(float)
        iter_ranking[idx] = ranking
        # Compare the validated arrays: the raw inputs may be plain lists,
        # which do not provide `ndim`.
        if idx != 0 and iter_ranking[idx - 1].ndim != ranking.ndim:
            raise ValueError(
                f"The number of dimensions of the `ranking` in "
                f"`iter_ranking` must be the same, but "
                f"`iter_ranking[{idx}].ndim == {ranking.ndim}"
                f" and `iter_ranking[{idx-1}].ndim == "
                f"{iter_ranking[idx - 1].ndim}`."
            )
    # Called for validation only: raises a ValueError if the shapes cannot be
    # broadcast against each other.
    np.broadcast_shapes(*(u.shape for u in iter_ranking))

    combined_ranking = iter_ranking[0]
    for next_ranking in iter_ranking[1:]:
        cr_shape = combined_ranking.shape
        if rank_per_batch:
            # Keep the first (batch) axis and rank within each batch row.
            rank_shape = (
                cr_shape[0],
                max(reduce(operator.mul, cr_shape[1:], 1), 1),
            )
            rank_dict = {"method": rank_method, "axis": 1}
        else:
            # Rank over all entries at once.
            rank_shape = reduce(operator.mul, cr_shape, 1)
            rank_dict = {"method": rank_method}
        combined_ranking = combined_ranking.reshape(rank_shape)

        # Exchange nan values to make rankdata work; they are restored after
        # ranking.
        nan_values = np.isnan(combined_ranking)
        combined_ranking[nan_values] = -np.inf
        combined_ranking = rankdata(combined_ranking, **rank_dict).astype(
            float
        )
        combined_ranking[nan_values] = np.nan
        combined_ranking = combined_ranking.reshape(cr_shape)

        # Add the sigmoid of the next ranking as a tie-breaker on top of the
        # ranks computed from the rankings combined so far.
        combined_ranking = combined_ranking + 1 / (
            1 + np.exp(-next_ranking)
        )  # sigmoid
    return combined_ranking