import numpy as np
from scipy.stats import t, rankdata
from sklearn.base import BaseEstimator, clone
from sklearn.utils.validation import check_array, check_is_fitted
from ...base import (
MultiAnnotatorPoolQueryStrategy,
SkactivemlClassifier,
AnnotatorModelMixin,
)
from ...pool._uncertainty_sampling import uncertainty_scores
from ...utils import (
check_scalar,
MISSING_LABEL,
is_labeled,
check_type,
simple_batch,
majority_vote,
)
class IntervalEstimationAnnotModel(BaseEstimator, AnnotatorModelMixin):
    """IntervalEstimationAnnotModel

    This annotator model relies on 'Interval Estimation Learning' (IELearning)
    for estimating the annotation performances, i.e., labeling accuracies,
    of multiple annotators [1]. Therefore, it computes the mean accuracy and
    the lower as well as the upper bound of the labeling accuracy per
    annotator. (Weighted) majority vote is used as estimated ground truth.

    Parameters
    ----------
    classes : array-like, shape (n_classes), optional (default=None)
        Holds the label for each class.
    missing_label : scalar or string or np.nan or None, optional
        (default=MISSING_LABEL)
        Value to represent a missing label.
    alpha : float, interval=(0, 1), optional (default=0.05)
        Significance level of the confidence interval. The critical value of
        Student's t-distribution is evaluated at `alpha / 2`.
    mode : 'lower' or 'mean' or 'upper', optional (default='upper')
        Mode of the estimated annotation performance.
    random_state : None|int|numpy.random.RandomState, optional (default=None)
        The random state used for deciding on majority vote labels in case of
        ties.

    Attributes
    ----------
    n_annotators_: int
        Number of annotators.
    A_perf_ : ndarray, shape (n_annotators, 3)
        Estimated annotation performances (i.e., labeling accuracies), where
        `A_perf_[i, 0]` indicates the lower bound, `A_perf_[i, 1]` indicates
        the mean, and `A_perf_[i, 2]` indicates the upper bound of the
        estimated labeling accuracy.

    References
    ----------
    [1] Donmez, Pinar, Jaime G. Carbonell, and Jeff Schneider.
        "Efficiently learning the accuracy of labeling sources for selective
        sampling." 15th ACM SIGKDD International Conference on Knowledge
        Discovery and Data Mining, pp. 259-268. 2009.
    """

    def __init__(
        self,
        classes=None,
        missing_label=MISSING_LABEL,
        alpha=0.05,
        mode="upper",
        random_state=None,
    ):
        self.classes = classes
        self.missing_label = missing_label
        self.alpha = alpha
        self.mode = mode
        self.random_state = random_state

    def fit(self, X, y, sample_weight=None):
        """Fit annotator model for given samples.

        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            Samples. They are not read by this method, since one accuracy
            interval per annotator is estimated from `y` only.
        y : array-like, shape (n_samples, n_annotators)
            Class labels of annotators.
        sample_weight : array-like, shape (n_samples, n_annotators),
            optional (default=None)
            Sample weight for each label and annotator.

        Returns
        -------
        self : IntervalEstimationAnnotModel object
            The fitted annotator model.

        Raises
        ------
        ValueError
            If `mode` is not one of 'lower', 'mean', 'upper' or if `y` is
            not two-dimensional.
        """
        # Check whether alpha is float in (0, 1).
        check_scalar(
            x=self.alpha,
            target_type=float,
            name="alpha",
            min_val=0,
            max_val=1,
            min_inclusive=False,
            max_inclusive=False,
        )

        # Check mode.
        if self.mode not in ("lower", "mean", "upper"):
            raise ValueError("`mode` must be in `['lower', 'mean', 'upper']`.")

        # Convert array-likes (e.g., nested lists) so that `ndim`, `shape`,
        # and the boolean-mask indexing below are available.
        y = np.asarray(y)

        # Check shape of labels.
        if y.ndim != 2:
            raise ValueError(
                "`y` must be a 2d array with shape "
                "`(n_samples, n_annotators)`."
            )

        # Compute majority vote labels serving as estimated ground truth.
        y_mv = majority_vote(
            y=y,
            w=sample_weight,
            classes=self.classes,
            random_state=self.random_state,
            missing_label=self.missing_label,
        )

        # Number of annotators.
        self.n_annotators_ = y.shape[1]
        is_lbld = is_labeled(y, missing_label=self.missing_label)
        self.A_perf_ = np.zeros((self.n_annotators_, 3))
        for a_idx in range(self.n_annotators_):
            labeled = is_lbld[:, a_idx]
            is_correct = np.equal(y_mv[labeled], y[labeled, a_idx])
            # Append one incorrect and one correct pseudo-observation such
            # that mean, standard deviation, and the degrees of freedom are
            # defined even for annotators without any label.
            is_correct = np.concatenate((is_correct, [0, 1]))
            n_obs = len(is_correct)
            mean = np.mean(is_correct)
            std_err = np.std(is_correct) / np.sqrt(n_obs)
            # Half-width of the two-sided confidence interval.
            half_width = t.isf(self.alpha / 2, n_obs - 1) * std_err
            self.A_perf_[a_idx, 0] = mean - half_width
            self.A_perf_[a_idx, 1] = mean
            self.A_perf_[a_idx, 2] = mean + half_width
        return self

    def predict_annotator_perf(self, X):
        """Calculates the probability that an annotator provides the true label
        for a given sample.

        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            Test samples.

        Returns
        -------
        P_annot : numpy.ndarray, shape (n_samples, n_annotators)
            `P_annot[i,l]` is the probability, that annotator `l` provides the
            correct class label for sample `X[i]`. The estimate selected by
            `mode` is identical for all samples.
        """
        check_is_fitted(self)
        X = check_array(X)
        # Column of `A_perf_` selected by `mode`; any value other than
        # 'lower' and 'mean' selects the upper bound.
        column = {"lower": 0, "mean": 1}.get(self.mode, 2)
        return np.tile(self.A_perf_[:, column], (len(X), 1))
