Source code for skactiveml.pool.multiannotator._wrapper

from inspect import signature, Parameter

import numpy as np
from scipy.stats import rankdata
from sklearn.utils.validation import check_array, _is_arraylike

from ...base import (
    MultiAnnotatorPoolQueryStrategy,
    SingleAnnotatorPoolQueryStrategy,
)
from ...utils import (
    rand_argmax,
    check_type,
    MISSING_LABEL,
    majority_vote,
    check_random_state,
    check_scalar,
)


class SingleAnnotatorWrapper(MultiAnnotatorPoolQueryStrategy):
    """SingleAnnotatorWrapper

    Implementation of a wrapper class for pool-based active learning query
    strategies with a single annotator such that it transforms the query
    strategy for the single annotator into a query strategy for multiple
    annotators by choosing an annotator randomly or according to the
    parameter `A_perf` and setting the labeled matrix to a labeled vector by
    an aggregation function, e.g., majority voting.

    Parameters
    ----------
    strategy : SingleAnnotatorPoolQueryStrategy
        An active learning strategy for a single annotator.
    y_aggregate : callable, optional (default=None)
        `y_aggregate` is used to transform `y` as a matrix of shape
        (n_samples, n_annotators) into a vector of shape (n_samples) during
        the querying process and is then passed to the given `strategy`.
        If `y_aggregate is None` and `y` is used in the strategy,
        majority_vote is used as `y_aggregate`.
    missing_label : scalar or string or np.nan or None, optional
    (default=np.nan)
        Value to represent a missing label.
    random_state : int or RandomState instance, optional (default=None)
        Controls the randomness of the estimator.
    """

    def __init__(
        self,
        strategy,
        y_aggregate=None,
        missing_label=MISSING_LABEL,
        random_state=None,
    ):
        super().__init__(
            random_state=random_state, missing_label=missing_label
        )
        self.strategy = strategy
        self.y_aggregate = y_aggregate

    def query(
        self,
        X,
        y,
        candidates=None,
        annotators=None,
        batch_size=1,
        n_annotators_per_sample=1,
        A_perf=None,
        return_utilities=False,
        **query_kwargs,
    ):
        """Determines which candidate sample is to be annotated by which
        annotator.

        The samples are first and primarily ranked by the given strategy as
        if one unspecified annotator were to annotate the sample. Then for
        each sample the sample-annotator pairs are ranked based either on
        previously set preferences or at random.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training data set, usually complete, i.e., including the labeled
            and unlabeled samples.
        y : array-like of shape (n_samples, n_annotators)
            Labels of the training data set for each annotator (possibly
            including unlabeled ones indicated by `self.missing_label`),
            meaning that `y[i, j]` contains the label annotated by annotator
            `j` for sample `i`.
        candidates : None or array-like of shape (n_candidates), dtype=int or
        array-like of shape (n_candidates, n_features),
        optional (default=None)
            See annotators.
        annotators : None or array-like of shape (n_avl_annotators), dtype=int
        or array-like of shape (n_candidates, n_annotators),
        optional (default=None)
            If candidate samples and annotators are not specified, i.e.,
            `candidates=None`, `annotators=None`, the unlabeled target
            values, `y`, are the candidate annotator-sample-pairs.
            If candidate samples and available annotators are specified:
            The annotator-sample-pairs, for which the sample is a candidate
            sample and the annotator is an available annotator, are
            considered as candidate annotator-sample-pairs.
            If `candidates` is None, all samples of `X` are considered as
            candidate samples. In this case `n_candidates` equals `len(X)`.
            If `candidates` is of shape `(n_candidates,)` and of type int,
            `candidates` is considered as the indices of the sample
            candidates in `(X, y)`.
            If `candidates` is of shape `(n_candidates, n_features)`, the
            sample candidates are directly given in `candidates` (not
            necessarily contained in `X`). This is not supported by all
            query strategies.
            If `annotators` is `None`, all annotators are considered as
            available annotators.
            If `annotators` is of shape `(n_avl_annotators)`, and of type
            int, `annotators` is considered as the indices of the available
            annotators.
            If `annotators` is a boolean array of shape
            `(n_candidates, n_annotators)`, the annotator-sample-pairs, for
            which the sample is a candidate sample and the boolean matrix
            has entry `True`, are considered as candidate
            annotator-sample-pairs.
        batch_size : int, optional (default=1)
            The number of annotator-sample pairs to be selected in one AL
            cycle.
        n_annotators_per_sample : int, array-like, optional (default=1)
        array-like of shape (k,), k <= n_samples
            If `n_annotators_per_sample` is an int, the value indicates
            the number of annotators that are preferably assigned to a
            candidate sample, selected by the query_strategy.
            `Preferably` in this case means depending on how many annotators
            can be assigned to a given candidate sample and how many
            annotator-sample-pairs should be assigned considering the
            `batch_size`.
            If `n_annotators_per_sample` is an int array, the values of the
            array are interpreted as follows. The value at the i-th index
            determines the preferred number of annotators for the candidate
            sample at the i-th index in the ranking of the batch.
            The ranking of the batch is given by the `strategy`
            (SingleAnnotatorPoolQueryStrategy). The last index
            of the n_annotators_per_sample array (k-1) indicates the
            preferred number of annotators for all candidate samples at an
            index greater or equal to k-1.
        A_perf : array-like, shape (n_annotators,) or
        (n_candidates, n_annotators), optional (default=None)
            The performance based ranking of each annotator.
            1.) If `A_perf` is of shape (n_candidates, n_annotators) for
            each sample `i` the value-annotators pair `(i, j)` is chosen
            over the pair `(i, k)` if `A_perf[i, j]` is greater or
            equal to `A_perf[i, k]`.
            2.) If `A_perf` is of shape (n_annotators,) for each sample
            `i` the value-annotators pair `(i, j)` is chosen over
            the pair `(i, k)` if `A_perf[j]` is greater or
            equal to `A_perf[k]`.
            3.) If `A_perf` is None, the annotators are chosen at random,
            with a different distribution for each sample.
        return_utilities : bool, optional (default=False)
            If true, also returns the utilities based on the query strategy.
        query_kwargs : dict, optional
            Dictionary for the parameters of the query method besides `X`
            and the transformed `y`.

        Returns
        -------
        query_indices : np.ndarray of shape (batch_size, 2)
            The query_indices indicate which candidate sample is to be
            annotated by which annotator, e.g., `query_indices[:, 0]`
            indicates the selected candidate samples and
            `query_indices[:, 1]` indicates the respectively selected
            annotators.
        utilities : np.ndarray of shape (batch_size, n_samples, n_annotators)
        or np.ndarray of shape (batch_size, n_candidates, n_annotators)
            Only returned if `return_utilities` is true. The utilities of
            all candidate samples w.r.t. the available annotators after
            each selected sample of the batch, e.g., `utilities[0]`
            holds the utilities used for selecting the first
            sample-annotator-pair (with indices `query_indices[0]`).
            If `candidates` is None or of shape (n_candidates,), the
            indexing refers to samples in `X`.
            If `candidates` is of shape (n_candidates, n_features), the
            indexing refers to samples in `candidates`.

        Raises
        ------
        ValueError
            If the missing labels of the wrapper and the wrapped strategy
            differ, if `n_annotators_per_sample` is an array that is not
            one-dimensional, if `A_perf` has an unsupported shape, or if
            the selected samples do not offer `batch_size`
            annotator-sample pairs.
        TypeError
            If `self.y_aggregate` is not a callable with exactly one
            parameter without default, or if `n_annotators_per_sample` or
            `A_perf` is of an unsupported type.
        """
        (
            X,
            y,
            candidates,
            annotators,
            batch_size,
            return_utilities,
        ) = super()._validate_data(
            X,
            y,
            candidates,
            annotators,
            batch_size,
            return_utilities,
            reset=True,
        )

        X_cand, mapping, A_cand = self._transform_cand_annot(
            candidates, annotators, X, y
        )

        random_state = self.random_state_

        # check strategy
        check_type(
            self.strategy, "self.strategy", SingleAnnotatorPoolQueryStrategy
        )

        # Two missing labels agree if they compare equal or if both are NaN
        # (NaN never compares equal to itself). The NaN test must tolerate
        # non-numeric labels such as strings or None.
        both_nan = self._is_nan_scalar(
            self.strategy.missing_label
        ) and self._is_nan_scalar(self.missing_label)
        if self.strategy.missing_label != self.missing_label and not both_nan:
            raise ValueError(
                f"`self.missing_label` must equal "
                f"`self.strategy.missing_label`, but "
                f"`self.missing_label` equals {self.missing_label} and "
                f"`self.strategy.missing_label` equals "
                f"{self.strategy.missing_label}."
            )

        # aggregate y
        if self.y_aggregate is None:

            def y_aggregate(y):
                return majority_vote(y, random_state=random_state)

        else:
            y_aggregate = self.y_aggregate

        if not callable(y_aggregate):
            raise TypeError(
                f"`self.y_aggregate` must be callable. "
                f"`self.y_aggregate` is of type {type(y_aggregate)}"
            )

        # count the number of arguments that have no default value;
        # `Parameter.empty` is a sentinel and is therefore compared by
        # identity (an `==` comparison breaks for array-valued defaults)
        n_free_params = sum(
            1
            for param in signature(y_aggregate).parameters.values()
            if param.default is Parameter.empty
        )

        if n_free_params != 1:
            raise TypeError(
                f"The number of free parameters of the callable has to "
                f"equal one. "
                f"The number of free parameters is {n_free_params}."
            )

        y_sq = y_aggregate(y)

        n_selectable_candidates = len(X_cand)
        n_candidates = len(candidates) if candidates is not None else len(X)
        n_annotators = y.shape[1]
        n_samples = X.shape[0]

        # the wrapped strategy cannot select more samples than selectable
        batch_size_sq = min(batch_size, X_cand.shape[0])

        # check n_annotators_per_sample and set pref_n_annotators
        if isinstance(n_annotators_per_sample, (int, np.int_)):
            check_scalar(
                n_annotators_per_sample,
                name="n_annotators_per_sample",
                target_type=int,
                min_val=1,
            )
            pref_n_annotators = n_annotators_per_sample * np.ones(
                batch_size_sq
            )
        elif _is_arraylike(n_annotators_per_sample):
            pref_n_annotators = check_array(
                n_annotators_per_sample, ensure_2d=False
            )

            if pref_n_annotators.ndim != 1:
                raise ValueError(
                    "n_annotators_per_sample, if an array, must be of dim "
                    f"1 but, it is of dim {pref_n_annotators.ndim}"
                )
            else:
                pref_length = pref_n_annotators.shape[0]
                if pref_length > batch_size_sq:
                    pref_n_annotators = pref_n_annotators[:batch_size_sq]

                if pref_length < batch_size_sq:
                    # the last given value applies to all remaining samples
                    appended = pref_n_annotators[-1] * np.ones(
                        batch_size_sq - pref_length
                    )

                    pref_n_annotators = np.append(pref_n_annotators, appended)
        else:
            raise TypeError(
                "n_annotators_per_sample must be array like "
                "or an integer"
            )

        # check A_perf and set annotator_utilities
        if A_perf is None:
            annotator_utilities = random_state.rand(
                1, n_selectable_candidates, n_annotators
            ).repeat(batch_size_sq, axis=0)
        elif _is_arraylike(A_perf):
            A_perf = check_array(A_perf, ensure_2d=False)
            # ensure A_perf lies in [0, 1)
            if A_perf.min() != A_perf.max():
                A_perf = (
                    1
                    / (A_perf.max() - A_perf.min() + 1)
                    * (A_perf - A_perf.min())
                )
            else:
                A_perf = np.zeros_like(A_perf, dtype=float)

            if A_perf.shape == (n_candidates, n_annotators):
                annotator_utilities = A_perf[np.newaxis, :, :].repeat(
                    batch_size_sq, axis=0
                )
                if candidates is None:
                    annotator_utilities = annotator_utilities[:, mapping, :]
            elif A_perf.shape == (n_annotators,):
                annotator_utilities = (
                    A_perf[np.newaxis, np.newaxis, :]
                    .repeat(n_selectable_candidates, axis=1)
                    .repeat(batch_size_sq, axis=0)
                )
            else:
                # report the shape that is actually checked above
                raise ValueError(
                    f"`A_perf` is of shape {A_perf.shape}, but must be of "
                    f"shape ({n_candidates}, {n_annotators}) or of "
                    f"shape ({n_annotators},)."
                )
        else:
            raise TypeError(
                f"`A_perf` is of type {type(A_perf)}, but must be array like "
                f"or of type None."
            )

        candidates_sq = mapping if mapping is not None else X_cand

        qs_indices, w_utilities = self.strategy.query(
            X=X,
            y=y_sq,
            candidates=candidates_sq,
            **query_kwargs,
            batch_size=batch_size_sq,
            return_utilities=True,
        )

        if mapping is None:
            sample_utilities = w_utilities
            sample_indices = qs_indices
        else:
            # translate indices w.r.t. `X` into positions within `mapping`
            sample_utilities = w_utilities[:, mapping]
            sample_indices = np.array(
                [np.argwhere(mapping == i)[0, 0] for i in qs_indices]
            )

        re_val = self._query_annotators(
            A_cand,
            batch_size,
            sample_utilities,
            annotator_utilities,
            return_utilities,
            pref_n_annotators,
            sample_indices,
        )

        if mapping is None:
            return re_val
        elif return_utilities:
            # map the results back such that they are indexed w.r.t. `X`
            w_indices, w_utilities = re_val
            utilities = np.full((batch_size, n_samples, n_annotators), np.nan)
            utilities[:, mapping, :] = w_utilities
            indices = np.zeros_like(w_indices)
            indices[:, 0] = mapping[w_indices[:, 0]]
            indices[:, 1] = w_indices[:, 1]
            return indices, utilities
        else:
            w_indices = re_val
            indices = np.zeros_like(w_indices)
            indices[:, 0] = mapping[w_indices[:, 0]]
            indices[:, 1] = w_indices[:, 1]
            return indices

    @staticmethod
    def _is_nan_scalar(value):
        """Return True if `value` is NaN and False otherwise.

        Values for which `np.isnan` is not defined (it raises a `TypeError`,
        e.g., for strings or None) are reported as not NaN.
        """
        try:
            return bool(np.isnan(value))
        except TypeError:
            return False

    def _query_annotators(
        self,
        A_cand,
        batch_size,
        sample_utilities,
        annotator_utilities,
        return_utilities,
        pref_n_annotators,
        qs_indices,
    ):
        """Select `batch_size` sample-annotator pairs for the samples chosen
        by the wrapped strategy.

        Parameters
        ----------
        A_cand : np.ndarray of shape (n_samples, n_annotators), dtype=bool
            Entry `(i, j)` is True if annotator `j` may be queried for
            sample `i`.
        batch_size : int
            Number of sample-annotator pairs to select.
        sample_utilities : np.ndarray of shape (n_selected, n_samples)
            Utilities returned by the wrapped strategy, one row per selected
            sample. Modified in place.
        annotator_utilities : np.ndarray of shape
        (n_selected, n_samples, n_annotators)
            Utilities of the annotators per selected sample. Modified in
            place.
        return_utilities : bool
            If true, the utilities are returned as well.
        pref_n_annotators : np.ndarray of shape (n_selected,)
            Preferred number of annotators per selected sample.
        qs_indices : np.ndarray of shape (n_selected,)
            Indices of the selected samples (rows of `A_cand`) in the order
            of their selection.

        Returns
        -------
        query_indices : np.ndarray of shape (batch_size, 2)
            Selected (sample, annotator) index pairs.
        utilities : np.ndarray of shape (batch_size, n_samples, n_annotators)
            Utilities each pair was selected with; only returned if
            `return_utilities` is true.
        """
        random_state = check_random_state(self.random_state)

        n_annotators = A_cand.shape[1]
        n_samples = A_cand.shape[0]

        s_indices, s_utilities = self._get_order_preserving_s_query(
            A_cand, sample_utilities, annotator_utilities, qs_indices
        )

        n_as_annotators = self._n_to_assign_annotators(
            batch_size, A_cand, s_indices, pref_n_annotators
        )

        utilities = np.zeros((batch_size, n_samples, n_annotators))
        query_indices = np.zeros((batch_size, 2), dtype=int)

        annotator_ps = 0  # current annotators per sample
        sample_index = 0  # sample batch index

        for batch_index in range(batch_size):  # actual batch index
            utilities[batch_index] = s_utilities[sample_index]
            query_indices[batch_index] = rand_argmax(
                utilities[batch_index], random_state=random_state
            )

            # a selected pair must not be selected again
            s_utilities[
                :, query_indices[batch_index, 0], query_indices[batch_index, 1]
            ] = np.nan

            annotator_ps += 1
            # proceed to the next selected sample once the current one has
            # been assigned its number of annotators
            if annotator_ps >= n_as_annotators[sample_index]:
                sample_index += 1
                annotator_ps = 0

        if return_utilities:
            return query_indices, utilities
        else:
            return query_indices

    @staticmethod
    def _get_order_preserving_s_query(
        A, candidate_utilities, annotator_utilities, sample_indices
    ):
        """Combine sample and annotator utilities such that the sample
        selected in batch row `i` keeps the highest utility in that row.

        The sample utilities are replaced by their ordinal ranks, hence they
        differ by at least one. Annotator utilities that lie in [0, 1)
        therefore only decide between the annotators of one sample.

        Parameters
        ----------
        A : np.ndarray of shape (n_samples, n_annotators), dtype=bool
            Entry `(i, j)` is True if annotator `j` is available for sample
            `i`.
        candidate_utilities : np.ndarray of shape (n_selected, n_samples)
            Sample utilities per batch row. Modified in place.
        annotator_utilities : np.ndarray of shape
        (n_selected, n_samples, n_annotators)
            Annotator utilities per batch row. Entries of unavailable pairs
            are set to NaN in place.
        sample_indices : array-like of shape (n_selected,)
            Index of the sample selected in each batch row.

        Returns
        -------
        sample_indices : array-like of shape (n_selected,)
            The unchanged `sample_indices`.
        utilities : np.ndarray of shape (n_selected, n_samples, n_annotators)
            Sum of the ranked sample utilities and the annotator utilities;
            NaN where either of them is NaN.
        """
        # temporarily rank NaN utilities lowest; they are restored below
        nan_indices = np.argwhere(np.isnan(candidate_utilities))
        candidate_utilities[nan_indices[:, 0], nan_indices[:, 1]] = -np.inf

        # force selected sample indices to have the maximum utility
        for i in range(len(sample_indices)):
            max_utility_i = np.nanmax(candidate_utilities[i]) + 1
            candidate_utilities[i, sample_indices[i]] = max_utility_i

        # prepare candidate_utilities
        candidate_utilities = rankdata(
            candidate_utilities, method="ordinal", axis=1
        ).astype(float)

        candidate_utilities[nan_indices[:, 0], nan_indices[:, 1]] = np.nan

        # unavailable sample-annotator pairs must never be selected
        annotator_utilities[:, ~A] = np.nan

        # combine utilities by addition
        utilities = candidate_utilities[:, :, np.newaxis] + annotator_utilities

        return sample_indices, utilities

    @staticmethod
    def _n_to_assign_annotators(batch_size, A, s_indices, pref_n_annotators):
        """Determine how many annotators are assigned to each selected
        sample.

        Each selected sample starts with its preferred number of annotators,
        capped by its number of available annotators. While the total number
        of pairs is below `batch_size`, the number of every selected sample
        is raised by one (again capped by the available annotators).

        Parameters
        ----------
        batch_size : int
            Number of sample-annotator pairs to be assigned in total.
        A : np.ndarray of shape (n_samples, n_annotators), dtype=bool
            Entry `(i, j)` is True if annotator `j` is available for sample
            `i`.
        s_indices : array-like of shape (n_selected,)
            Indices of the selected samples (rows of `A`).
        pref_n_annotators : np.ndarray of shape (n_selected,)
            Preferred number of annotators per selected sample.

        Returns
        -------
        annot_per_sample : np.ndarray of shape (n_selected,)
            Number of annotators to assign to each selected sample.

        Raises
        ------
        ValueError
            If the selected samples offer fewer than `batch_size` available
            sample-annotator pairs (this case previously never terminated).
        """
        n_max_annotators = np.sum(A, axis=1)

        n_max_chosen_annotators = n_max_annotators[s_indices]
        annot_per_sample = np.minimum(
            n_max_chosen_annotators, pref_n_annotators
        )

        n_annotator_sample_pairs = np.sum(annot_per_sample)

        while n_annotator_sample_pairs < batch_size:
            raised = np.minimum(n_max_chosen_annotators, annot_per_sample + 1)

            # every selected sample is already at its maximum, so further
            # iterations cannot make progress
            if np.array_equal(raised, annot_per_sample):
                raise ValueError(
                    f"`batch_size` is {batch_size}, but the selected samples "
                    f"offer only {n_annotator_sample_pairs} available "
                    f"annotator-sample pairs."
                )

            annot_per_sample = raised
            n_annotator_sample_pairs = np.sum(annot_per_sample)

        return annot_per_sample