Source code for skactiveml.stream._stream_probabilistic_al

import numpy as np
from sklearn import clone
from sklearn.utils import check_array, check_consistent_length

from ..classifier import ParzenWindowClassifier
from .budgetmanager import BalancedIncrementalQuantileFilter
from ..base import (
    SingleAnnotatorStreamQueryStrategy,
    SkactivemlClassifier,
    BudgetManager,
)
from ..pool import cost_reduction
from ..utils import (
    check_type,
    check_scalar,
    call_func,
    check_budget_manager,
)


class StreamProbabilisticAL(SingleAnnotatorStreamQueryStrategy):
    """Probabilistic Active Learning in Datastreams.

    StreamProbabilisticAL [1]_ is an extension to Multi-Class Probabilistic
    Active Learning [2]_ (McPAL) (see `pool.ProbabilisticAL`). It uses McPAL
    to assess the spatial utility. The Balanced Incremental Quantile Filter
    (`BalancedIncrementalQuantileFilter`), which is implemented within the
    default budget manager, is used to evaluate the temporal utility (see
    `stream.budgetmanager.BalancedIncrementalQuantileFilter`).

    Parameters
    ----------
    metric : str or callable, default=None
        The metric must be None or a valid kernel as defined by the function
        `sklearn.metrics.pairwise.pairwise_kernels`. The kernel is used to
        calculate the frequency of labels near the candidates and multiplied
        with the probabilities returned by `clf` to get a kernel frequency
        estimate for each class. If `metric` is set to `None`, the
        `predict_freq` method of `clf` is used instead. If that is not
        defined, an exception is raised.
    metric_dict : dict, default=None
        Any further parameters are passed directly to the kernel function.
        If `metric_dict` is `None` and `metric` is 'rbf', `metric_dict` is
        set to `{'gamma': 'mean'}`.
    prior : float, default=1.0e-3
        The prior value that is passed onto ProbabilisticAL (see
        `pool.ProbabilisticAL`).
    m_max : int, default=2
        The m_max value that is passed onto ProbabilisticAL (see
        `pool.ProbabilisticAL`).
    budget_manager : BudgetManager, default=None
        The BudgetManager which models the budgeting constraint used in the
        stream-based active learning setting. If set to `None`,
        `BalancedIncrementalQuantileFilter` is used by default. The budget
        manager is initialized based on the following conditions:

        - If only a `budget` is given, the default budget manager is
          initialized with the given budget.
        - If only a budget manager is given, that budget manager is used.
        - If neither is given, the default budget manager with the default
          budget is used.
        - If both are given and the budget differs from
          `budget_manager.budget`, a warning is thrown and the budget
          manager is used as is.
    budget : float, default=None
        Specifies the ratio of samples which are allowed to be queried, with
        `0 <= budget <= 1`. If `budget` is `None`, it is replaced with the
        default budget 0.1.
    random_state : int or RandomState instance, default=None
        Controls the randomness of the estimator.

    References
    ----------
    .. [1] D. Kottke, G. Krempl, and M. Spiliopoulou. Probabilistic Active
       Learning in Datastreams. In Adv. Intell. Data Anal., pages 145–157,
       2015.
    .. [2] D. Kottke, G. Krempl, D. Lang, J. Teschner, and M. Spiliopoulou.
       Multi-class Probabilistic Active Learning. In Eur. Conf. Artif.
       Intell., pages 586–594, 2016.
    """

    def __init__(
        self,
        metric=None,
        metric_dict=None,
        prior=1.0e-3,
        m_max=2,
        budget_manager=None,
        budget=None,
        random_state=None,
    ):
        super().__init__(budget=budget, random_state=random_state)
        self.budget_manager = budget_manager
        self.prior = prior
        self.m_max = m_max
        self.metric = metric
        self.metric_dict = metric_dict
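
    # Illustrative sketch (not executed) of how `budget` and `budget_manager`
    # interact, following the rules in the class docstring; the default
    # budget manager is the BalancedIncrementalQuantileFilter:
    #
    #   StreamProbabilisticAL(budget=0.05)
    #       -> default budget manager initialized with budget 0.05
    #   StreamProbabilisticAL(
    #       budget_manager=BalancedIncrementalQuantileFilter(budget=0.05)
    #   )
    #       -> the given budget manager is used
    #   StreamProbabilisticAL()
    #       -> default budget manager with the default budget of 0.1
    #   StreamProbabilisticAL(budget=0.2, budget_manager=...)
    #       -> warns if the budgets differ; the budget manager is used as is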

    def query(
        self,
        candidates,
        clf,
        X=None,
        y=None,
        sample_weight=None,
        fit_clf=False,
        utility_weight=None,
        return_utilities=False,
    ):
        """Determines for which candidate samples labels are to be queried.

        The query strategy determines the most useful samples in
        `candidates`, which can be acquired within the budgeting constraint
        specified by `budget`. Please note that this method does not change
        the internal state of the query strategy. To adapt the query
        strategy to the selected candidates, use `update(...)`.

        Parameters
        ----------
        candidates : {array-like, sparse matrix} of shape\
                (n_candidates, n_features)
            The samples which may be queried. Sparse matrices are accepted
            only if they are supported by the base query strategy.
        clf : skactiveml.base.SkactivemlClassifier
            Model implementing the methods `fit` and `predict_proba`.
        X : array-like of shape (n_samples, n_features), default=None
            Training data set used to fit the classifier.
        y : array-like of shape (n_samples,), default=None
            Labels of the training data set (possibly including unlabeled
            ones indicated by `self.missing_label`).
        sample_weight : array-like of shape (n_samples,), default=None
            Weights of training samples in `X`.
        fit_clf : bool, default=False
            Defines whether the classifier should be fitted on `X`, `y`, and
            `sample_weight`.
        utility_weight : array-like of shape (n_candidates,), default=None
            Densities for each sample in `candidates`.
        return_utilities : bool, default=False
            If `True`, also return the utilities based on the query
            strategy.

        Returns
        -------
        queried_indices : np.ndarray of shape (n_queried_indices,)
            The indices of samples in `candidates` whose labels are queried,
            with `0 <= queried_indices <= n_candidates`.
        utilities : np.ndarray of shape (n_candidates,)
            The utilities based on the query strategy. Only provided if
            `return_utilities` is `True`.
        """
        (
            candidates,
            clf,
            X,
            y,
            sample_weight,
            fit_clf,
            utility_weight,
            return_utilities,
        ) = self._validate_data(
            candidates=candidates,
            clf=clf,
            X=X,
            y=y,
            sample_weight=sample_weight,
            fit_clf=fit_clf,
            utility_weight=utility_weight,
            return_utilities=return_utilities,
        )
        if self.metric is not None:
            # Estimate kernel frequencies with a ParzenWindowClassifier and
            # combine them with the class probabilities of `clf`.
            if self.metric_dict is None and self.metric == "rbf":
                self.metric_dict = {"gamma": "mean"}
            pwc = ParzenWindowClassifier(
                metric=self.metric,
                metric_dict=self.metric_dict,
                missing_label=clf.missing_label,
                classes=clf.classes,
            )
            pwc.fit(X=X, y=y, sample_weight=sample_weight)
            n = pwc.predict_freq(candidates).sum(axis=1, keepdims=True)
            pred_proba = clf.predict_proba(candidates)
            k_vec = n * pred_proba
        else:
            # `clf` provides kernel frequency estimates directly.
            k_vec = clf.predict_freq(candidates)
        utilities = cost_reduction(k_vec, prior=self.prior, m_max=self.m_max)
        # Weight the spatial (McPAL) utilities, e.g., by candidate densities.
        utilities *= utility_weight
        queried_indices = self.budget_manager_.query_by_utility(utilities)
        if return_utilities:
            return queried_indices, utilities
        else:
            return queried_indices
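
    # Illustrative sketch (not executed) of a single `query` call; the names
    # `qs`, `clf`, `x_t`, `X_train`, `y_train`, and `densities` are
    # assumptions. The optional `utility_weight` scales the McPAL utilities,
    # e.g., by a density estimate of the candidates:
    #
    #   queried_indices, utilities = qs.query(
    #       candidates=x_t.reshape(1, -1),
    #       clf=clf,
    #       X=X_train,
    #       y=y_train,
    #       fit_clf=True,
    #       utility_weight=densities,
    #       return_utilities=True,
    #   )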

    def update(
        self, candidates, queried_indices, budget_manager_param_dict=None
    ):
        """Updates the budget manager and the count for seen and queried
        labels. This function should be used in conjunction with the `query`
        function.

        Parameters
        ----------
        candidates : {array-like, sparse matrix} of shape\
                (n_candidates, n_features)
            The samples which may be queried. Sparse matrices are accepted
            only if they are supported by the base query strategy.
        queried_indices : np.ndarray of shape (n_queried_indices,)
            The indices of samples in `candidates` whose labels are queried,
            with `0 <= queried_indices <= n_candidates`.
        budget_manager_param_dict : dict, default=None
            Optional kwargs for `budget_manager`.

        Returns
        -------
        self : SingleAnnotatorStreamQueryStrategy
            The query strategy returns itself, after it is updated.
        """
        # Check if a budget manager is set and create one if necessary.
        if not hasattr(self, "budget_manager_"):
            check_type(
                self.budget_manager,
                "budget_manager_",
                BudgetManager,
                type(None),
            )
            self.budget_manager_ = check_budget_manager(
                self.budget,
                self.budget_manager,
                BalancedIncrementalQuantileFilter,
            )
        budget_manager_param_dict = (
            {}
            if budget_manager_param_dict is None
            else budget_manager_param_dict
        )
        call_func(
            self.budget_manager_.update,
            candidates=candidates,
            queried_indices=queried_indices,
            **budget_manager_param_dict,
        )
        return self
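
    # Illustrative sketch (not executed), assuming the default
    # BalancedIncrementalQuantileFilter: its `update` can take the utilities
    # of the assessed candidates, which may be forwarded via
    # `budget_manager_param_dict` so the filter's quantile window stays in
    # sync with `query`:
    #
    #   qs.update(
    #       candidates=candidates,
    #       queried_indices=queried_indices,
    #       budget_manager_param_dict={"utilities": utilities},
    #   )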

    def _validate_data(
        self,
        candidates,
        clf,
        X,
        y,
        sample_weight,
        fit_clf,
        utility_weight,
        return_utilities,
        reset=True,
        **check_candidates_params,
    ):
        """Validate input data and set or check the `n_features_in_`
        attribute.

        Parameters
        ----------
        candidates : {array-like, sparse matrix} of shape\
                (n_candidates, n_features)
            The samples which may be queried. Sparse matrices are accepted
            only if they are supported by the base query strategy.
        clf : skactiveml.base.SkactivemlClassifier
            Model implementing the methods `fit` and `predict_proba`.
        X : array-like of shape (n_samples, n_features), default=None
            Training data set used to fit the classifier.
        y : array-like of shape (n_samples,)
            Labels of the training data set (possibly including unlabeled
            ones indicated by `self.missing_label`).
        sample_weight : array-like of shape (n_samples,), default=None
            Weights of training samples in `X`.
        fit_clf : bool, default=False
            Defines whether the classifier should be fitted on `X`, `y`, and
            `sample_weight`.
        utility_weight : array-like of shape (n_candidates,)
            Densities for each sample in `candidates`.
        return_utilities : bool, default=False
            If `True`, also return the utilities based on the query
            strategy.
        reset : bool, default=True
            Whether to reset the `n_features_in_` attribute. If `False`, the
            input will be checked for consistency with data provided when
            reset was last `True`.
        **check_candidates_params : kwargs
            Parameters passed to :func:`sklearn.utils.check_array`.

        Returns
        -------
        candidates : np.ndarray of shape (n_candidates, n_features)
            Checked candidate samples.
        clf : SkactivemlClassifier
            Checked model implementing the methods `fit` and `predict_freq`.
        X : np.ndarray of shape (n_samples, n_features)
            Checked training data set.
        y : np.ndarray of shape (n_samples,)
            Checked training labels.
        sample_weight : np.ndarray of shape (n_samples,)
            Checked weights of the training samples.
        fit_clf : bool
            Checked boolean value of `fit_clf`.
        utility_weight : np.ndarray of shape (n_candidates,)
            Checked utility weights.
        return_utilities : bool
            Checked boolean value of `return_utilities`.
        """
        candidates, return_utilities = super()._validate_data(
            candidates,
            return_utilities,
            reset=reset,
            **check_candidates_params,
        )
        # Check if a budget manager is set and create one if necessary.
        if not hasattr(self, "budget_manager_"):
            check_type(
                self.budget_manager,
                "budget_manager_",
                BudgetManager,
                type(None),
            )
            self.budget_manager_ = check_budget_manager(
                self.budget,
                self.budget_manager,
                BalancedIncrementalQuantileFilter,
            )
        X, y, sample_weight = self._validate_X_y_sample_weight(
            X, y, sample_weight
        )
        clf = self._validate_clf(clf, X, y, sample_weight, fit_clf)
        utility_weight = self._validate_utility_weight(
            utility_weight, candidates
        )
        if self.metric is None and not hasattr(clf, "predict_freq"):
            raise TypeError(
                "clf has no predict_freq and metric was set to None"
            )
        check_scalar(
            self.prior, "prior", float, min_val=0, min_inclusive=False
        )
        check_scalar(self.m_max, "m_max", int, min_val=0, min_inclusive=False)
        self._validate_random_state()

        return (
            candidates,
            clf,
            X,
            y,
            sample_weight,
            fit_clf,
            utility_weight,
            return_utilities,
        )

    def _validate_X_y_sample_weight(self, X, y, sample_weight):
        """Validate if X, y and sample_weight are numeric and of equal
        length.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training data set used to fit the classifier.
        y : array-like of shape (n_samples,)
            Labels of the training data set (possibly including unlabeled
            ones indicated by `self.missing_label`).
        sample_weight : array-like of shape (n_samples,)
            Weights of training samples in `X`.

        Returns
        -------
        X : array-like of shape (n_samples, n_features)
            Checked training data set.
        y : np.ndarray of shape (n_samples,)
            Checked labels of the input samples `X`. Converts `y` to a numpy
            array.
        sample_weight : np.ndarray of shape (n_samples,)
            Checked weights of the training samples.
        """
        if sample_weight is not None:
            sample_weight = np.array(sample_weight)
            check_consistent_length(sample_weight, y)
        if X is not None and y is not None:
            X = check_array(X)
            y = np.array(y)
            check_consistent_length(X, y)
        return X, y, sample_weight

    def _validate_clf(self, clf, X, y, sample_weight, fit_clf):
        """Validate if `clf` is a valid `SkactivemlClassifier`. If `clf` is
        untrained and `fit_clf=True`, `clf` is trained using `X`, `y` and
        `sample_weight`.

        Parameters
        ----------
        clf : skactiveml.base.SkactivemlClassifier
            Model implementing the methods `fit` and `predict_proba`.
        X : array-like of shape (n_samples, n_features), default=None
            Training data set used to fit the classifier.
        y : array-like of shape (n_samples,)
            Labels of the training data set (possibly including unlabeled
            ones indicated by `self.missing_label`).
        sample_weight : array-like of shape (n_samples,), default=None
            Weights of training samples in `X`.
        fit_clf : bool, default=False
            Defines whether the classifier should be fitted on `X`, `y`, and
            `sample_weight`.

        Returns
        -------
        clf : skactiveml.base.SkactivemlClassifier
            Checked model implementing the methods `fit` and `predict_freq`.
        """
        # Check if the classifier and its arguments are valid.
        check_type(clf, "clf", SkactivemlClassifier)
        check_type(fit_clf, "fit_clf", bool)
        if fit_clf:
            if sample_weight is None:
                clf = clone(clf).fit(X, y)
            else:
                clf = clone(clf).fit(X, y, sample_weight)
        return clf

    def _validate_utility_weight(self, utility_weight, candidates):
        """Validate if `utility_weight` is numeric and of equal length as
        `candidates`.

        Parameters
        ----------
        candidates : {array-like, sparse matrix} of shape\
                (n_candidates, n_features)
            The samples which may be queried. Sparse matrices are accepted
            only if they are supported by the base query strategy.
        utility_weight : array-like of shape (n_candidates,)
            Densities for each sample in `candidates`.

        Returns
        -------
        utility_weight : np.ndarray of shape (n_candidates,)
            Checked densities for each sample in `candidates`.
        """
        if utility_weight is None:
            # Default to uniform weights if no densities are given.
            utility_weight = np.ones(len(candidates))
        utility_weight = check_array(utility_weight, ensure_2d=False)
        check_consistent_length(utility_weight, candidates)
        return utility_weight
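

# A minimal usage sketch (runnable, e.g., via
# `python -m skactiveml.stream._stream_probabilistic_al`): a prequential
# stream loop on synthetic data. The data setup and the variable names
# (`X_stream`, `X_train`, `qs`, ...) are illustrative assumptions, not part
# of the library's API.
if __name__ == "__main__":
    from sklearn.datasets import make_classification

    from skactiveml.utils import MISSING_LABEL

    # Synthetic binary-class data, split into an initial batch and a stream.
    X_all, y_all = make_classification(n_samples=300, random_state=0)
    X_init, y_init = X_all[:10], y_all[:10]
    X_stream, y_stream = X_all[10:], y_all[10:]

    # ParzenWindowClassifier provides `predict_freq`, so `metric=None` works.
    clf = ParzenWindowClassifier(classes=[0, 1], missing_label=MISSING_LABEL)
    qs = StreamProbabilisticAL(budget=0.1, random_state=0)

    X_train, y_train = list(X_init), list(y_init)
    n_queries = 0
    for x_t, y_t in zip(X_stream, y_stream):
        candidates = x_t.reshape(1, -1)
        # Assess the current sample; `fit_clf=True` fits `clf` on `X`, `y`.
        queried_indices, utilities = qs.query(
            candidates=candidates,
            clf=clf,
            X=X_train,
            y=y_train,
            fit_clf=True,
            return_utilities=True,
        )
        # Inform the budget manager about the decision; the utilities keep
        # the BalancedIncrementalQuantileFilter's window up to date.
        qs.update(
            candidates=candidates,
            queried_indices=queried_indices,
            budget_manager_param_dict={"utilities": utilities},
        )
        # The true label is only revealed if the sample was queried.
        X_train.append(x_t)
        y_train.append(y_t if len(queried_indices) > 0 else MISSING_LABEL)
        n_queries += len(queried_indices)
    print(f"Queried {n_queries} of {len(X_stream)} stream samples.")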