Source code for skactiveml.classifier.multiannotator._annotator_logistic_regression

"""
Logistic Regression for Multiple Annotators
"""

# Author: Marek Herde <marek.herde@uni-kassel.de>
#         Timo Sturm <timo.sturm@student.uni-kassel.de>

import warnings

import numpy as np
from scipy.optimize import minimize
from scipy.special import softmax
from scipy.stats import dirichlet
from scipy.stats import multivariate_normal as multi_normal
from sklearn.utils.validation import check_array, check_is_fitted, column_or_1d

from ...base import SkactivemlClassifier, AnnotatorModelMixin
from ...utils import (
    MISSING_LABEL,
    compute_vote_vectors,
    rand_argmax,
    ext_confusion_matrix,
)


class AnnotatorLogisticRegression(SkactivemlClassifier, AnnotatorModelMixin):
    """AnnotatorLogisticRegression

    Logistic regression based on Raykar [1]_ is a classification algorithm
    that learns from multiple annotators. Besides building a model for the
    classification task, the algorithm estimates the performance of the
    annotators. The performance of an annotator is assumed to depend only on
    the true label of a sample and not on the sample itself. Each annotator
    is assigned a confusion matrix, where each row is normalized; it captures
    the bias of the annotator's decisions. These estimated biases are then
    used to refine the classifier itself.

    The classifier also supports a Bayesian view on the problem: a prior
    distribution over each annotator's confusion matrix is assumed, as well
    as a prior distribution over the classifier's weight vectors, which
    corresponds to a regularization.

    Parameters
    ----------
    tol : float, default=1.e-2
        Threshold for stopping the EM-algorithm: if the change of the
        expectation value between two steps is smaller than `tol`, the fit
        algorithm stops.
    max_iter : int, default=100
        The maximum number of iterations of the EM-algorithm to be performed.
    fit_intercept : bool, default=True
        Specifies whether a constant (a.k.a. bias or intercept) should be
        added to input samples.
    annot_prior_full : int or float or array-like, default=1
        This parameter determines A as the Dirichlet prior for each annotator
        l (i.e., A[l] = annot_prior_full * np.ones((n_classes, n_classes))
        for a numeric or A[l] = annot_prior_full[l] * np.ones((n_classes,
        n_classes)) for an array-like parameter). A[l,i,j] is the estimated
        number of times annotator l has provided label j for an instance of
        true label i.
    annot_prior_diag : int or float or array-like, default=0
        This parameter adds a value to the diagonal of A[l], being the
        Dirichlet prior for annotator l (i.e., A[l] += annot_prior_diag *
        np.eye(n_classes) for a numeric or A[l] += annot_prior_diag[l] *
        np.eye(n_classes) for an array-like parameter). A[l,i,j] is the
        estimated number of times annotator l has provided label j for an
        instance of true label i.
    weights_prior : int or float, default=1
        Determines Gamma as the inverse covariance matrix of the prior
        distribution for every weight vector (i.e., Gamma = weights_prior *
        np.eye(n_features)). By default, the identity matrix is used for
        each weight vector.
    solver : str or callable, default='Newton-CG'
        Type of solver. Should be 'Nelder-Mead', 'Powell', 'CG', 'BFGS',
        'Newton-CG', 'L-BFGS-B', 'TNC', 'COBYLA', 'SLSQP', 'trust-constr',
        'dogleg', 'trust-ncg', 'trust-exact', 'trust-krylov', or a custom
        callable object. See scipy.optimize.minimize for more information.
    solver_dict : dictionary, default=None
        Additional solver options passed to scipy.optimize.minimize. If None,
        {'maxiter': 5} is passed.
    classes : array-like of shape (n_classes,), default=None
        Holds the label for each class. If None, the classes are determined
        during the fit.
    missing_label : scalar or string or np.nan or None, default=np.nan
        Value to represent a missing label.
    cost_matrix : array-like of shape (n_classes, n_classes)
        Cost matrix with cost_matrix[i,j] indicating the cost of predicting
        class classes[j] for a sample of class classes[i]. Can only be set
        if classes is not None.
    random_state : int or RandomState instance or None, default=None
        Determines random number generation for the 'predict' method. Pass
        an int for reproducible results across multiple method calls.

    Attributes
    ----------
    n_annotators_ : int
        Number of annotators.
    W_ : numpy.ndarray of shape (n_features, n_classes)
        The weight vectors of the logistic regression model.
    Alpha_ : numpy.ndarray of shape (n_annotators, n_classes, n_classes)
        This is a confusion matrix for each annotator, where each row is
        normalized. `Alpha_[l,k,c]` describes the probability that annotator
        l provides the class label c for a sample belonging to class k.
    classes_ : array-like of shape (n_classes,)
        Holds the label for each class after fitting.
    cost_matrix_ : array-like of shape (n_classes, n_classes)
        Cost matrix with `cost_matrix_[i,j]` indicating the cost of
        predicting class `classes_[j]` for a sample of class `classes_[i]`.

    References
    ----------
    .. [1] Raykar, V. C., Yu, S., Zhao, L. H., Valadez, G. H., Florin, C.,
       Bogoni, L., & Moy, L. (2010). Learning from crowds. Journal of
       Machine Learning Research, 11(4).
    """

    def __init__(
        self,
        tol=1.0e-2,
        max_iter=100,
        fit_intercept=True,
        annot_prior_full=1,
        annot_prior_diag=0,
        weights_prior=1,
        solver="Newton-CG",
        solver_dict=None,
        classes=None,
        cost_matrix=None,
        missing_label=MISSING_LABEL,
        random_state=None,
    ):
        super().__init__(
            classes=classes,
            missing_label=missing_label,
            cost_matrix=cost_matrix,
            random_state=random_state,
        )
        self.tol = tol
        self.max_iter = max_iter
        self.fit_intercept = fit_intercept
        self.annot_prior_full = annot_prior_full
        self.annot_prior_diag = annot_prior_diag
        self.weights_prior = weights_prior
        self.solver = solver
        self.solver_dict = solver_dict
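    # A minimal usage sketch (hypothetical toy data with two annotators
    # labeling four one-dimensional samples, where np.nan marks a missing
    # label):
    #
    #   import numpy as np
    #   from skactiveml.classifier.multiannotator import (
    #       AnnotatorLogisticRegression,
    #   )
    #
    #   X = np.array([[0.0], [1.0], [2.0], [3.0]])
    #   y = np.array([[0, 0], [0, np.nan], [1, 1], [np.nan, 1]])
    #   clf = AnnotatorLogisticRegression(classes=[0, 1], random_state=0)
    #   clf.fit(X, y)
    #   proba = clf.predict_proba(X)            # shape (4, 2), per class
    #   perf = clf.predict_annotator_perf(X)    # shape (4, 2), per annotator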

    def fit(self, X, y, sample_weight=None):
        """Fit the model using X as training data and y as class labels.

        Parameters
        ----------
        X : matrix-like, shape (n_samples, n_features)
            The sample matrix X is the feature matrix representing the
            samples.
        y : array-like, shape (n_samples, n_annotators)
            It contains the class labels provided by the annotators for the
            training samples. The number of class labels may vary across
            samples; missing labels are represented by the attribute
            'missing_label'.
        sample_weight : array-like, shape (n_samples, n_annotators)
            It contains the weights of the training samples' class labels.
            It must have the same shape as y.

        Returns
        -------
        self : AnnotatorLogisticRegression,
            The AnnotatorLogisticRegression is fitted on the training data.
        """
        # Check input data.
        X, y, sample_weight = self._validate_data(
            X=X, y=y, sample_weight=sample_weight, y_ensure_1d=False
        )
        self._check_n_features(X, reset=True)

        # Ensure value of 'tol' to be positive.
        if not isinstance(self.tol, float):
            raise TypeError(
                "`tol` must be an instance of float, not {}.".format(
                    type(self.tol)
                )
            )
        if self.tol <= 0:
            raise ValueError("`tol`= {}, must be > 0.".format(self.tol))

        # Ensure value of 'max_iter' to be positive.
        if not isinstance(self.max_iter, int):
            raise TypeError(
                "`max_iter` must be an instance of int, not {}.".format(
                    type(self.max_iter)
                )
            )
        if self.max_iter <= 0:
            raise ValueError(
                "`max_iter`= {}, must be an integer >= 1.".format(
                    self.max_iter
                )
            )

        if not isinstance(self.fit_intercept, bool):
            raise TypeError(
                "'fit_intercept' must be of type 'bool', got {}".format(
                    type(self.fit_intercept)
                )
            )

        solver_dict = (
            {"maxiter": 5} if self.solver_dict is None else self.solver_dict
        )

        # Check weights prior.
        if not isinstance(self.weights_prior, (int, float)):
            raise TypeError(
                "'weights_prior' must be of a positive 'int' or "
                "'float', got {}".format(type(self.weights_prior))
            )
        if self.weights_prior < 0:
            raise ValueError(
                "'weights_prior' must be of a positive 'int' or "
                "'float', got {}".format(self.weights_prior)
            )

        # Check for empty training data.
        if self.n_features_in_ is None:
            return self
        if len(y.shape) != 2:
            raise ValueError(
                "`y` must be an array-like of shape "
                "`(n_samples, n_annotators)`."
            )

        # Insert bias, if 'fit_intercept' is set to 'True'.
        if self.fit_intercept:
            X = np.insert(X, 0, values=1, axis=1)

        # Ensure sample weights to form a 2d array.
        if sample_weight is None:
            sample_weight = np.ones_like(y)

        # Set auxiliary variables.
        n_samples = X.shape[0]
        n_features = X.shape[1]
        n_classes = len(self.classes_)
        self.n_annotators_ = y.shape[1]
        identity_matrix = np.eye(n_classes)

        # Convert Gamma to a matrix, if it is a number.
        Gamma = self.weights_prior * np.eye(n_features)
        all_zeroes = not np.any(Gamma)
        Gamma_tmp = Gamma if all_zeroes else np.linalg.inv(Gamma)

        # Check input 'annot_prior_full' and 'annot_prior_diag'.
        annot_prior = []
        for name, prior in [
            ("annot_prior_full", self.annot_prior_full),
            ("annot_prior_diag", self.annot_prior_diag),
        ]:
            if isinstance(prior, (int, float)):
                prior_array = np.ones(self.n_annotators_) * prior
            else:
                prior_array = column_or_1d(prior)
            if name == "annot_prior_full":
                is_invalid_prior = np.sum(prior_array <= 0)
            else:
                is_invalid_prior = np.sum(prior_array < 0)
            if len(prior_array) != self.n_annotators_ or is_invalid_prior:
                raise ValueError(
                    "'{}' must be either 'int', 'float' or "
                    "array-like with positive values and shape "
                    "(n_annotators), got {}".format(name, prior)
                )
            annot_prior.append(prior_array)

        # Set up prior matrix for each annotator.
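        # As a small worked example (hypothetical values): with n_classes=2,
        # annot_prior_full=1, and annot_prior_diag=2, each annotator a gets
        # A[a] = [[1, 1], [1, 1]] + 2 * I = [[3, 1], [1, 3]], i.e., a prior
        # favoring labels that agree with the estimated true class.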
        A = np.ones((self.n_annotators_, n_classes, n_classes))
        for a in range(self.n_annotators_):
            A[a] *= annot_prior[0][a]
            A[a] += np.eye(n_classes) * annot_prior[1][a]

        # Init Mu (i.e., estimates of true labels) with (weighted) majority
        # voting.
        Mu = compute_vote_vectors(
            y=y,
            classes=np.arange(n_classes),
            missing_label=-1,
            w=sample_weight,
        )
        Mu_sum = np.sum(Mu, axis=1)
        is_zero = Mu_sum == 0
        Mu[~is_zero] /= Mu_sum[~is_zero, np.newaxis]
        Mu[is_zero] = 1 / n_classes

        # Set initial weights.
        self.W_ = np.zeros((n_features, n_classes))

        # Use majority vote to initialize Alpha, where Alpha_[j] is the
        # confusion matrix of annotator j.
        y_majority = rand_argmax(Mu, random_state=self.random_state, axis=1)
        self.Alpha_ = ext_confusion_matrix(
            y_true=y_majority,
            y_pred=y,
            normalize="true",
            missing_label=-1,
            classes=np.arange(n_classes),
        )

        # Initialize the first expectation with negative infinity such that
        # the stopping criterion is not met in the first iteration.
        current_expectation = -np.inf

        # Execute expectation maximization (EM) algorithm.
        self.n_iter_ = 0
        while self.n_iter_ < self.max_iter:
            # E-step:
            P = softmax(X @ self.W_, axis=1)
            V = self._calc_V(y, self.Alpha_)
            Mu = self._calc_Mu(V, P)
            new_expectation = self._calc_expectation(
                Mu, P, V, Gamma, A, self.Alpha_, self.W_
            )

            # Stop EM, if it converges (to a local maximum).
            if (
                current_expectation == new_expectation
                or (new_expectation - current_expectation) < self.tol
            ):
                break

            # Update expectation value.
            current_expectation = new_expectation

            # M-step:
            self.Alpha_ = self._calc_Alpha(y, Mu, A, sample_weight)

            def error(w):
                """Evaluate cross-entropy error of weights for
                scipy.optimize.minimize.

                Parameters
                ----------
                w : ndarray, shape (n_features * n_classes)
                    Weights for which the cross-entropy error is to be
                    computed.

                Returns
                -------
                G : float
                    Computed cross-entropy error.
                """
                W = w.reshape(n_features, n_classes)
                P_W = softmax(X @ W, axis=1)
                prior_W = 0
                for c_idx in range(n_classes):
                    prior_W += multi_normal.logpdf(
                        x=W[:, c_idx], cov=Gamma_tmp, allow_singular=True
                    )
                log = np.sum(Mu * np.log(P_W * V + np.finfo(float).eps))
                log += prior_W
                return -log / n_samples

            def grad(w):
                """Compute gradient of error function for
                scipy.optimize.minimize.

                Parameters
                ----------
                w : ndarray, shape (n_features * n_classes)
                    Weights whose gradient is to be computed.

                Returns
                -------
                G : ndarray, shape (n_features * n_classes)
                    Computed gradient of weights.
                """
                W = w.reshape(n_features, n_classes)
                P_W = softmax(X @ W, axis=1)
                G = (X.T @ (P_W - Mu) + Gamma @ W).ravel()
                return G / n_samples

            def hessian(w):
                """Compute Hessian matrix of error function for
                scipy.optimize.minimize.

                Parameters
                ----------
                w : numpy.ndarray, shape (n_features * n_classes)
                    Weights whose Hessian matrix is to be computed.

                Returns
                -------
                H : numpy.ndarray, shape (n_features * n_classes,
                        n_features * n_classes)
                    Computed Hessian matrix of weights.
""" W = w.reshape(n_features, n_classes) H = np.empty((n_classes * n_features, n_classes * n_features)) P_W = softmax(X @ W, axis=1) for k in range(n_classes): for j in range(n_classes): diagonal = P_W[:, j] * ( identity_matrix[k, j] - P_W[:, k] ) D = np.diag(diagonal) H_kj = X.T @ D @ X + Gamma H[ k * n_features : (k + 1) * n_features, j * n_features : (j + 1) * n_features, ] = H_kj return H / n_samples with warnings.catch_warnings(): warning_msg = ".*Method .* does not use Hessian information.*" warnings.filterwarnings("ignore", message=warning_msg) warning_msg = ".*Method .* does not use gradient information.*" warnings.filterwarnings("ignore", message=warning_msg) res = minimize( error, x0=self.W_.ravel(), method=self.solver, tol=self.tol, jac=grad, hess=hessian, options=solver_dict, ) self.W_ = res.x.reshape((n_features, n_classes)) self.n_iter_ += 1 return self

    def predict_proba(self, X):
        """Return probability estimates for the test data `X`.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Test samples.

        Returns
        -------
        P : numpy.ndarray of shape (n_samples, n_classes)
            The class probabilities of the test samples. Classes are ordered
            according to `classes_`.
        """
        # Check test samples.
        check_is_fitted(self)
        X = check_array(X)
        self._check_n_features(X, reset=False)

        # Prediction without training data.
        if self.n_features_in_ is None:
            return np.ones((len(X), len(self.classes_))) / len(self.classes_)

        # Insert bias, if 'fit_intercept' is set to 'True'.
        if self.fit_intercept:
            X = np.insert(X, 0, values=1, axis=1)

        # Compute and normalize probabilities.
        P = softmax(X @ self.W_, axis=1)
        return P
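    # For example (hypothetical numbers): with fit_intercept=True, a sample
    # x is extended to [1, x] before applying the weights, and logits of
    # [2.0, 0.0] yield class probabilities of roughly [0.88, 0.12] after the
    # softmax.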

    def predict_annotator_perf(self, X):
        """Calculates the probability that an annotator provides the true
        label for a given sample. The true label is hereby provided by the
        classification model. The label provided by an annotator l is based
        on his/her confusion matrix (i.e., attribute `Alpha_[l]`).

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Test samples.

        Returns
        -------
        P_annot : numpy.ndarray of shape (n_samples, n_annotators)
            `P_annot[i,l]` is the probability that annotator l provides the
            correct class label for sample `X[i]`.
        """
        # Prediction without training data.
        if self.n_features_in_ is None:
            return np.ones((len(X), 1)) / len(self.classes_)

        # Compute class probabilities.
        P = self.predict_proba(X)

        # Get correctness probabilities for each annotator per class.
        diag_Alpha = np.array(
            [np.diagonal(self.Alpha_[j]) for j in range(self.Alpha_.shape[0])]
        )

        # Compute correctness probabilities for each annotator per sample.
        P_annot = P @ diag_Alpha.T
        return P_annot
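    # A small worked example (hypothetical numbers): if the model assigns a
    # sample the class probabilities P[i] = [0.7, 0.3] and annotator l has
    # the per-class correctness probabilities diag(Alpha_[l]) = [0.9, 0.8],
    # then P_annot[i, l] = 0.7 * 0.9 + 0.3 * 0.8 = 0.87.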

    @staticmethod
    def _calc_V(y, Alpha):
        """Calculates a value used for updating Mu and the expectation.

        Parameters
        ----------
        y : numpy.ndarray of shape (n_samples, n_annotators)
            The class labels provided by the annotators for all samples.
        Alpha : numpy.ndarray of shape (n_annotators, n_classes, n_classes)
            The current estimates of Alpha. This is effectively a confusion
            matrix for each annotator, where each row is normalized.

        Returns
        -------
        out : numpy.ndarray
            Vector of shape (n_samples, n_classes).
        """
        n_samples, _, n_classes = (
            y.shape[0],
            y.shape[1],
            Alpha.shape[1],
        )
        V = np.ones((n_samples, n_classes))
        for c in range(n_classes):
            for k in range(n_classes):
                y_is_k = y == k
                V[:, c] *= np.prod(Alpha[:, c, k] ** y_is_k, axis=1)
        return V

    @staticmethod
    def _calc_Alpha(y, Mu, A, sample_weight):
        """Calculates the class-dependent performance estimates of the
        annotators.

        Parameters
        ----------
        y : numpy.ndarray of shape (n_samples, n_annotators)
            The class labels provided by the annotators for all samples.
        Mu : numpy.ndarray of shape (n_samples, n_classes)
            Mu[i,k] contains the probability of a sample X[i] to be of class
            classes_[k] estimated according to the EM-algorithm.
        A : numpy.ndarray of shape (n_annotators, n_classes, n_classes)
            A[l,i,j] is the estimated number of times annotator l has
            provided label j for an instance of true label i.
        sample_weight : numpy.ndarray of shape (n_samples, n_annotators)
            It contains the weights of the training samples' class labels.
            It must have the same shape as y.

        Returns
        -------
        new_Alpha : numpy.ndarray of shape (n_annotators, n_classes,
                n_classes)
            This is a confusion matrix for each annotator, where each row is
            normalized. `new_Alpha[l,k,c]` describes the probability that
            annotator l provides the class label c for a sample belonging to
            class k.
        """
        n_annotators, n_classes = y.shape[1], Mu.shape[1]
        new_Alpha = np.zeros((n_annotators, n_classes, n_classes))
        not_nan_y = ~np.isnan(y)
        for j in range(n_annotators):
            # Only take those rows from y, where y is not NaN:
            y_j = np.eye(n_classes)[y[not_nan_y[:, j], j].astype(int)]
            w_j = sample_weight[not_nan_y[:, j], j].reshape(-1, 1)
            new_Alpha[j] = (Mu[not_nan_y[:, j]].T @ (w_j * y_j)) + A[j] - 1

        # Lazy normalization: the real normalization factor
        # (sum_i=1^N mu_i,c + sum_k=0^K-1 A_j,c,k - K) is omitted here.
        with np.errstate(all="ignore"):
            new_Alpha = new_Alpha / new_Alpha.sum(axis=2, keepdims=True)
            new_Alpha = np.nan_to_num(new_Alpha, nan=1.0 / n_classes)

        return new_Alpha

    @staticmethod
    def _calc_Mu(V, P):
        """Calculates the new estimate for Mu, using Bayes' theorem.

        Parameters
        ----------
        V : numpy.ndarray, shape (n_samples, n_classes)
            Describes an intermediate result.
        P : numpy.ndarray, shape (n_samples, n_classes)
            P[i,k] contains the probability of sample X[i] belonging to
            class classes_[k], as estimated by the classifier (i.e.,
            softmax(X[i] @ W)).

        Returns
        -------
        new_Mu : numpy.ndarray
            new_Mu[i,k] contains the probability of a sample X[i] to be of
            class classes_[k] estimated according to the EM-algorithm.
        """
        new_Mu = P * V
        new_Mu_sum = np.sum(new_Mu, axis=1)
        is_zero = new_Mu_sum == 0
        new_Mu[~is_zero] /= new_Mu_sum[~is_zero, np.newaxis]
        new_Mu[is_zero] = 1 / P.shape[1]
        return new_Mu

    @staticmethod
    def _calc_expectation(Mu, P, V, Gamma, A, Alpha, W):
        """Calculates the conditional expectation in the E-step of the
        EM-algorithm, given the observations and the current estimates of
        the classifier.
        Parameters
        ----------
        Mu : numpy.ndarray, shape (n_samples, n_classes)
            Mu[i,k] contains the probability of a sample X[i] to be of class
            classes_[k] estimated according to the EM-algorithm.
        V : numpy.ndarray, shape (n_samples, n_classes)
            Describes an intermediate result.
        P : numpy.ndarray, shape (n_samples, n_classes)
            P[i,k] contains the probability of sample X[i] belonging to
            class classes_[k], as estimated by the classifier (i.e.,
            softmax(X[i] @ W)).

        Returns
        -------
        expectation : float
            The conditional expectation.
        """
        # Evaluate prior of weight vectors.
        all_zeroes = not np.any(Gamma)
        Gamma = Gamma if all_zeroes else np.linalg.inv(Gamma)
        prior_W = np.sum(
            [
                multi_normal.logpdf(x=W[:, k], cov=Gamma, allow_singular=True)
                for k in range(W.shape[1])
            ]
        )

        # Evaluate prior of Alpha matrices.
        prior_Alpha = np.sum(
            [
                [
                    dirichlet.logpdf(x=Alpha[j, k, :], alpha=A[j, k, :])
                    for k in range(Alpha.shape[1])
                ]
                for j in range(Alpha.shape[0])
            ]
        )

        # Evaluate log-likelihood for data.
        log_likelihood = np.sum(Mu * np.log(P * V + np.finfo(float).eps))

        expectation = log_likelihood + prior_W + prior_Alpha
        return expectation
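# A small worked example of the E-step update in `_calc_Mu` (hypothetical
# numbers): for a sample with classifier probabilities P[i] = [0.6, 0.4] and
# annotator-likelihood factors V[i] = [0.2, 0.8], the unnormalized posterior
# is P[i] * V[i] = [0.12, 0.32], which normalizes to approximately
# Mu[i] = [0.27, 0.73].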