class IntervalEstimationThreshold(MultiAnnotatorPoolQueryStrategy):
    """IntervalEstimationThreshold

    The strategy 'Interval Estimation Threshold' (IEThresh) [1] is useful for
    addressing the exploration vs. exploitation trade-off when dealing with
    multiple error-prone annotators in active learning. This class relies on
    `IntervalEstimationAnnotModel` for estimating the annotation
    performances, i.e., label accuracies, of multiple annotators. Samples are
    selected based on 'Uncertainty Sampling' (US). The selected samples are
    labeled by the annotators whose estimated annotation performances are equal
    or greater than an adaptive threshold.

    The strategy assumes all annotators to be available and is not defined
    otherwise. To deal with this case nonetheless value-annotator pairs are
    first ranked according to the amount of annotators available for the given
    value in `candidates` and are then ranked according to
    `IntervalEstimationThreshold`.

    Parameters
    ----------
    epsilon : float, interval=[0, 1], optional (default=0.9)
        Parameter for specifying the adaptive threshold used for annotator
        selection: with `batch_size='adaptive'`, the batch size equals the
        number of annotators whose upper accuracy bound is at least
        `epsilon` times the maximum upper accuracy bound.
    alpha : float, interval=(0, 1), optional (default=0.05)
        Significance level of the confidence interval. The critical value of
        Student's t-distribution is evaluated at `alpha / 2`.
    random_state : None or int or numpy.random.RandomState, optional
        (default=None)
        The random state used for deciding on majority vote labels in case of
        ties and handed to the final batch selection.
    missing_label : scalar or string or np.nan or None, optional
        (default=MISSING_LABEL)
        Value to represent a missing label.

    References
    ----------
    [1] Donmez, Pinar, Jaime G. Carbonell, and Jeff Schneider.
        "Efficiently learning the accuracy of labeling sources for selective
        sampling." 15th ACM SIGKDD International Conference on Knowledge
        Discovery and Data Mining, pp. 259-268. 2009.
    """

    def __init__(
        self,
        epsilon=0.9,
        alpha=0.05,
        random_state=None,
        missing_label=MISSING_LABEL,
    ):
        super().__init__(
            random_state=random_state, missing_label=missing_label
        )
        self.epsilon = epsilon
        self.alpha = alpha

    def query(
        self,
        X,
        y,
        clf,
        fit_clf=True,
        candidates=None,
        annotators=None,
        sample_weight=None,
        batch_size="adaptive",
        return_utilities=False,
    ):
        """Determines which candidate sample is to be annotated by which
        annotator.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training data set, usually complete, i.e., including the labeled
            and unlabeled samples.
        y : array-like of shape (n_samples, n_annotators)
            Labels of the training data set for each annotator (possibly
            including unlabeled ones indicated by self.MISSING_LABEL), meaning
            that `y[i, j]` contains the label annotated by annotator `j` for
            sample `i`.
        clf : skactiveml.base.SkactivemlClassifier
            Model implementing the methods `fit` and `predict_proba`.
        fit_clf : bool, default=True
            Defines whether the classifier should be fitted on `X`, `y`, and
            `sample_weight`.
        candidates : None or array-like of shape (n_candidates), dtype=int or
            array-like of shape (n_candidates, n_features),
            optional (default=None)
            See annotators.
        annotators : None or array-like of shape (n_avl_annotators), dtype=int
            or array-like of shape (n_candidates, n_annotators), optional
            (default=None)
            If candidate samples and annotators are not specified, i.e.,
            `candidates=None`, `annotators=None` the unlabeled target values,
            `y`, are the candidates annotator-sample-pairs.
            If candidate samples and available annotators are specified:
            The annotator-sample-pairs, for which the sample is a candidate
            sample and the annotator is an available annotator are considered
            as candidate annotator-sample-pairs.
            If `candidates` is None, all samples of `X` are considered as
            candidate samples. In this case `n_candidates` equals `len(X)`.
            If `candidates` is of shape `(n_candidates,)` and of type int,
            `candidates` is considered as the indices of the sample candidates
            in `(X, y)`.
            If `candidates` is of shape `(n_candidates, n_features)`, the
            sample candidates are directly given in `candidates` (not
            necessarily contained in `X`). This is not supported by all query
            strategies.
            If `annotators` is `None`, all annotators are considered as
            available annotators.
            If `annotators` is of shape `(n_avl_annotators)`, and of type int,
            `annotators` is considered as the indices of the available
            annotators.
            If `annotators` is a boolean array of shape `(n_candidates,
            n_annotators)` the annotator-sample-pairs, for which the sample
            is a candidate sample and the boolean matrix has entry `True` are
            considered as candidate annotator-sample-pairs.
        sample_weight : array-like, (n_samples, n_annotators), optional
            (default=None)
            It contains the weights of the training samples' class labels.
            It must have the same shape as y.
        batch_size : 'adaptive' or int, optional (default='adaptive')
            The number of samples to be selected in one AL cycle. If 'adaptive'
            is set, the `batch_size` is determined based on the annotation
            performances and the parameter `epsilon`.
        return_utilities : bool, optional (default=False)
            If true, also return the utilities based on the query strategy.

        Returns
        -------
        query_indices : numpy.ndarray of shape (batch_size, 2)
            The query_indices indicate which candidate sample is to be
            annotated by which annotator, e.g., `query_indices[:, 0]`
            indicates the selected candidate samples and `query_indices[:, 1]`
            indicates the respectively selected annotators.
        utilities: numpy.ndarray of shape (batch_size, n_cand_samples,
            n_annotators)
            The utilities of all candidate samples w.r.t. to the available
            annotators after each selected sample of the batch, e.g.,
            `utilities[0, :, j]` indicates the utilities used for selecting
            the first sample-annotator pair (with indices `query_indices[0]`).

        Raises
        ------
        ValueError
            If `batch_size` is a string other than 'adaptive'.
        TypeError
            If `batch_size` is neither 'adaptive' nor of type int.
        """
        # Base check. A placeholder batch size of 1 is passed, because the
        # actual batch size is validated and determined further below.
        (
            X,
            y,
            candidates,
            annotators,
            _,
            return_utilities,
        ) = super()._validate_data(
            X, y, candidates, annotators, 1, return_utilities, reset=True
        )

        X_cand, mapping, A_cand = self._transform_cand_annot(
            candidates, annotators, X, y
        )

        # Validate classifier type.
        check_type(clf, "clf", SkactivemlClassifier)

        # Check whether epsilon is float in [0, 1].
        check_scalar(
            x=self.epsilon,
            target_type=float,
            name="epsilon",
            min_val=0,
            max_val=1,
        )

        # Check whether alpha is float in (0, 1).
        check_scalar(
            x=self.alpha,
            target_type=float,
            name="alpha",
            min_val=0,
            max_val=1,
            min_inclusive=False,
            max_inclusive=False,
        )

        n_annotators = y.shape[1]

        # A sample is only a candidate if all annotators are available for
        # it; this per-sample flag is broadcast to every annotator column.
        A_cand = np.repeat(
            np.all(A_cand, axis=1).reshape(-1, 1), n_annotators, axis=1
        )

        # Fit classifier and compute uncertainties on candidate samples.
        if fit_clf:
            if sample_weight is None:
                clf = clone(clf).fit(X, y)
            else:
                clf = clone(clf).fit(X, y, sample_weight)
        P = clf.predict_proba(X_cand)
        uncertainties = uncertainty_scores(probas=P, method="least_confident")

        # Fit annotator model and compute performance estimates. The random
        # state is handed over such that ties of the majority vote are
        # resolved with this strategy's random state.
        ie_model = IntervalEstimationAnnotModel(
            classes=clf.classes_,
            missing_label=clf.missing_label,
            alpha=self.alpha,
            mode="upper",
            random_state=self.random_state_,
        )
        ie_model.fit(X=X, y=y, sample_weight=sample_weight)
        upper_bounds = ie_model.A_perf_[:, 2]

        # Compute utilities by combining the uncertainty ranks of the samples
        # with the (shifted) upper bounds of the annotators. Ranks are scaled
        # by `max_range`, which exceeds every shifted upper bound, such that
        # the sample rank dominates and the annotator performance only
        # orders the annotators within one sample.
        A_perf = (upper_bounds + 1)[np.newaxis]
        max_range = np.max(A_perf) + 1
        uncertainties = rankdata(uncertainties, method="ordinal") * max_range
        uncertainties = np.tile(uncertainties, (n_annotators, 1)).T
        utilities = uncertainties + A_perf

        # Exclude not available annotators.
        utilities[~A_cand] = np.nan

        # Determine actual batch size.
        if isinstance(batch_size, str) and batch_size != "adaptive":
            raise ValueError(
                "If `batch_size` is of type `string`, "
                "it must equal `'adaptive'`."
            )
        elif batch_size == "adaptive":
            # The threshold is computed on the unshifted upper bounds; using
            # the bounds shifted by one would loosen the threshold and select
            # more annotators than `epsilon` specifies.
            required_perf = self.epsilon * np.max(upper_bounds)
            actl_batch_size = int(np.sum(upper_bounds >= required_perf))
        elif isinstance(batch_size, int):
            actl_batch_size = batch_size
        else:
            raise TypeError(
                f"`batch_size` is of type `{type(batch_size)}` "
                f"but must equal `'adaptive'` or be of type "
                f"`int`."
            )

        # Map utilities of the candidates back to the indices of `X`.
        if mapping is not None:
            w_utilities = utilities
            utilities = np.full((len(X), n_annotators), np.nan)
            utilities[mapping, :] = w_utilities

        # Perform selection based on previously computed utilities.
        return simple_batch(
            utilities,
            self.random_state_,
            batch_size=actl_batch_size,
            return_utilities=return_utilities,
        )