Source code for skactiveml.classifier.multiannotator._annotator_logistic_regression

"""
Logistic Regression for Multiple Annotators
"""

# Author: Marek Herde <marek.herde@uni-kassel.de>
#         Timo Sturm <timo.sturm@student.uni-kassel.de>

import warnings
import numpy as np

from collections.abc import Sequence
from scipy.optimize import minimize
from scipy.special import softmax
from sklearn.utils.validation import (
    check_array,
    check_is_fitted,
    column_or_1d,
)

from ...base import SkactivemlClassifier
from ...utils import (
    MISSING_LABEL,
    compute_vote_vectors,
    is_labeled,
    check_scalar,
    check_n_features,
)


class AnnotatorLogisticRegression(SkactivemlClassifier):
    """Logistic Regression for Crowds

    Logistic Regression based on Raykar [1]_ is a classification algorithm
    that learns from multiple annotators. Besides building a model for the
    classification task, the algorithm estimates the performance of the
    annotators. The performance of an annotator is assumed to depend only on
    the true label of a sample and not on the sample itself. Each annotator is
    assigned a confusion matrix, where each row is normalized; it captures the
    bias of the annotator's decisions. These estimated biases are then used to
    refine the classifier itself. The classifier also supports a Bayesian view
    on the problem: a prior distribution over each annotator's confusion
    matrix is assumed, as well as a prior distribution over the classifier's
    weight vectors, which corresponds to a regularization.

    Parameters
    ----------
    n_annotators : int or None, default=None
        Number of annotators. If `None`, the number of annotators is inferred
        from the number of columns of `y` during fitting.
    tol : float, default=0.0001
        Threshold for stopping the EM-algorithm and the optimization of the
        logistic regression weights. If the change of the respective value
        between two steps is smaller than `tol`, the respective algorithm
        stops.
    max_iter : int, default=100
        The maximum number of iterations of the EM-algorithm to be performed.
    fit_intercept : bool, default=True
        Specifies if a constant (a.k.a. bias or intercept) should be added to
        input samples.
    annot_prior_full : int or float or array-like, default=1
        Determines `A` as the Dirichlet prior for each annotator `l` (i.e.,
        `A[l] = annot_prior_full * np.ones((n_classes, n_classes))` for a
        numeric or `A[l] = annot_prior_full[l] * np.ones((n_classes,
        n_classes))` for an array-like parameter). `A[l,i,j]` is the estimated
        number of times annotator `l` has provided label `j` for a sample of
        true label `i`.
    annot_prior_diag : int or float or array-like, default=0
        Adds a value to the diagonal of `A[l]` being the Dirichlet prior for
        annotator `l` (i.e., `A[l] += annot_prior_diag * np.eye(n_classes)`
        for a numeric or `A[l] += annot_prior_diag[l] * np.eye(n_classes)` for
        an array-like parameter). `A[l,i,j]` is the estimated number of times
        annotator `l` has provided label `j` for a sample of true label `i`.
    weights_prior : int or float, default=1
        Determines `Gamma` as the inverse covariance matrix of the prior
        distribution for every weight vector (i.e.,
        `Gamma = weights_prior * np.eye(n_features)`). By default, the
        identity matrix is used for each weight vector.
    solver : str or callable, default='Newton-CG'
        Type of solver. Should be 'Nelder-Mead', 'Powell', 'CG', 'BFGS',
        'Newton-CG', 'L-BFGS-B', 'TNC', 'COBYLA', 'SLSQP', 'trust-constr',
        'dogleg', 'trust-ncg', 'trust-exact', 'trust-krylov', or a custom
        callable object. See `scipy.optimize.minimize` for more information.
    solver_dict : dict, default=None
        Additional solver options passed to `scipy.optimize.minimize`. If
        `None`, `{'maxiter': 100}` is passed.
    classes : array-like of shape (n_classes,), default=None
        Holds the label for each class. If `None`, the classes are determined
        during the fit.
    missing_label : scalar or string or np.nan or None, default=np.nan
        Value to represent a missing label.
    cost_matrix : array-like of shape (n_classes, n_classes), default=None
        Cost matrix with `cost_matrix[i,j]` indicating cost of predicting
        class `classes[j]` for a sample of class `classes[i]`. Can only be
        set if `classes` is not `None`.
    random_state : int or RandomState instance or None, default=None
        Determines random number generation for the `predict` method. Pass an
        int for reproducible results across multiple method calls.

    Attributes
    ----------
    n_annotators_ : int
        Number of annotators.
    W_ : numpy.ndarray of shape (n_features, n_classes)
        The weight vectors of the logistic regression model.
    Alpha_ : numpy.ndarray of shape (n_annotators, n_classes, n_classes)
        This is a confusion matrix for each annotator, where each row is
        normalized. `Alpha_[l,k,c]` describes the probability that annotator
        `l` provides the class label `c` for a sample belonging to class `k`.
    classes_ : array-like of shape (n_classes,)
        Holds the label for each class after fitting.
    cost_matrix_ : array-like of shape (n_classes, n_classes)
        Cost matrix with `cost_matrix_[i,j]` indicating cost of predicting
        class `self.classes_[j]` for a sample of class `self.classes_[i]`.

    References
    ----------
    .. [1] V. C. Raykar, S. Yu, L. H. Zhao, G. H. Valadez, C. Florin,
       L. Bogoni, and L. Moy. Learning from Crowds. J. Mach. Learn. Res.,
       11(4):1297–1322, 2010.
    """

    _ALLOWED_EXTRA_OUTPUTS = {
        "logits",
        "annotator_perf",
        "annotator_class",
    }

    def __init__(
        self,
        n_annotators=None,
        tol=0.0001,
        max_iter=100,
        fit_intercept=True,
        annot_prior_full=1,
        annot_prior_diag=0,
        weights_prior=1,
        solver="Newton-CG",
        solver_dict=None,
        classes=None,
        cost_matrix=None,
        missing_label=MISSING_LABEL,
        random_state=None,
    ):
        super().__init__(
            classes=classes,
            missing_label=missing_label,
            cost_matrix=cost_matrix,
            random_state=random_state,
        )
        self.n_annotators = n_annotators
        self.tol = tol
        self.max_iter = max_iter
        self.fit_intercept = fit_intercept
        self.annot_prior_full = annot_prior_full
        self.annot_prior_diag = annot_prior_diag
        self.weights_prior = weights_prior
        self.solver = solver
        self.solver_dict = solver_dict

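    # Editorial summary of the EM scheme implemented in `fit` below, derived
    # from the code (up to normalization and optional sample weights):
    #
    #   E-step:  Mu[n, k] ∝ P[n, k] * prod_m Alpha_[m, k, y[n, m]]
    #            (product over annotators m who labeled sample n), with
    #            P = softmax(X @ W_); majority voting is used in the very
    #            first iteration instead.
    #   M-step:  Alpha_[m, k, c] ∝ sum_n Mu[n, k] * 1{y[n, m] = c}
    #                              + A[m, k, c] - 1   (Dirichlet smoothing),
    #            W_ = argmin_W  -sum_{n, k} Mu[n, k] * log softmax(X @ W)[n, k]
    #                           + 0.5 * weights_prior * ||W||^2,
    #            where the last problem is solved via `scipy.optimize.minimize`.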
    def fit(self, X, y, sample_weight=None):
        """Fit the model using `X` as samples and `y` as class labels.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Feature matrix representing the samples.
        y : array-like of shape (n_samples, n_annotators)
            It contains the class labels of the training samples, where
            missing labels are represented via `missing_label`. Specifically,
            label `y[n, m]` refers to the label of sample `X[n]` from
            annotator `m`.
        sample_weight : array-like of shape (n_samples, n_annotators)
            It contains the weights of the training samples' class labels. It
            must have the same shape as `y`. The sample weights are only used
            for the initialization via majority vote and the computation of
            the confusion matrices. They are not supported for the update of
            the logistic regression weights and the expectation computation.

        Returns
        -------
        self : AnnotatorLogisticRegression
            The `AnnotatorLogisticRegression` is fitted on the training data.
        """
        # Check input data.
        X, y, sample_weight = self._validate_data(
            X=X, y=y, sample_weight=sample_weight, y_ensure_1d=False
        )

        # Ensure value of 'n_annotators' to be a positive integer.
        if self.n_annotators is not None:
            check_scalar(
                self.n_annotators,
                "n_annotators",
                min_val=1,
                min_inclusive=True,
                target_type=int,
            )

        # Ensure value of 'tol' to be positive.
        check_scalar(
            self.tol,
            "tol",
            min_val=0,
            min_inclusive=False,
            target_type=(int, float),
        )

        # Ensure value of 'max_iter' to be positive.
        check_scalar(
            self.max_iter,
            "max_iter",
            min_val=1,
            min_inclusive=True,
            target_type=int,
        )

        # Ensure value of 'fit_intercept' to be boolean.
        check_scalar(self.fit_intercept, "fit_intercept", target_type=bool)

        # Ensure value of 'solver_dict' to be a dictionary.
        solver_dict = (
            {"maxiter": 100} if self.solver_dict is None else self.solver_dict
        )
        check_scalar(solver_dict, "solver_dict", target_type=dict)

        # Ensure value of 'weights_prior' to be non-negative.
        check_scalar(
            self.weights_prior,
            "weights_prior",
            min_val=0,
            min_inclusive=True,
            target_type=(int, float),
        )

        # Set auxiliary variables.
        n_classes = len(self.classes_)
        if self.n_features_in_ is not None:
            if len(y.shape) != 2:
                raise ValueError(
                    "`y` must be an array-like of shape "
                    "`(n_samples, n_annotators)`."
                )
            self.n_annotators_ = y.shape[1]
        else:
            if self.n_annotators is None:
                raise ValueError(
                    "`y` cannot be empty, if `n_annotators` is None."
                )
            self.n_annotators_ = self.n_annotators

        # Check consistent number of annotators.
        if (
            self.n_annotators is not None
            and self.n_annotators != self.n_annotators_
        ):
            raise ValueError(
                f"`n_annotators={self.n_annotators}` does not match "
                f"{self.n_annotators_} as the number of columns in `y`."
            )

        # Check input 'annot_prior_full' and 'annot_prior_diag'.
        annot_prior = []
        for name, prior in [
            ("annot_prior_full", self.annot_prior_full),
            ("annot_prior_diag", self.annot_prior_diag),
        ]:
            if isinstance(prior, (int, float)):
                prior_array = np.ones(self.n_annotators_) * prior
            else:
                prior_array = column_or_1d(prior)
            if name == "annot_prior_full":
                is_invalid_prior = np.sum(prior_array <= 0)
            else:
                is_invalid_prior = np.sum(prior_array < 0)
            if len(prior_array) != self.n_annotators_ or is_invalid_prior:
                raise ValueError(
                    f"'{name}' must be either 'int', 'float' or "
                    f"array-like with positive values and shape "
                    f"(n_annotators), got {prior}"
                )
            annot_prior.append(prior_array)

        # Set up prior matrix for each annotator.
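        # `A[l]` combines `annot_prior_full` (added to every entry) and
        # `annot_prior_diag` (added to the diagonal) into a Dirichlet prior
        # for annotator `l`. `A - 1` are the corresponding prior
        # pseudo-counts; their row-normalized version `A_norm` (uniform rows
        # where all pseudo-counts are zero) serves as the prior confusion
        # matrix and is used as `Alpha_` whenever no labeled training data is
        # available.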
        A = np.ones((self.n_annotators_, n_classes, n_classes))
        for a in range(self.n_annotators_):
            A[a] *= annot_prior[0][a]
            A[a] += np.eye(n_classes) * annot_prior[1][a]
        A_obs = A - 1
        A_obs_sum = np.sum(A_obs, axis=-1, keepdims=True)
        A_norm = np.divide(
            A_obs,
            A_obs_sum,
            out=np.full_like(A_obs, 1 / n_classes, dtype=float),
            where=A_obs_sum != 0,
        )

        # Fallback, if empty training data has been provided.
        if self.n_features_in_ is None:
            self.W_ = None
            self.Alpha_ = A_norm
            return self

        # Remove samples without labels.
        is_lbld = is_labeled(y, missing_label=-1).any(axis=-1)
        X = X[is_lbld]
        y = y[is_lbld]
        n_samples = X.shape[0]
        is_lbld = is_labeled(y, missing_label=-1)

        # Insert bias, if 'fit_intercept' is set to 'True'.
        n_weights = self.n_features_in_
        if self.fit_intercept:
            n_weights += 1
            X = np.insert(X, 0, values=1, axis=1)

        # Ensure sample weights form a 2d array.
        if sample_weight is None:
            sample_weight = np.ones_like(y)

        # Set initial weights.
        self.W_ = np.zeros((n_weights, n_classes))
        if n_samples == 0:
            self.Alpha_ = A_norm
            return self

        # Initialize the first expectation to negative infinity such that
        # |current - new| < tol is False.
        current_expectation = -np.inf
        n_iter = 0

        # Execute expectation maximization (EM) algorithm.
        while n_iter < self.max_iter:
            # -----------------------------E-STEP--------------------------
            # Compute probabilistic predictions.
            P = softmax(X @ self.W_, axis=1)

            # Estimate latent ground truth labels.
            if n_iter == 0:
                # Initialization via majority voting.
                Mu = compute_vote_vectors(
                    y=y,
                    classes=np.arange(n_classes),
                    missing_label=-1,
                    w=sample_weight,
                )
                Mu /= np.sum(Mu, axis=1, keepdims=True)
            else:
                # Use current model parameters to estimate ground truth
                # labels.
                U = np.ones((n_samples, n_classes))
                for c in range(n_classes):
                    for k in range(n_classes):
                        y_is_k = y == k
                        U[:, c] *= np.prod(
                            self.Alpha_[:, c, k] ** y_is_k, axis=1
                        )
                Mu = P * U
                Mu_sum = Mu.sum(axis=1, keepdims=True)
                Mu = np.divide(
                    Mu,
                    Mu_sum,
                    out=np.full_like(Mu, 1 / n_classes, dtype=float),
                    where=Mu_sum != 0,
                )

                # Compute expectation for current parameters and estimates
                # of the ground truth labels.
                prior_w = -0.5 * self.weights_prior * np.sum(self.W_**2)
                prior_alpha = np.sum(
                    (A - 1) * np.log(self.Alpha_ + np.finfo(float).eps)
                )
                log_likelihood = np.sum(
                    Mu * np.log(P * U + np.finfo(float).eps)
                )
                new_expectation = log_likelihood + prior_w + prior_alpha

                # Stop in the case of convergence.
                if np.abs(new_expectation - current_expectation) < self.tol:
                    break

                # Otherwise, update the current expectation value.
                current_expectation = new_expectation

            # -----------------------------M-STEP--------------------------
            # Update the confusion matrices.
            Alpha = np.zeros((self.n_annotators_, n_classes, n_classes))
            for j in range(self.n_annotators_):
                y_j = np.eye(n_classes)[y[is_lbld[:, j], j].astype(int)]
                w_j = sample_weight[is_lbld[:, j], j].reshape(-1, 1)
                Alpha[j] = (Mu[is_lbld[:, j]].T @ (w_j * y_j)) + A[j] - 1
            Alpha_sum = Alpha.sum(axis=-1, keepdims=True)
            self.Alpha_ = np.divide(
                Alpha,
                Alpha_sum,
                out=np.full_like(Alpha, 1 / n_classes, dtype=float),
                where=Alpha_sum != 0,
            )

            def error(w):
                """
                Compute the cross-entropy loss (negative log-posterior) and
                its gradient for a softmax-based classifier with L2
                regularization.

                This function calculates the error as the sum of the negative
                log-likelihood (cross-entropy) of the predicted class
                probabilities relative to the target distribution, and an L2
                regularization penalty on the weights. It also computes the
                gradient of this loss with respect to the weight vector,
                which is returned in flattened form.
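
                Concretely (matching the implementation below), the loss is::

                    loss(W) = - sum_{n, c} Mu[n, c] * log softmax(X @ W)[n, c]
                              + 0.5 * weights_prior * ||W||^2

                where the intercept row of `W` is excluded from the penalty
                if `fit_intercept=True`, and the gradient is
                `X.T @ (softmax(X @ W) - Mu)` plus the corresponding
                regularization term.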
                This formulation is suitable for use with optimization
                routines such as `scipy.optimize.minimize`.

                Parameters
                ----------
                w : numpy.ndarray of shape (n_weights * n_classes,)
                    Flattened weight vector. It is reshaped to a
                    (n_weights, n_classes) weight matrix.

                Returns
                -------
                loss : float
                    The computed cross-entropy error (negative
                    log-posterior), including the negative log-likelihood
                    and the L2 regularization penalty.
                grad : numpy.ndarray of shape (n_weights * n_classes,)
                    The gradient of the loss with respect to the weight
                    vector, flattened to a one-dimensional array.
                """
                # Reshape weights as matrix.
                W = w.reshape((n_weights, n_classes))

                # Compute probabilistic predictions.
                logits = X.dot(W)
                p = softmax(logits, axis=-1)

                # Compute loss for probabilistic predictions.
                loss = -np.sum(Mu * np.log(p + np.finfo(float).eps))

                # Add L2 penalty to loss.
                if self.fit_intercept:
                    loss += 0.5 * self.weights_prior * np.sum(W[1:] ** 2)
                else:
                    loss += 0.5 * self.weights_prior * np.sum(W**2)

                # Compute L2 part of the gradient.
                if self.fit_intercept:
                    reg_grad = np.zeros_like(W)
                    reg_grad[1:, :] = self.weights_prior * W[1:, :]
                else:
                    reg_grad = self.weights_prior * W

                # Compute final gradient.
                grad = X.T.dot(p - Mu) + reg_grad
                return loss, grad.ravel()

            def hessp(w, v):
                """
                Compute the Hessian-vector product for the error function.

                This function computes the product of the Hessian of the
                error function with a given vector, which is useful for
                second-order optimization routines (e.g., in
                `scipy.optimize.minimize` when a Hessian-vector product is
                supplied via the 'hessp' argument). The error function is
                defined via a softmax-based prediction model with an added
                regularization term.

                Parameters
                ----------
                w : numpy.ndarray of shape (n_weights * n_classes,)
                    Flattened weight vector. It is reshaped to a
                    (n_weights, n_classes) weight matrix.
                v : numpy.ndarray of shape (n_weights * n_classes,)
                    Flattened vector to be multiplied by the Hessian. It is
                    reshaped to a (n_weights, n_classes) matrix.

                Returns
                -------
                Hv : numpy.ndarray of shape (n_weights * n_classes,)
                    The product of the Hessian (of the error function) with
                    the vector `v`, returned as a flat array.
                """
                # Reshape weights as matrix.
                W = w.reshape((n_weights, n_classes))

                # Compute probabilistic predictions.
                logits = X.dot(W)
                probas = softmax(logits, axis=-1)

                # Compute intermediate results.
                V = v.reshape((n_weights, n_classes))
                M = X.dot(V)

                # For each sample X[i], compute:
                # R[i,j] = p[i,j] * (M[i,j] - sum_k p[i,k] * M[i,k])
                R = probas * (M - np.sum(probas * M, axis=1, keepdims=True))
                if self.fit_intercept:
                    reg_Hv = np.zeros_like(V)
                    reg_Hv[1:, :] = self.weights_prior * V[1:, :]
                else:
                    reg_Hv = self.weights_prior * V
                Hv = X.T.dot(R) + reg_Hv
                return Hv.ravel()

            # Update weights of the logistic regression model.
            with warnings.catch_warnings():
                warning_msg = ".* does not use Hessian.* information.*"
                warnings.filterwarnings("ignore", message=warning_msg)
                warning_msg = ".* does not use gradient information.*"
                warnings.filterwarnings("ignore", message=warning_msg)
                res = minimize(
                    error,
                    x0=np.zeros((n_weights * n_classes)),
                    method=self.solver,
                    tol=self.tol,
                    jac=True,
                    hessp=hessp,
                    options=solver_dict,
                )
                self.W_ = res.x.reshape((n_weights, n_classes))

            # Continue with next iteration.
            n_iter += 1

        return self

    def predict(
        self,
        X,
        extra_outputs=None,
    ):
        """Return class predictions for the test samples `X`.

        By default, this method returns only the class predictions `y_pred`.
        If `extra_outputs` is provided, a tuple is returned whose first
        element is `y_pred` and whose remaining elements are the requested
        additional forward outputs, in the order specified by
        `extra_outputs`.

        Parameters
        ----------
        X : array-like of shape (n_samples, ...)
            Test samples.
        extra_outputs : None or str or sequence of str, default=None
            Names of additional outputs to return next to `y_pred`. The names
            must be a subset of the following keys:

            - "logits" : Additionally return the class-membership logits
              `L_class` for the samples in `X`.
            - "annotator_perf" : Additionally return the estimated annotator
              performance probabilities `P_perf` for each sample-annotator
              pair.
            - "annotator_class" : Additionally return the annotator-class
              probability estimates `P_annot` for each sample, class, and
              annotator.

        Returns
        -------
        y_pred : numpy.ndarray of shape (n_samples,)
            Class predictions of the test samples.
        *extras : numpy.ndarray, optional
            Only returned if `extra_outputs` is not `None`. In that case, the
            method returns a tuple whose first element is `y_pred` and whose
            remaining elements correspond to the requested forward outputs in
            the order given by `extra_outputs`. Potential outputs are:

            - `L_class` : `np.ndarray` of shape `(n_samples, n_classes)`,
              where `L_class[n, c]` is the logit for the class `classes_[c]`
              of sample `X[n]`.
            - `P_perf` : `np.ndarray` of shape `(n_samples, n_annotators)`,
              where `P_perf[n, m]` refers to the estimated label correctness
              probability (performance) of annotator `m` when labeling
              sample `X[n]`.
            - `P_annot` : `np.ndarray` of shape
              `(n_samples, n_annotators, n_classes)`, where `P_annot[n, m, c]`
              refers to the probability that annotator `m` provides the class
              label `c` for sample `X[n]`.
        """
        return SkactivemlClassifier.predict(
            self,
            X=X,
            extra_outputs=extra_outputs,
        )

    def predict_proba(
        self,
        X,
        extra_outputs=None,
    ):
        """Return class probability estimates for the test samples `X`.

        By default, this method returns only the class probabilities `P`. If
        `extra_outputs` is provided, a tuple is returned whose first element
        is `P` and whose remaining elements are the requested additional
        forward outputs, in the order specified by `extra_outputs`.

        Parameters
        ----------
        X : array-like of shape (n_samples, ...)
            Test samples.
        extra_outputs : None or str or sequence of str, default=None
            Names of additional outputs to return next to `P`. The names must
            be a subset of the following keys:

            - "logits" : Additionally return the class-membership logits
              `L_class` for the samples in `X`.
            - "annotator_perf" : Additionally return the estimated annotator
              performance probabilities `P_perf` for each sample-annotator
              pair.
            - "annotator_class" : Additionally return the annotator-class
              probability estimates `P_annot` for each sample, class, and
              annotator.

        Returns
        -------
        P : numpy.ndarray of shape (n_samples, n_classes)
            Class probabilities of the test samples. Classes are ordered
            according to `self.classes_`.
        *extras : numpy.ndarray, optional
            Only returned if `extra_outputs` is not `None`. In that case, the
            method returns a tuple whose first element is `P` and whose
            remaining elements correspond to the requested forward outputs in
            the order given by `extra_outputs`. Potential outputs are:

            - `L_class` : `np.ndarray` of shape `(n_samples, n_classes)`,
              where `L_class[n, c]` is the logit for the class `classes_[c]`
              of sample `X[n]`.
            - `P_perf` : `np.ndarray` of shape `(n_samples, n_annotators)`,
              where `P_perf[n, m]` refers to the estimated label correctness
              probability (performance) of annotator `m` when labeling
              sample `X[n]`.
            - `P_annot` : `np.ndarray` of shape
              `(n_samples, n_annotators, n_classes)`, where `P_annot[n, m, c]`
              refers to the probability that annotator `m` provides the class
              label `c` for sample `X[n]`.
        """
        # Check extra outputs format.
        if extra_outputs is None:
            extra_outputs = []
        elif isinstance(extra_outputs, str):
            extra_outputs = [extra_outputs]
        elif isinstance(extra_outputs, Sequence) and not isinstance(
            extra_outputs, bytes
        ):
            extra_outputs = list(extra_outputs)
        else:
            raise TypeError(
                "`extra_outputs` must be None, a string, or a sequence "
                f"of strings, got {type(extra_outputs)}."
            )
        if len(set(extra_outputs)) != len(extra_outputs):
            raise ValueError(
                "`extra_outputs` must not contain duplicate names."
            )
        unknown = [
            n for n in extra_outputs if n not in self._ALLOWED_EXTRA_OUTPUTS
        ]
        if unknown:
            raise ValueError(
                f"Requested extra output(s) {unknown!r} are not defined; "
                f"allowed names are {sorted(self._ALLOWED_EXTRA_OUTPUTS)!r}."
            )

        # Check test samples.
        check_is_fitted(self)
        X = check_array(X)
        check_n_features(self, X, reset=False)

        # Insert a bias feature, if 'fit_intercept' is set to 'True'.
        if self.fit_intercept:
            X = np.insert(X, 0, values=1, axis=1)

        # Compute logits and normalize them to probabilities.
        if self.W_ is None:
            L_class = np.zeros((len(X), len(self.classes_)))
        else:
            L_class = X @ self.W_
        P_class = softmax(L_class, axis=1)

        # Return only class probabilities as primary output.
        if not extra_outputs:
            return P_class

        # Return requested extra outputs next to the class probabilities.
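        # For each requested extra output, the class probabilities are
        # combined with the fitted confusion matrices:
        #   P_perf[n, m]     = sum_k P_class[n, k] * Alpha_[m, k, k]
        #     (probability that annotator m assigns the correct label to X[n]),
        #   P_annot[n, m, c] = sum_k P_class[n, k] * Alpha_[m, k, c]
        #     (probability that annotator m assigns label c to X[n]).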
        out = [P_class]
        for extra in extra_outputs:
            if extra == "logits":
                out.append(L_class)
            if extra == "annotator_perf":
                diag_alpha = np.diagonal(self.Alpha_, axis1=1, axis2=2)
                P_perf = np.einsum("ik,lk->il", P_class, diag_alpha)
                out.append(P_perf)
            if extra == "annotator_class":
                P_annot = np.einsum("ik,lkc->ilc", P_class, self.Alpha_)
                out.append(P_annot)
        return tuple(out)
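

if __name__ == "__main__":
    # Minimal usage sketch for illustration only; the toy data below is an
    # assumption made for this example and is not part of the library.
    rng = np.random.RandomState(0)
    X_toy = rng.normal(size=(20, 2))
    y_true = (X_toy[:, 0] > 0).astype(float)
    # Simulate two annotators: the first is accurate, the second flips about
    # 30% of the labels.
    y_noisy = np.where(rng.rand(20) < 0.3, 1 - y_true, y_true)
    y_toy = np.column_stack([y_true, y_noisy])
    # Mark a few labels of the second annotator as missing (np.nan is the
    # default `missing_label`).
    y_toy[:5, 1] = np.nan
    clf = AnnotatorLogisticRegression(random_state=0)
    clf.fit(X_toy, y_toy)
    P, P_perf = clf.predict_proba(X_toy, extra_outputs="annotator_perf")
    print(P.shape, P_perf.shape)  # expected: (20, 2) (20, 2)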