"""
Logistic Regression for Multiple Annotators
"""
# Author: Marek Herde <marek.herde@uni-kassel.de>
# Timo Sturm <timo.sturm@student.uni-kassel.de>
import warnings
import numpy as np
from scipy.optimize import minimize
from scipy.special import softmax
from sklearn.utils.validation import (
check_array,
check_is_fitted,
column_or_1d,
)
from ...base import SkactivemlClassifier, AnnotatorModelMixin
from ...utils import (
MISSING_LABEL,
compute_vote_vectors,
is_labeled,
check_scalar,
check_n_features,
)
class AnnotatorLogisticRegression(SkactivemlClassifier, AnnotatorModelMixin):
"""Logistic Regression for Crowds
Logistic regression based on Raykar et al. [1]_ is a classification
algorithm that learns from multiple annotators. Besides building a model
for the classification task, the algorithm estimates the performance of
the annotators. The performance of an annotator is assumed to depend only
on the true label of a sample and not on the sample itself. Each annotator
is assigned a confusion matrix, whose rows are normalized and which
captures the bias of the annotator's decisions. These estimated biases are
then used to refine the classifier itself.
The classifier also supports a Bayesian view on the problem: a prior
distribution over each annotator's confusion matrix is assumed, as well as
a prior distribution over the classifier's weight vectors, which
corresponds to a regularization.
Parameters
----------
n_annotators : int or None, default=None
Number of annotators, i.e., the expected number of columns in `y`.
If `None`, the number of annotators is inferred from `y` when `fit`
is called with non-empty training data.
tol : float, default=0.0001
Threshold for stopping the EM algorithm and the optimization of the
logistic regression weights. If the change of the respective value
between two steps is smaller than `tol`, the respective algorithm
stops.
max_iter : int, default=100
The maximum number of iterations of the EM-algorithm to be performed.
fit_intercept : bool, default=True
Specifies if a constant (a.k.a. bias or intercept) should be
added to input samples.
annot_prior_full : int or float or array-like, default=1
Determines `A` as the Dirichlet prior for each annotator `l`
(i.e., `A[l] = annot_prior_full * np.ones((n_classes, n_classes))` for
a numeric or `A[l] = annot_prior_full[l] * np.ones((n_classes,
n_classes))` for an array-like parameter). `A[l,i,j]` is the estimated
number of times annotator `l` has provided label `j` for a sample of
true label `i`.
annot_prior_diag : int or float or array-like, default=0
Adds a value to the diagonal of the Dirichlet prior `A[l]` of
annotator `l` (i.e., `A[l] += annot_prior_diag * np.eye(n_classes)`
for a numeric or `A[l] += annot_prior_diag[l] * np.eye(n_classes)`
for an array-like parameter). `A[l,i,j]` is the
estimated number of times annotator `l` has provided label `j` for
a sample of true label `i`.
weights_prior : int or float, default=1
Determines Gamma as the inverse covariance matrix of the
prior distribution for every weight vector
(i.e., `Gamma=weights_prior * np.eye(n_features)`).
By default, the identity matrix is used for each weight vector.
solver : str or callable, default='Newton-CG'
Type of solver. Should be 'Nelder-Mead', 'Powell', 'CG',
'BFGS', 'Newton-CG', 'L-BFGS-B', 'TNC', 'COBYLA', 'SLSQP',
'trust-constr', 'dogleg', 'trust-ncg', 'trust-exact', 'trust-krylov',
or custom - a callable object. See `scipy.optimize.minimize` for more
information.
solver_dict : dict, default=None
Additional solver options passed to `scipy.optimize.minimize`. If
`None`, `{'maxiter': 100}` is passed.
classes : array-like of shape (n_classes,), default=None
Holds the label for each class. If `None`, the classes are determined
during the fit.
missing_label : scalar or string or np.nan or None, default=np.nan
Value to represent a missing label.
cost_matrix : array-like of shape (n_classes, n_classes), default=None
Cost matrix with `cost_matrix[i,j]` indicating cost of predicting class
`classes[j]` for a sample of class `classes[i]`. Can only be set if
`classes` is not `None`.
random_state : int or RandomState instance or None, default=None
Determines random number generation for the `predict` method. Pass an
int for reproducible results across multiple method calls.
Attributes
----------
n_annotators_ : int
Number of annotators.
W_ : numpy.ndarray of shape (n_features, n_classes) or (n_features + 1, n_classes)
The weight vectors of the logistic regression model, where the first
row corresponds to the intercept if `fit_intercept=True`.
Alpha_ : numpy.ndarray of shape (n_annotators, n_classes, n_classes)
This is a confusion matrix for each annotator, where each
row is normalized. `Alpha_[l,k,c]` describes the probability
that annotator `l` provides the class label `c` for a sample belonging
to class `k`.
classes_ : array-like of shape (n_classes,)
Holds the label for each class after fitting.
cost_matrix_ : array-like of shape (n_classes, n_classes)
Cost matrix with `cost_matrix_[i,j]` indicating cost of predicting
class `classes_[j]` for a sample of class `classes_[i]`.
References
----------
.. [1] V. C. Raykar, S. Yu, L. H. Zhao, G. H. Valadez, C. Florin, L.
Bogoni, and L. Moy. Learning from Crowds. J. Mach. Learn. Res.,
11(4):1297–1322, 2010.
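Examples
--------
A minimal usage sketch on synthetic data; the samples, the label matrix,
and the import path are illustrative assumptions:
>>> import numpy as np
>>> from skactiveml.classifier.multiannotator import (
...     AnnotatorLogisticRegression,
... )
>>> X = np.array([[0.0], [1.0], [2.0], [3.0]])
>>> # Each column of `y` holds the labels of one annotator, where
>>> # `np.nan` marks a missing label.
>>> y = np.array([[0, 0], [0, np.nan], [1, 1], [np.nan, 1]])
>>> clf = AnnotatorLogisticRegression(
...     classes=[0, 1], missing_label=np.nan, random_state=0
... )
>>> clf = clf.fit(X, y)
>>> proba = clf.predict_proba(X)  # shape (n_samples, n_classes)
>>> perf = clf.predict_annotator_perf(X)  # shape (n_samples, n_annotators)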
"""
def __init__(
self,
n_annotators=None,
tol=0.0001,
max_iter=100,
fit_intercept=True,
annot_prior_full=1,
annot_prior_diag=0,
weights_prior=1,
solver="Newton-CG",
solver_dict=None,
classes=None,
cost_matrix=None,
missing_label=MISSING_LABEL,
random_state=None,
):
super().__init__(
classes=classes,
missing_label=missing_label,
cost_matrix=cost_matrix,
random_state=random_state,
)
self.n_annotators = n_annotators
self.tol = tol
self.max_iter = max_iter
self.fit_intercept = fit_intercept
self.annot_prior_full = annot_prior_full
self.annot_prior_diag = annot_prior_diag
self.weights_prior = weights_prior
self.solver = solver
self.solver_dict = solver_dict
def fit(self, X, y, sample_weight=None):
"""Fit the model using `X` as samples and `y` as class labels.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Feature matrix representing the samples.
y : array-like of shape (n_samples, n_annotators)
It contains the class labels provided by the annotators for the
training samples. The number of class labels may vary across samples,
where missing labels are represented by the attribute `missing_label`.
sample_weight : array-like of shape (n_samples, n_annotators), default=None
It contains the weights of the training samples' class labels.
It must have the same shape as `y`. The sample weights are only used
for the initialization of the majority vote and the computation of
the confusion matrices. They are not used for the update of the
logistic regression weights and the expectation computation.
Returns
-------
self : AnnotatorLogisticRegression
The `AnnotatorLogisticRegression` fitted on the training data.
"""
# Check input data.
X, y, sample_weight = self._validate_data(
X=X, y=y, sample_weight=sample_weight, y_ensure_1d=False
)
# Ensure value of 'n_annotators' to be a positive integer.
if self.n_annotators is not None:
check_scalar(
self.n_annotators,
"n_annotators",
min_val=1,
min_inclusive=True,
target_type=int,
)
# Ensure value of 'tol' to be positive.
check_scalar(
self.tol,
"tol",
min_val=0,
min_inclusive=False,
target_type=(int, float),
)
# Ensure value of 'max_iter' to be positive.
check_scalar(
self.max_iter,
"max_iter",
min_val=1,
min_inclusive=True,
target_type=int,
)
# Ensure value of 'fit_intercept' to be boolean.
check_scalar(self.fit_intercept, "fit_intercept", target_type=bool)
# Ensure value of 'solver_dict' to be a dictionary.
solver_dict = (
{"maxiter": 100} if self.solver_dict is None else self.solver_dict
)
check_scalar(solver_dict, "solver_dict", target_type=dict)
# Ensure value of 'weights_prior' to be non-negative.
check_scalar(
self.weights_prior,
"weights_prior",
min_val=0,
min_inclusive=True,
target_type=(int, float),
)
# Set auxiliary variables.
n_classes = len(self.classes_)
if self.n_features_in_ is not None:
if len(y.shape) != 2:
raise ValueError(
"`y` must be an array-like of shape "
"`(n_samples, n_annotators)`."
)
self.n_annotators_ = y.shape[1]
else:
if self.n_annotators is None:
raise ValueError(
"`y` cannot be empty, if `n_annotators` is None."
)
self.n_annotators_ = self.n_annotators
# Check consistent number of annotators.
if (
self.n_annotators is not None
and self.n_annotators != self.n_annotators_
):
raise ValueError(
f"`n_annotators={self.n_annotators}` does not match "
f"{self.n_annotators_} as the number of columns in `y`."
)
# Check input 'annot_prior_full' and 'annot_prior_diag'.
annot_prior = []
for name, prior in [
("annot_prior_full", self.annot_prior_full),
("annot_prior_diag", self.annot_prior_diag),
]:
if isinstance(prior, (int, float)):
prior_array = np.ones(self.n_annotators_) * prior
else:
prior_array = column_or_1d(prior)
if name == "annot_prior_full":
is_invalid_prior = np.sum(prior_array <= 0)
else:
is_invalid_prior = np.sum(prior_array < 0)
if len(prior_array) != self.n_annotators_ or is_invalid_prior:
raise ValueError(
f"'{name}' must be either 'int', 'float' or "
f"array-like with positive values and shape "
f"(n_annotators), got {prior}"
)
annot_prior.append(prior_array)
# Set up prior matrix for each annotator.
A = np.ones((self.n_annotators_, n_classes, n_classes))
for a in range(self.n_annotators_):
A[a] *= annot_prior[0][a]
A[a] += np.eye(n_classes) * annot_prior[1][a]
A_obs = A - 1
A_obs_sum = np.sum(A_obs, axis=-1, keepdims=True)
A_norm = np.divide(
A_obs,
A_obs_sum,
out=np.full_like(A_obs, 1 / n_classes, dtype=float),
where=A_obs_sum != 0,
)
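# For example, with the default priors `annot_prior_full=1` and
# `annot_prior_diag=0`, `A[l]` is an all-ones matrix, so `A_obs = A - 1`
# is all zeros and `A_norm[l]` falls back to the uniform value
# `1 / n_classes` in every entry.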
# Fallback, if empty training data has been provided.
if self.n_features_in_ is None:
self.W_ = None
self.Alpha_ = A_norm
return self
# Remove samples without labels.
is_lbld = is_labeled(y, missing_label=-1).any(axis=-1)
X = X[is_lbld]
y = y[is_lbld]
n_samples = X.shape[0]
is_lbld = is_labeled(y, missing_label=-1)
# Insert bias, if 'fit_intercept' is set to 'True'.
n_weights = self.n_features_in_
if self.fit_intercept:
n_weights += 1
X = np.insert(X, 0, values=1, axis=1)
# Ensure that the sample weights form a 2-d array.
if sample_weight is None:
sample_weight = np.ones_like(y)
# Set initial weights.
self.W_ = np.zeros((n_weights, n_classes))
if n_samples == 0:
self.Alpha_ = A_norm
return self
# Initialize the first expectation value as negative infinity such that
# |new - current| < tol is False in the first iteration.
current_expectation = -np.inf
n_iter = 0
# Execute expectation maximization (EM) algorithm.
while n_iter < self.max_iter:
# -----------------------------E-STEP------------------------------
# Compute probabilistic predictions.
P = softmax(X @ self.W_, axis=1)
# Estimate latent ground truth labels.
if n_iter == 0:
# Initialization via majority voting.
Mu = compute_vote_vectors(
y=y,
classes=np.arange(n_classes),
missing_label=-1,
w=sample_weight,
)
Mu /= np.sum(Mu, axis=1, keepdims=True)
else:
# Use current model parameters to estimate ground truth labels.
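# For each sample `i` and candidate class `c`,
# `U[i, c] = prod_l Alpha_[l, c, y[i, l]]`, taken over the annotators
# `l` who provided a label for sample `i` (missing labels contribute a
# factor of one).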
U = np.ones((n_samples, n_classes))
for c in range(n_classes):
for k in range(n_classes):
y_is_k = y == k
U[:, c] *= np.prod(
self.Alpha_[:, c, k] ** y_is_k, axis=1
)
Mu = P * U
Mu_sum = Mu.sum(axis=1, keepdims=True)
Mu = np.divide(
Mu,
Mu_sum,
out=np.full_like(Mu, 1 / n_classes, dtype=float),
where=Mu_sum != 0,
)
# Compute expectation for current parameters and estimates
# of the ground truth labels.
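# Up to additive constants, the objective combines the expected
# log-likelihood of the current label estimates `Mu` with the log of
# the Gaussian prior on the weights (`prior_w`) and the log of the
# Dirichlet prior on `Alpha_` (`prior_alpha`), cf. Raykar et al. [1].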
prior_w = -0.5 * self.weights_prior * np.sum(self.W_**2)
prior_alpha = np.sum(
(A - 1) * np.log(self.Alpha_ + np.finfo(float).eps)
)
log_likelihood = np.sum(
Mu * np.log(P * U + np.finfo(float).eps)
)
new_expectation = log_likelihood + prior_w + prior_alpha
# Stop in the case of convergence.
if np.abs(new_expectation - current_expectation) < self.tol:
break
# Otherwise, update the current expectation value.
current_expectation = new_expectation
# -----------------------------M-STEP------------------------------
# Update the confusion matrices.
Alpha = np.zeros((self.n_annotators_, n_classes, n_classes))
for j in range(self.n_annotators_):
y_j = np.eye(n_classes)[y[is_lbld[:, j], j].astype(int)]
w_j = sample_weight[is_lbld[:, j], j].reshape(-1, 1)
Alpha[j] = (Mu[is_lbld[:, j]].T @ (w_j * y_j)) + A[j] - 1
Alpha_sum = Alpha.sum(axis=-1, keepdims=True)
self.Alpha_ = np.divide(
Alpha,
Alpha_sum,
out=np.full_like(Alpha, 1 / n_classes, dtype=float),
where=Alpha_sum != 0,
)
def error(w):
"""
Compute the cross-entropy loss (negative log-posterior) and
its gradient for a softmax-based classifier with L2
regularization.
This function calculates the error as the sum of the negative
log-likelihood (cross-entropy) of the predicted class
probabilities relative to the target distribution, and an L2
regularization penalty on the weights. It also computes the
gradient of this loss with respect to the weight vector, which
is returned in flattened form. This formulation is suitable for
use with optimization routines such as
`scipy.optimize.minimize`.
Parameters
----------
w : numpy.ndarray of shape (n_weights * n_classes,)
Flattened weight vector. It is reshaped to a
(n_weights, n_classes) weight matrix.
Returns
-------
loss : float
The computed cross-entropy error (negative log-posterior),
including the negative log-likelihood and the
L2 regularization penalty.
grad : numpy.ndarray of shape (n_weights * n_classes,)
The gradient of the loss with respect to the weight vector,
flattened to a one-dimensional array.
"""
# Reshape weights as matrix.
W = w.reshape((n_weights, n_classes))
# Compute probabilistic predictions.
logits = X.dot(W)
p = softmax(logits, axis=-1)
# Compute loss for probabilistic predictions.
loss = -np.sum(Mu * np.log(p + np.finfo(float).eps))
# Add L2 penalty to loss.
if self.fit_intercept:
loss += 0.5 * self.weights_prior * np.sum(W[1:] ** 2)
else:
loss += 0.5 * self.weights_prior * np.sum(W**2)
# Compute L2 part of the gradient.
if self.fit_intercept:
reg_grad = np.zeros_like(W)
reg_grad[1:, :] = self.weights_prior * W[1:, :]
else:
reg_grad = self.weights_prior * W
# Compute final gradient.
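# For the softmax cross-entropy term, the derivative with respect to
# `W` is `X.T @ (p - Mu)`; adding `reg_grad` yields the gradient of
# the full regularized loss.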
grad = X.T.dot(p - Mu) + reg_grad
return loss, grad.ravel()
def hessp(w, v):
"""
Compute the Hessian-vector product for the error function.
This function computes the product of the Hessian of the error
function with a given vector, which is useful for second-order
optimization routines (e.g. in scipy.optimize.minimize when
a Hessian-vector product is supplied via the 'hessp' argument).
The error function is defined via a softmax-based prediction
model with an added regularization term.
Parameters
----------
w : numpy.ndarray of shape (n_weights * n_classes,)
Flattened weight vector. It is reshaped to a
(n_weights, n_classes) weight matrix.
v : numpy.ndarray of shape (n_weights * n_classes,)
Flattened vector to be multiplied by the Hessian. It is
reshaped to a (n_weights, n_classes) matrix.
Returns
-------
Hv : numpy.ndarray of shape (n_weights * n_classes,)
The product of the Hessian (of the error function) with the
vector v, returned as a flat array.
"""
# Reshape weights as matrix.
W = w.reshape((n_weights, n_classes))
# Compute probabilistic predictions.
logits = X.dot(W)
probas = softmax(logits, axis=-1)
# Compute intermediate results.
V = v.reshape((n_weights, n_classes))
M = X.dot(V)
# For each sample X[i], compute:
# R[i,j] = p[i,j] * ( M[i,j] - sum_k p[i,k]*M[i,k] )
R = probas * (M - np.sum(probas * M, axis=1, keepdims=True))
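# `X.T @ R` is the Hessian-vector product of the cross-entropy term;
# the L2 penalty contributes `weights_prior * V` (excluding the
# intercept row if `fit_intercept=True`), which is added below.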
if self.fit_intercept:
reg_Hv = np.zeros_like(V)
reg_Hv[1:, :] = self.weights_prior * V[1:, :]
else:
reg_Hv = self.weights_prior * V
Hv = X.T.dot(R) + reg_Hv
return Hv.ravel()
# Update weights of the logistic regression model.
with warnings.catch_warnings():
warning_msg = ".* does not use Hessian.* information.*"
warnings.filterwarnings("ignore", message=warning_msg)
warning_msg = ".* does not use gradient information.*"
warnings.filterwarnings("ignore", message=warning_msg)
res = minimize(
error,
x0=np.zeros((n_weights * n_classes)),
method=self.solver,
tol=self.tol,
jac=True,
hessp=hessp,
options=solver_dict,
)
self.W_ = res.x.reshape((n_weights, n_classes))
# Continue with next iteration.
n_iter += 1
return self
def predict_proba(self, X):
"""Return probability estimates for the test data `X`.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Test samples.
Returns
-------
P : numpy.ndarray of shape (n_samples, n_classes)
The class probabilities of the test samples. Classes are ordered
according to the attribute `self.classes_`.
"""
# Check test samples.
check_is_fitted(self)
X = check_array(X)
check_n_features(self, X, reset=False)
# Add a bias feature, if 'fit_intercept' is set to 'True'.
if self.fit_intercept:
X = np.insert(X, 0, values=1, axis=1)
# Compute and normalize probabilities.
if self.W_ is not None:
P = softmax(X @ self.W_, axis=1)
else:
return np.full(
(len(X), len(self.classes_)), fill_value=1 / len(self.classes_)
)
return P
def predict_annotator_perf(self, X):
"""Calculates the probability that an annotator provides the true label
for a given sample. The true label is hereby provided by the
classification model. The label provided by an annotator `l` is based
on their confusion matrix (i.e., attribute `self.Alpha_[l]`).
Parameters
----------
X : array-like of shape (n_samples, n_features)
Test samples.
Returns
-------
P_annot : numpy.ndarray of shape (n_samples, n_annotators)
`P_annot[i,l]` is the probability that annotator `l` provides the
correct class label for sample `X[i]`.
"""
# Compute class probabilities.
P = self.predict_proba(X)
# Get correctness probabilities for each annotator per class.
diag_Alpha = np.array(
[np.diagonal(self.Alpha_[j]) for j in range(self.Alpha_.shape[0])]
)
# Compute correctness probabilities for each annotator per sample.
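# That is, `P_annot[i, l] = sum_c P[i, c] * Alpha_[l, c, c]`, i.e., the
# class probabilities weighted by annotator `l`'s probability of
# labeling each class correctly.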
P_annot = P @ diag_Alpha.T
return P_annot