# Source code for skactiveml.pool._expected_error_reduction

from copy import deepcopy

import numpy as np

from .utils import IndexClassifierWrapper
from ..base import SingleAnnotatorPoolQueryStrategy, SkactivemlClassifier
from ..utils import (
    check_type,
    is_labeled,
    simple_batch,
    check_cost_matrix,
    MISSING_LABEL,
    check_equal_missing_label,
    unlabeled_indices,
    is_unlabeled,
)


class ExpectedErrorReduction(SingleAnnotatorPoolQueryStrategy):
    """Abstract class for Expected Error Reduction (EER)

    This class implements the basic workflow of EER algorithms containing:
     - determining ever candidates x label pair and simulate its outcome
       in the classifier by simulating it,
     - determining some kind of risk for the new classifier.

    These structure has been used by [1]_, [2]_, [3]_, and [4]_.

    Parameters
    ----------
    enforce_mapping : bool
        If True, an exception is raised when no exact mapping between
        samples in `X` and samples in `candidates` can be determined.
    cost_matrix : array-like of shape (n_classes, n_classes), default=None
        Cost matrix with `cost_matrix[i,j]` defining the cost of predicting
        class `j` for a sample with the actual class `i`.
        Used for misclassification loss and ignored for log loss.
    missing_label : scalar or string or np.nan or None, default=np.nan
        Value to represent a missing label.
    random_state : int or np.random.RandomState or None, default=None
        The random state to use.

    References
    ----------
    .. [1] Roy, N., & McCallum, A. (2001). Toward optimal active learning
       through monte carlo estimation of error reduction. ICML, (pp. 441-448).
    .. [2] Joshi, A. J., Porikli, F., & Papanikolopoulos, N. P. (2012).
       Scalable active learning for multiclass image classification.
       IEEE TrPAMI, 34(11), pp. 2259-2273.
    .. [3] Margineantu, D. D. (2005). Active cost-sensitive learning.
       In IJCAI (Vol. 5, pp. 1622-1623).
    .. [4] Kapoor, Ashish, Eric Horvitz, and Sumit Basu. "Selective
       Supervision: Guiding Supervised Learning with Decision-Theoretic Active
       Learning." IJCAI. Vol. 7. 2007.
    """

    def __init__(
        self,
        enforce_mapping,
        cost_matrix=None,
        missing_label=MISSING_LABEL,
        random_state=None,
    ):
        super().__init__(
            missing_label=missing_label, random_state=random_state
        )
        self.cost_matrix = cost_matrix
        self.enforce_mapping = enforce_mapping

    def query(
        self,
        X,
        y,
        clf,
        fit_clf=True,
        ignore_partial_fit=True,
        sample_weight=None,
        candidates=None,
        sample_weight_candidates=None,
        X_eval=None,
        sample_weight_eval=None,
        batch_size=1,
        return_utilities=False,
    ):
        """Determines for which candidate samples labels are to be queried.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training data set, usually complete, i.e., including the labeled
            and unlabeled samples.
        y : array-like of shape (n_samples,)
            Labels of the training data set (possibly including unlabeled ones
            indicated by `self.missing_label`).
        clf : skactiveml.base.SkactivemlClassifier
            Model implementing the methods `fit` and `predict_proba`.
        fit_clf : bool, default=True
            Defines whether the classifier should be fitted on `X`, `y`, and
            `sample_weight`.
        ignore_partial_fit : bool, default=True
            Relevant in cases where `clf` implements `partial_fit`. If `True`,
            the `partial_fit` function is ignored and `fit` is used instead.
        sample_weight : array-like of shape (n_samples,), default=None
            Weights of training samples in `X`.
        candidates : None or array-like of shape (n_candidates), dtype=int or \
                array-like of shape (n_candidates, n_features), default=None
            - If `candidates` is `None`, the unlabeled samples from
              `(X,y)` are considered as `candidates`.
            - If `candidates` is of shape `(n_candidates,)` and of type
              `int`, `candidates` is considered as the indices of the
              samples in `(X,y)`.
            - If `candidates` is of shape `(n_candidates, *)`, the
              candidate samples are directly given in `candidates` (not
              necessarily contained in `X`).
        sample_weight_candidates : array-like of shape (n_candidates,), \
                default=None
            Weights of candidate samples in `candidates` if candidates are
            directly given (i.e., `candidates.ndim > 1`). Otherwise, weights
            for `candidates` are given in `sample_weight`.
        X_eval : array-like of shape (n_eval_samples, n_features), \
                default=None
            Unlabeled evaluation data set that is used for estimating the risk.
            Not applicable for all EER methods.
        sample_weight_eval : array-like of shape (n_eval_samples,), \
                default=None
            Weights of evaluation samples in `X_eval` if given. Used to weight
            the importance of samples when estimating the risk.
        batch_size : int, default=1
            The number of samples to be selected in one AL cycle.
        return_utilities : bool, default=False
            If `True`, also return the utilities based on the query strategy.

        Returns
        -------
        query_indices : numpy.ndarray of shape (batch_size,)
            The query indices indicate for which candidate sample a label is
            to be queried, e.g., `query_indices[0]` indicates the first
            selected sample.

            - If `candidates` is `None` or of shape
              `(n_candidates,)`, the indexing refers to the samples in
              `X`.
            - If `candidates` is of shape `(n_candidates, n_features)`,
              the indexing refers to the samples in `candidates`.
        utilities : numpy.ndarray of shape (batch_size, n_samples) or \
                numpy.ndarray of shape (batch_size, n_candidates)
            The utilities of samples after each selected sample of the batch,
            e.g., `utilities[0]` indicates the utilities used for selecting
            the first sample (with index `query_indices[0]`) of the batch.
            Utilities for labeled samples will be set to np.nan.

            - If `candidates` is `None` or of shape
              `(n_candidates,)`, the indexing refers to the samples in
              `X`.
            - If `candidates` is of shape `(n_candidates, n_features)`,
              the indexing refers to the samples in `candidates`.
        """
        (
            X,
            y,
            sample_weight,
            clf,
            candidates,
            sample_weight_candidates,
            X_eval,
            sample_weight_eval,
            batch_size,
            return_utilities,
        ) = self._validate_data(
            X,
            y,
            sample_weight,
            clf,
            candidates,
            sample_weight_candidates,
            X_eval,
            sample_weight_eval,
            batch_size,
            return_utilities,
            reset=True,
            check_X_dict=None,
        )

        _, mapping = self._transform_candidates(
            candidates, X, y, enforce_mapping=self.enforce_mapping
        )

        (
            X_full,
            y_full,
            w_full,
            w_eval,
            idx_train,
            idx_cand,
            idx_eval,
        ) = self._concatenate_samples(
            X,
            y,
            sample_weight,
            candidates,
            sample_weight_candidates,
            X_eval,
            sample_weight_eval,
        )

        # Check fit_clf
        check_type(fit_clf, "fit_clf", bool)

        # Initialize classifier that works with indices to improve readability
        id_clf = IndexClassifierWrapper(
            deepcopy(clf),
            X_full,
            y_full,
            w_full,
            set_base_clf=not fit_clf,
            ignore_partial_fit=ignore_partial_fit,
            enforce_unique_samples=True,
            use_speed_up=True,
            missing_label=self.missing_label_,
        )

        # Fit the classifier.
        id_clf = self._precompute_and_fit_clf(
            id_clf,
            X_full,
            y_full,
            idx_train,
            idx_cand,
            idx_eval,
            fit_clf=fit_clf,
        )
        # Compute class-membership probabilities of candidate samples
        probs_cand = id_clf.predict_proba(idx_cand)

        # Check cost matrix.
        classes = id_clf.classes_
        self._validate_cost_matrix(len(classes))

        # Precompute the current error.
        current_error = self._estimate_current_error(
            id_clf, idx_train, idx_cand, idx_eval, w_eval
        )

        # Storage for computed errors per candidate sample
        errors = np.zeros([len(idx_cand), len(classes)])

        # Iterate over candidate samples
        for i_cx, idx_cx in enumerate(idx_cand):
            # Simulate the acquisition of each class label for this candidate.
            for i_cy, cy in enumerate(classes):
                errors[i_cx, i_cy] = self._estimate_error_for_candidate(
                    id_clf,
                    [idx_cx],
                    [cy],
                    idx_train,
                    idx_cand,
                    idx_eval,
                    w_eval,
                )

        # Utilities are maximized while errors are minimized; hence, multiply by -1.
        future_error = np.sum(probs_cand * errors, axis=1)
        utilities_cand = -1 * (future_error - current_error)

        if mapping is None:
            utilities = np.array(utilities_cand)
        else:
            utilities = np.full(len(X), np.nan)
            utilities[mapping] = utilities_cand

        return simple_batch(
            utilities,
            self.random_state_,
            batch_size=batch_size,
            return_utilities=return_utilities,
        )

    def _validate_data(
        self,
        X,
        y,
        sample_weight,
        clf,
        candidates,
        sample_weight_candidates,
        X_eval,
        sample_weight_eval,
        batch_size,
        return_utilities,
        reset=True,
        check_X_dict=None,
    ):
        # Validate input parameters.
        (
            X,
            y,
            candidates,
            batch_size,
            return_utilities,
        ) = super()._validate_data(
            X,
            y,
            candidates,
            batch_size,
            return_utilities,
            reset=reset,
            check_X_dict=check_X_dict,
        )

        # Validate classifier type.
        check_type(clf, "clf", SkactivemlClassifier)
        check_equal_missing_label(clf.missing_label, self.missing_label_)

        self._validate_init_params()

        return (
            X,
            y,
            sample_weight,
            clf,
            candidates,
            sample_weight_candidates,
            X_eval,
            sample_weight_eval,
            batch_size,
            return_utilities,
        )

    def _validate_init_params(self):
        """Function used to evaluate parameters of the `__init__` function that
        are not part of the abstract class to avoid redundancies.
        """
        pass

    def _precompute_and_fit_clf(
        self,
        id_clf,
        X_full,
        y_full,
        idx_train,
        idx_cand,
        idx_eval,
        fit_clf=True,
    ):
        if fit_clf:
            id_clf.fit(idx_train, set_base_clf=True)
        return id_clf

    def _estimate_current_error(
        self, id_clf, idx_train, idx_cand, idx_eval, w_eval
    ):
        """
        Result must be of float or of shape `(len(idx_eval))`.
        """
        return 0.0

    def _estimate_error_for_candidate(
        self, id_clf, idx_cx, cy, idx_train, idx_cand, idx_eval, w_eval
    ):
        raise NotImplementedError(
            "The error estimation method must be implemented "
            "by the query strategy."
        )

    def _validate_cost_matrix(self, n_classes):
        cost_matrix = (
            1 - np.eye(n_classes)
            if self.cost_matrix is None
            else self.cost_matrix
        )
        self.cost_matrix_ = check_cost_matrix(cost_matrix, n_classes)
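
    # Note: with `cost_matrix=None`, zero-one costs are used, e.g., for
    # three classes:
    #   1 - np.eye(3) == [[0., 1., 1.],
    #                     [1., 0., 1.],
    #                     [1., 1., 0.]]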

    def _concatenate_samples(
        self,
        X,
        y,
        sample_weight,
        candidates,
        sample_weight_candidates,
        X_eval,
        sample_weight_eval,
    ):
        # Check if candidates are samples if sample_weight_candidates is set
        if (
            candidates is None or candidates.ndim == 1
        ) and sample_weight_candidates is not None:
            raise ValueError(
                "Attribute `sample_weight_candidates` can only "
                "be set if `candidates` consists of samples."
            )

        # TODO: test sample weight_eval - length + column

        if sample_weight is not None and len(X) != len(sample_weight):
            raise ValueError(
                "If `sample_weight` is set, it must have the same "
                "length as `X`."
            )

        if sample_weight_candidates is not None and len(candidates) != len(
            sample_weight_candidates
        ):
            raise ValueError(
                "If `sample_weight_candidates` is set, it must have the same "
                "length as `candidates`."
            )

        # Concatenate samples
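        # The resulting layout of `X_full` is sketched below (the candidate
        # block only exists if candidates are given as samples and the eval
        # block only if `X_eval` is given); all returned index arrays refer
        # to rows of `X_full`:
        #
        #   X_full = [ X (idx_train) | candidates (idx_cand) | X_eval (idx_eval) ]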
        X_full = X
        y_full = y
        w_full = sample_weight
        idx_train = np.arange(len(X))
        idx_unld = unlabeled_indices(y, self.missing_label_)

        if candidates is None:
            idx_cand = idx_unld
        elif candidates.ndim == 1:
            idx_cand = candidates
        else:
            X_full = np.concatenate([X_full, candidates], axis=0)
            y_full = np.concatenate(
                [y_full, np.full(len(candidates), np.nan)], axis=0
            )
            if not (w_full is None and sample_weight_candidates is None):
                if w_full is None:
                    w_full = np.ones(len(X))
                if sample_weight_candidates is None:
                    sample_weight_candidates = np.ones(len(candidates))
                w_full = np.concatenate(
                    [w_full, sample_weight_candidates], axis=0
                )
            idx_cand = np.arange(len(X), len(X_full))

        if X_eval is None:
            idx_eval = idx_train
            if sample_weight_eval is None:
                w_eval = np.ones(len(X_full))
            else:
                if len(sample_weight_eval) != len(idx_eval):
                    raise ValueError(
                        "If `sample_weight_eval` is set but `X_eval` is "
                        "None, then it must have the same length as `X`."
                    )
                w_eval = np.zeros(len(X_full))
                w_eval[idx_eval] = sample_weight_eval
        else:
            X_full = np.concatenate([X_full, X_eval], axis=0)
            y_full = np.concatenate(
                [y_full, np.full(len(X_eval), np.nan)], axis=0
            )
            idx_eval = np.arange(len(X_full) - len(X_eval), len(X_full))
            w_eval = np.ones(len(X_full))
            if sample_weight_eval is not None:
                if len(sample_weight_eval) != len(idx_eval):
                    raise ValueError(
                        "If `sample_weight_eval` and `X_eval` are set, then "
                        "`sample_weight_eval` must have the same length as "
                        "`X_eval`."
                    )
                w_eval[idx_eval] = sample_weight_eval
            if w_full is not None:
                # `sample_weight_eval` may be None; use the evaluation
                # weights (ones by default) to extend `w_full`.
                w_full = np.concatenate([w_full, w_eval[idx_eval]], axis=0)

        return X_full, y_full, w_full, w_eval, idx_train, idx_cand, idx_eval

    def _risk_estimation(
        self, prob_true, prob_pred, cost_matrix, sample_weight
    ):
        # `prob_true` and `prob_pred` are either class indices (1d) or
        # class-membership probabilities (2d). The four cases below compute
        # the weighted expected misclassification cost accordingly.
        if prob_true.ndim == 1 and prob_pred.ndim == 1:
            # True and predicted labels are given as class indices.
            cost_est = cost_matrix[prob_true, :][
                range(len(prob_true)), prob_pred
            ]
            return np.sum(sample_weight * cost_est)
        elif prob_true.ndim == 1 and prob_pred.ndim == 2:
            # True labels are class indices; predictions are probabilities.
            cost_est = cost_matrix[prob_true, :]
            return np.sum(
                sample_weight[:, np.newaxis]
                * prob_pred
                * cost_est[np.newaxis, :]
            )
        elif prob_true.ndim == 2 and prob_pred.ndim == 1:
            # True labels are probabilities; predictions are class indices.
            cost_est = cost_matrix[:, prob_pred].T
            return np.sum(
                sample_weight[:, np.newaxis]
                * prob_true
                * cost_est[np.newaxis, :]
            )
        else:
            # Both are probabilities: weight the full cost matrix by the
            # outer product of true and predicted class probabilities.
            prob_mat = (
                prob_true[:, :, np.newaxis] @ prob_pred[:, np.newaxis, :]
            )
            return np.sum(
                sample_weight[:, np.newaxis, np.newaxis]
                * prob_mat
                * cost_matrix[np.newaxis, :, :]
            )
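
    # A minimal worked example for `_risk_estimation` (illustrative numbers,
    # not part of the API):
    #
    #   cost_matrix = 1 - np.eye(2)      # default zero-one costs
    #   y_true = np.array([0, 1])        # true class indices (1d case)
    #   y_pred = np.array([1, 1])        # predicted class indices (1d case)
    #   w = np.ones(2)
    #   # cost_matrix[y_true, :][range(2), y_pred] == [1., 0.]
    #   # -> weighted sum == 1.0 (only the first sample incurs cost)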

    def _logloss_estimation(self, prob_true, prob_pred):
        # Cross-entropy between the true and predicted class probabilities;
        # the machine epsilon avoids taking the logarithm of zero.
        return -np.sum(prob_true * np.log(prob_pred + np.finfo(float).eps))


class MonteCarloEER(ExpectedErrorReduction):
    """Monte Carlo Expected Error Reduction

    This class implements the expected error reduction method from [1]_ that
    uses a Monte Carlo approach to estimate the error. Therefore, it
    implements the following two steps:
     - simulating, for every candidate-label pair, the acquisition of that
       label by retraining the classifier with it,
     - estimating some kind of risk for the resulting classifier.

    Parameters
    ----------
    method : string, default='misclassification_loss'
        The optimization method. Possible values are 'misclassification_loss'
        and 'log_loss'.
    cost_matrix : array-like of shape (n_classes, n_classes), default=None
        Cost matrix with `cost_matrix[i,j]` defining the cost of predicting
        class `j` for a sample with the actual class `i`.
        Used for misclassification loss and ignored for log loss.
    subtract_current : bool, default=False
        If `True`, the current error estimate is subtracted from the
        simulated score. This might be helpful to define a stopping
        criterion.
    missing_label : scalar or string or np.nan or None, default=np.nan
        Value to represent a missing label.
    random_state : int or np.random.RandomState or None, default=None
        The random state to use.

    References
    ----------
    .. [1] Roy, N., & McCallum, A. (2001). Toward optimal active learning
       through monte carlo estimation of error reduction. ICML, (pp. 441-448).
    """

    def __init__(
        self,
        method="misclassification_loss",
        cost_matrix=None,
        subtract_current=False,
        missing_label=MISSING_LABEL,
        random_state=None,
    ):
        super().__init__(
            enforce_mapping=False,
            cost_matrix=cost_matrix,
            missing_label=missing_label,
            random_state=random_state,
        )
        self.method = method
        self.subtract_current = subtract_current

    def _validate_init_params(self):
        super()._validate_init_params()

        # Validate method.
        if not isinstance(self.method, str):
            raise TypeError(
                "{} is an invalid type for `method`. Type {} is "
                "expected.".format(type(self.method), str)
            )
        if self.method not in ["misclassification_loss", "log_loss"]:
            raise ValueError(
                f"Supported methods are `misclassification_loss` or "
                f"`log_loss`; the given one is: {self.method}."
            )
        check_type(self.subtract_current, "subtract_current", bool)

        if self.method == "log_loss" and self.cost_matrix is not None:
            raise ValueError(
                "`cost_matrix` must be None if `method` is set to `log_loss`."
            )

    def _estimate_current_error(
        self, id_clf, idx_train, idx_cand, idx_eval, w_eval
    ):
        if self.subtract_current:
            probs = id_clf.predict_proba(idx_eval)
            if self.method == "misclassification_loss":
                preds = np.argmin(np.dot(probs, self.cost_matrix_), axis=1)
                err = self._risk_estimation(
                    probs, preds, self.cost_matrix_, w_eval[idx_eval]
                )
            elif self.method == "log_loss":
                err = self._logloss_estimation(probs, probs)
            return err
        else:
            return super()._estimate_current_error(
                id_clf, idx_train, idx_cand, idx_eval, w_eval
            )

    def _estimate_error_for_candidate(
        self, id_clf, idx_cx, cy, idx_train, idx_cand, idx_eval, w_eval
    ):
        id_clf.partial_fit(idx_cx, cy, use_base_clf=True, set_base_clf=False)
        probs = id_clf.predict_proba(idx_eval)
        if self.method == "misclassification_loss":
            preds = np.argmin(np.dot(probs, self.cost_matrix_), axis=1)
            err = self._risk_estimation(
                probs, preds, self.cost_matrix_, w_eval[idx_eval]
            )
        elif self.method == "log_loss":
            err = self._logloss_estimation(probs, probs)
        return err

    def _precompute_and_fit_clf(
        self, id_clf, X_full, y_full, idx_train, idx_cand, idx_eval, fit_clf
    ):
        id_clf.precompute(idx_train, idx_cand)
        id_clf.precompute(idx_train, idx_eval)
        id_clf.precompute(idx_cand, idx_eval)

        id_clf = super()._precompute_and_fit_clf(
            id_clf,
            X_full,
            y_full,
            idx_train,
            idx_cand,
            idx_eval,
            fit_clf=fit_clf,
        )
        return id_clf


class ValueOfInformationEER(ExpectedErrorReduction):
    """Value of Information (VOI)

    This class implements the expected error reduction method from [1]_ that
    estimates the "Value of Information" (VOI). The method can be configured
    such that it also implements [2]_ and [3]_; the default parameters are
    those described in [1]_. Therefore, it implements the following two
    steps:
     - simulating, for every candidate-label pair, the acquisition of that
       label by retraining the classifier with it,
     - estimating some kind of risk for the resulting classifier.

    Parameters
    ----------
    cost_matrix : array-like of shape (n_classes, n_classes), default=None
        Cost matrix with `cost_matrix[i,j]` defining the cost of predicting
        class `j` for a sample with the actual class `i`.
        Used for misclassification loss and ignored for log loss.
    consider_unlabeled : bool, default=True
        If `True`, the error is estimated on the unlabeled samples.
    consider_labeled : bool, default=True
        If `True`, the error is estimated on the labeled samples.
    candidate_to_labeled : bool, default=True
        If `True`, the candidate with the simulated label is added to the
        labeled set. As this label is considered to be correct, it will then
        be evaluated under the `consider_labeled` flag.
    subtract_current : bool, default=False
        If `True`, the current error estimate is subtracted from the
        simulated score. This might be helpful to define a stopping criterion
        as in [2]_.
    normalize : bool, default=False
        If `True`, the error terms are normalized by the number of evaluation
        samples such that the errors represent the average error instead of
        the summed error. This is done independently for the simulated and
        the current error.
    missing_label : scalar or string or np.nan or None, default=np.nan
        Value to represent a missing label.
    random_state : int or np.random.RandomState or None, default=None
        The random state to use.

    References
    ----------
    .. [1] Kapoor, Ashish, Eric Horvitz, and Sumit Basu. "Selective
       Supervision: Guiding Supervised Learning with Decision-Theoretic Active
       Learning." IJCAI. Vol. 7. 2007.
    .. [2] Joshi, A. J., Porikli, F., & Papanikolopoulos, N. P. (2012).
       Scalable active learning for multiclass image classification.
       IEEE TrPAMI, 34(11), pp. 2259-2273.
    .. [3] Margineantu, D. D. (2005). Active cost-sensitive learning.
       In IJCAI (Vol. 5, pp. 1622-1623).
    """

    def __init__(
        self,
        cost_matrix=None,
        consider_unlabeled=True,
        consider_labeled=True,
        candidate_to_labeled=True,
        subtract_current=False,
        normalize=False,
        missing_label=MISSING_LABEL,
        random_state=None,
    ):
        super().__init__(
            enforce_mapping=True,
            cost_matrix=cost_matrix,
            missing_label=missing_label,
            random_state=random_state,
        )
        self.consider_unlabeled = consider_unlabeled
        self.consider_labeled = consider_labeled
        self.candidate_to_labeled = candidate_to_labeled
        self.subtract_current = subtract_current
        self.normalize = normalize

    def _validate_init_params(self):
        super()._validate_init_params()
        check_type(self.consider_unlabeled, "consider_unlabeled", bool)
        check_type(self.consider_labeled, "consider_labeled", bool)
        check_type(self.candidate_to_labeled, "candidate_to_labeled", bool)
        check_type(self.subtract_current, "subtract_current", bool)
        check_type(self.normalize, "normalize", bool)

    def query(
        self,
        X,
        y,
        clf,
        sample_weight=None,
        fit_clf=True,
        ignore_partial_fit=True,
        candidates=None,
        batch_size=1,
        return_utilities=False,
    ):
        """Determines for which candidate samples labels are to be queried.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training data set, usually complete, i.e., including the labeled
            and unlabeled samples.
        y : array-like of shape (n_samples,)
            Labels of the training data set (possibly including unlabeled
            ones indicated by `self.missing_label`).
        clf : skactiveml.base.SkactivemlClassifier
            Model implementing the methods `fit` and `predict_proba`.
        sample_weight : array-like of shape (n_samples,), default=None
            Weights of training samples in `X`.
        fit_clf : bool, default=True
            Defines whether the classifier should be fitted on `X`, `y`, and
            `sample_weight`.
        ignore_partial_fit : bool, default=True
            Relevant in cases where `clf` implements `partial_fit`. If
            `True`, the `partial_fit` function is ignored and `fit` is used
            instead.
        candidates : None or array-like of shape (n_candidates), dtype=int or \
                array-like of shape (n_candidates, n_features), default=None
            - If `candidates` is `None`, the unlabeled samples from
              `(X,y)` are considered as `candidates`.
            - If `candidates` is of shape `(n_candidates,)` and of type
              `int`, `candidates` is considered as the indices of the
              samples in `(X,y)`.
            - If `candidates` is of shape `(n_candidates, *)`, the
              candidate samples are directly given in `candidates` (not
              necessarily contained in `X`).
        batch_size : int, default=1
            The number of samples to be selected in one AL cycle.
        return_utilities : bool, default=False
            If `True`, also return the utilities based on the query strategy.

        Returns
        -------
        query_indices : numpy.ndarray of shape (batch_size,)
            The query indices indicate for which candidate sample a label is
            to be queried, e.g., `query_indices[0]` indicates the first
            selected sample.

            - If `candidates` is `None` or of shape
              `(n_candidates,)`, the indexing refers to the samples in
              `X`.
            - If `candidates` is of shape `(n_candidates, n_features)`,
              the indexing refers to the samples in `candidates`.
        utilities : numpy.ndarray of shape (batch_size, n_samples) or \
                numpy.ndarray of shape (batch_size, n_candidates)
            The utilities of samples after each selected sample of the batch,
            e.g., `utilities[0]` indicates the utilities used for selecting
            the first sample (with index `query_indices[0]`) of the batch.
            Utilities for labeled samples will be set to np.nan.

            - If `candidates` is `None` or of shape
              `(n_candidates,)`, the indexing refers to the samples in
              `X`.
            - If `candidates` is of shape `(n_candidates, n_features)`,
              the indexing refers to the samples in `candidates`.
        """
        # TODO: check if the given candidates are only unlabeled ones
        return super().query(
            X,
            y,
            clf,
            sample_weight=sample_weight,
            fit_clf=fit_clf,
            ignore_partial_fit=ignore_partial_fit,
            candidates=candidates,
            sample_weight_candidates=None,
            X_eval=None,
            sample_weight_eval=None,
            batch_size=batch_size,
            return_utilities=return_utilities,
        )

    def _estimate_error_for_candidate(
        self, id_clf, idx_cx, cy, idx_train, idx_cand, idx_eval, w_eval
    ):
        id_clf.partial_fit(idx_cx, cy, use_base_clf=True, set_base_clf=False)

        # Handle the problem that, if only one candidate remains, it should
        # be selected although the error cannot be estimated, as no samples
        # are left for the estimation.
        le = id_clf._le
        y_eval = id_clf.y[idx_eval]
        idx_labeled = idx_train[
            is_labeled(y_eval, missing_label=self.missing_label_)
        ]
        y_labeled = id_clf.y[idx_labeled]
        idx_unlabeled = idx_train[
            is_unlabeled(y_eval, missing_label=self.missing_label_)
        ]
        if self.candidate_to_labeled:
            idx_labeled = np.concatenate([idx_labeled, idx_cx], axis=0)
            y_labeled = np.concatenate([y_labeled, cy], axis=0)
            idx_unlabeled = np.setdiff1d(
                idx_unlabeled, idx_cx, assume_unique=True
            )

        y_labeled_c_id = None
        if len(idx_labeled) > 0:
            y_labeled_c_id = le.transform(y_labeled)

        err = 0
        norm = 0
        if self.consider_labeled and len(idx_labeled) > 0:
            norm += len(idx_labeled)
            probs = id_clf.predict_proba(idx_labeled)
            err += self._risk_estimation(
                y_labeled_c_id, probs, self.cost_matrix_, w_eval[idx_labeled]
            )
        if self.consider_unlabeled and len(idx_unlabeled) > 0:
            norm += len(idx_unlabeled)
            probs = id_clf.predict_proba(idx_unlabeled)
            err += self._risk_estimation(
                probs, probs, self.cost_matrix_, w_eval[idx_unlabeled]
            )
        if self.normalize:
            if norm == 0:
                return 0.0
            else:
                return err / norm
        else:
            return err

    def _estimate_current_error(
        self, id_clf, idx_train, idx_cand, idx_eval, w_eval
    ):
        # Estimate the current utility score if required.
        # TODO: maybe use a shared helper for the code below to reduce
        #  redundancies with `_estimate_error_for_candidate`.
        if self.subtract_current:
            le = id_clf._le
            y_eval = id_clf.y[idx_eval]
            idx_labeled = idx_train[
                is_labeled(y_eval, missing_label=self.missing_label_)
            ]
            y_labeled = id_clf.y[idx_labeled]
            idx_unlabeled = idx_train[
                is_unlabeled(y_eval, missing_label=self.missing_label_)
            ]
            y_labeled_c_id = None
            if len(idx_labeled) > 0:
                y_labeled_c_id = le.transform(y_labeled)
            err = 0
            norm = 0
            if self.consider_labeled and len(idx_labeled) > 0:
                norm += len(idx_labeled)
                probs = id_clf.predict_proba(idx_labeled)
                err += self._risk_estimation(
                    y_labeled_c_id,
                    probs,
                    self.cost_matrix_,
                    w_eval[idx_labeled],
                )
            if self.consider_unlabeled and len(idx_unlabeled) > 0:
                norm += len(idx_unlabeled)
                probs = id_clf.predict_proba(idx_unlabeled)
                err += self._risk_estimation(
                    probs, probs, self.cost_matrix_, w_eval[idx_unlabeled]
                )
            if self.normalize:
                # Guard against division by zero, analogously to
                # `_estimate_error_for_candidate`.
                return 0.0 if norm == 0 else err / norm
            else:
                return err
        else:
            return super()._estimate_current_error(
                id_clf, idx_train, idx_cand, idx_eval, w_eval
            )

    def _precompute_and_fit_clf(
        self, id_clf, X_full, y_full, idx_train, idx_cand, idx_eval, fit_clf
    ):
        # TODO: replace the following line by more efficient code
        id_clf.precompute(
            idx_train, idx_train, fit_params="all", pred_params="all"
        )

        # # for cond_prob
        # id_clf.precompute(idx_train, idx_cand,
        #                   fit_params='labeled', pred_params='all')
        # # for risk estimation
        # if self.consider_labeled:
        #     id_clf.precompute(idx_train, idx_eval,
        #                       fit_params='labeled', pred_params='labeled')
        #     id_clf.precompute(idx_cand, idx_eval,
        #                       fit_params='all', pred_params='labeled')
        #     if self.candidate_to_labeled:
        #         # idx_train ('labeled'), idx_cand ('all') exists above
        #         # TODO: considering only equal samples would be sufficient
        #         id_clf.precompute(idx_cand, idx_cand,
        #                           fit_params='all', pred_params='all')
        # if self.consider_unlabeled:
        #     id_clf.precompute(idx_train, idx_eval,
        #                       fit_params='labeled', pred_params='unlabeled')
        #     id_clf.precompute(idx_cand, idx_eval,
        #                       fit_params='all', pred_params='unlabeled')

        id_clf = super()._precompute_and_fit_clf(
            id_clf,
            X_full,
            y_full,
            idx_train,
            idx_cand,
            idx_eval,
            fit_clf=fit_clf,
        )
        return id_clf
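

# A minimal usage sketch for `ValueOfInformationEER`, reusing `X`, `y`, and
# `clf` from the sketch above (with its default parameters, the strategy
# follows Kapoor et al. [1]_):
#
#   from skactiveml.pool import ValueOfInformationEER
#
#   qs = ValueOfInformationEER(random_state=0)
#   query_idx = qs.query(X, y, clf)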