"""
Logistic Regression for Multiple Annotators
"""
# Author: Marek Herde <marek.herde@uni-kassel.de>
# Timo Sturm <timo.sturm@student.uni-kassel.de>
import warnings
import numpy as np
from collections.abc import Sequence
from scipy.optimize import minimize
from scipy.special import softmax
from sklearn.utils.validation import (
check_array,
check_is_fitted,
column_or_1d,
)
from ...base import SkactivemlClassifier
from ...utils import (
MISSING_LABEL,
compute_vote_vectors,
is_labeled,
check_scalar,
check_n_features,
)
class AnnotatorLogisticRegression(SkactivemlClassifier):
"""Logistic Regression for Crowds
    Logistic Regression based on Raykar [1]_ is a classification algorithm
    that learns from multiple annotators. Besides building a model for the
    classification task, the algorithm estimates the performance of the
    annotators. The performance of an annotator is assumed to depend only on
    the true label of a sample and not on the sample itself. Each annotator
    is assigned a confusion matrix, where each row is normalized. This matrix
    captures the bias of the annotator's decisions. These estimated biases
    are then used to refine the classifier itself.
    The classifier also supports a Bayesian view on the problem: a prior
    distribution over each annotator's confusion matrix is assumed, as well
    as a prior distribution over the classifier's weight vectors, which
    corresponds to a regularization.
Parameters
----------
    n_annotators : int or None, default=None
        Number of annotators. If `None`, the number of annotators is
        inferred from the number of columns of `y` during fitting. It must
        be provided explicitly, if the training data is empty.
    tol : float, default=0.0001
        Threshold for stopping the EM-algorithm and the optimization of
        the logistic regression weights. If the change of the respective
        value between two steps is smaller than `tol`, the respective
        algorithm stops.
max_iter : int, default=100
The maximum number of iterations of the EM-algorithm to be performed.
fit_intercept : bool, default=True
Specifies if a constant (a.k.a. bias or intercept) should be
added to input samples.
    annot_prior_full : int or float or array-like, default=1
        Determines `A` as the Dirichlet prior for each annotator `l`
        (i.e., `A[l] = annot_prior_full * np.ones((n_classes, n_classes))`
        for a numeric or `A[l] = annot_prior_full[l] *
        np.ones((n_classes, n_classes))` for an array-like parameter).
        `A[l,i,j]` is the estimated number of times annotator `l` has
        provided label `j` for a sample of true label `i`.
    annot_prior_diag : int or float or array-like, default=0
        Adds a value to the diagonal of `A[l]` being the Dirichlet prior
        for annotator `l` (i.e., `A[l] += annot_prior_diag *
        np.eye(n_classes)` for a numeric or `A[l] += annot_prior_diag[l] *
        np.eye(n_classes)` for an array-like parameter). `A[l,i,j]` is the
        estimated number of times annotator `l` has provided label `j` for
        a sample of true label `i`.
weights_prior : int or float, default=1
Determines Gamma as the inverse covariance matrix of the
prior distribution for every weight vector
(i.e., `Gamma=weights_prior * np.eye(n_features)`).
As default, the identity matrix is used for each weight vector.
    solver : str or callable, default='Newton-CG'
        Type of solver. Should be 'Nelder-Mead', 'Powell', 'CG', 'BFGS',
        'Newton-CG', 'L-BFGS-B', 'TNC', 'COBYLA', 'SLSQP', 'trust-constr',
        'dogleg', 'trust-ncg', 'trust-exact', 'trust-krylov', or a custom
        callable object. See `scipy.optimize.minimize` for more
        information.
    solver_dict : dict, default=None
        Additional solver options passed to `scipy.optimize.minimize`. If
        `None`, `{'maxiter': 100}` is passed.
    classes : array-like of shape (n_classes,), default=None
        Holds the label for each class. If `None`, the classes are
        determined during the fit.
missing_label : scalar or string or np.nan or None, default=np.nan
Value to represent a missing label.
    cost_matrix : array-like of shape (n_classes, n_classes), default=None
        Cost matrix with `cost_matrix[i,j]` indicating cost of predicting
        class `classes[j]` for a sample of class `classes[i]`. It can only
        be set, if `classes` is not `None`.
    random_state : int or RandomState instance or None, default=None
        Determines random number generation used by the `predict` method.
        Pass an int for reproducible results across multiple method calls.
Attributes
----------
n_annotators_ : int
Number of annotators.
    W_ : numpy.ndarray of shape (n_weights, n_classes)
        The weight matrix of the logistic regression model, where
        `n_weights = n_features + 1` if `fit_intercept=True` and
        `n_weights = n_features` otherwise.
Alpha_ : numpy.ndarray of shape (n_annotators, n_classes, n_classes)
This is a confusion matrix for each annotator, where each
row is normalized. `Alpha_[l,k,c]` describes the probability
that annotator `l` provides the class label `c` for a sample belonging
to class `k`.
classes_ : array-like of shape (n_classes,)
Holds the label for each class after fitting.
    cost_matrix_ : numpy.ndarray of shape (n_classes, n_classes)
        Cost matrix with `cost_matrix_[i,j]` indicating cost of predicting
        class `classes_[j]` for a sample of class `classes_[i]`.
References
----------
.. [1] V. C. Raykar, S. Yu, L. H. Zhao, G. H. Valadez, C. Florin, L.
Bogoni, and L. Moy. Learning from Crowds. J. Mach. Learn. Res.,
11(4):1297–1322, 2010.
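
    Examples
    --------
    A minimal usage sketch with purely illustrative data, assuming the
    class is exported as
    `skactiveml.classifier.multiannotator.AnnotatorLogisticRegression`:

    >>> import numpy as np
    >>> from skactiveml.classifier.multiannotator import (
    ...     AnnotatorLogisticRegression,
    ... )
    >>> X = np.array([[0.0], [1.0], [2.0], [3.0]])
    >>> # Labels of two annotators; `np.nan` marks a missing label.
    >>> y = np.array([[0, 0], [0, np.nan], [1, 1], [np.nan, 1]])
    >>> clf = AnnotatorLogisticRegression(classes=[0, 1], random_state=0)
    >>> clf.fit(X, y).Alpha_.shape
    (2, 2, 2)
    >>> clf.predict(X).shape
    (4,)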
"""
_ALLOWED_EXTRA_OUTPUTS = {
"logits",
"annotator_perf",
"annotator_class",
}
def __init__(
self,
n_annotators=None,
tol=0.0001,
max_iter=100,
fit_intercept=True,
annot_prior_full=1,
annot_prior_diag=0,
weights_prior=1,
solver="Newton-CG",
solver_dict=None,
classes=None,
cost_matrix=None,
missing_label=MISSING_LABEL,
random_state=None,
):
super().__init__(
classes=classes,
missing_label=missing_label,
cost_matrix=cost_matrix,
random_state=random_state,
)
self.n_annotators = n_annotators
self.tol = tol
self.max_iter = max_iter
self.fit_intercept = fit_intercept
self.annot_prior_full = annot_prior_full
self.annot_prior_diag = annot_prior_diag
self.weights_prior = weights_prior
self.solver = solver
self.solver_dict = solver_dict
def fit(self, X, y, sample_weight=None):
"""Fit the model using `X` as samples and `y` as class labels.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Feature matrix representing the samples.
y : array-like of shape (n_samples, n_annotators)
It contains the class labels of the training samples, where
missing labels are represented via `missing_label`.
Specifically, label `y[n, m]` refers to the label of sample
`X[n]` from annotator `m`.
        sample_weight : array-like of shape (n_samples, n_annotators), default=None
            It contains the weights of the training samples' class labels.
            It must have the same shape as `y`. The sample weights are only
            used for the initialization via majority voting and for the
            update of the confusion matrices; they are not used for the
            update of the logistic regression weights and the expectation
            computation.
        Returns
        -------
        self : AnnotatorLogisticRegression
            The `AnnotatorLogisticRegression` is fitted on the training data.
"""
# Check input data.
X, y, sample_weight = self._validate_data(
X=X, y=y, sample_weight=sample_weight, y_ensure_1d=False
)
        # Ensure value of 'n_annotators' to be a positive integer, if given.
if self.n_annotators is not None:
check_scalar(
self.n_annotators,
"n_annotators",
min_val=1,
min_inclusive=True,
target_type=int,
)
# Ensure value of 'tol' to be positive.
check_scalar(
self.tol,
"tol",
min_val=0,
min_inclusive=False,
target_type=(int, float),
)
# Ensure value of 'max_iter' to be positive.
check_scalar(
self.max_iter,
"max_iter",
min_val=1,
min_inclusive=True,
target_type=int,
)
# Ensure value of 'fit_intercept' to be boolean.
check_scalar(self.fit_intercept, "fit_intercept", target_type=bool)
# Ensure value of 'solver_dict' to be a dictionary.
solver_dict = (
{"maxiter": 100} if self.solver_dict is None else self.solver_dict
)
check_scalar(solver_dict, "solver_dict", target_type=dict)
# Ensure value of 'weights_prior' to be non-negative.
check_scalar(
self.weights_prior,
"weights_prior",
min_val=0,
min_inclusive=True,
target_type=(int, float),
)
# Set auxiliary variables.
n_classes = len(self.classes_)
if self.n_features_in_ is not None:
if len(y.shape) != 2:
raise ValueError(
"`y` must be an array-like of shape "
"`(n_samples, n_annotators)`."
)
self.n_annotators_ = y.shape[1]
else:
if self.n_annotators is None:
raise ValueError(
"`y` cannot be empty, if `n_annotators` is None."
)
self.n_annotators_ = self.n_annotators
# Check consistent number of annotators.
if (
self.n_annotators is not None
and self.n_annotators != self.n_annotators_
):
raise ValueError(
f"`n_annotators={self.n_annotators}` does not match "
f"{self.n_annotators_} as the number of columns in `y`."
)
# Check input 'annot_prior_full' and 'annot_prior_diag'.
annot_prior = []
for name, prior in [
("annot_prior_full", self.annot_prior_full),
("annot_prior_diag", self.annot_prior_diag),
]:
if isinstance(prior, (int, float)):
prior_array = np.ones(self.n_annotators_) * prior
else:
prior_array = column_or_1d(prior)
if name == "annot_prior_full":
is_invalid_prior = np.sum(prior_array <= 0)
else:
is_invalid_prior = np.sum(prior_array < 0)
if len(prior_array) != self.n_annotators_ or is_invalid_prior:
                raise ValueError(
                    f"'{name}' must be either 'int', 'float', or an "
                    f"array-like of shape (n_annotators,) with positive "
                    f"values (non-negative for 'annot_prior_diag'), "
                    f"got {prior}"
                )
annot_prior.append(prior_array)
# Set up prior matrix for each annotator.
A = np.ones((self.n_annotators_, n_classes, n_classes))
for a in range(self.n_annotators_):
A[a] *= annot_prior[0][a]
A[a] += np.eye(n_classes) * annot_prior[1][a]
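        # Derive the prior confusion matrices from the Dirichlet
        # pseudo-counts: subtract the implicit +1, normalize each row, and
        # fall back to uniform rows where the pseudo-counts sum to zero.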
A_obs = A - 1
A_obs_sum = np.sum(A_obs, axis=-1, keepdims=True)
A_norm = np.divide(
A_obs,
A_obs_sum,
out=np.full_like(A_obs, 1 / n_classes, dtype=float),
where=A_obs_sum != 0,
)
# Fallback, if empty training data has been provided.
if self.n_features_in_ is None:
self.W_ = None
self.Alpha_ = A_norm
return self
# Remove samples without labels.
is_lbld = is_labeled(y, missing_label=-1).any(axis=-1)
X = X[is_lbld]
y = y[is_lbld]
n_samples = X.shape[0]
is_lbld = is_labeled(y, missing_label=-1)
# Insert bias, if 'fit_intercept' is set to 'True'.
n_weights = self.n_features_in_
if self.fit_intercept:
n_weights += 1
X = np.insert(X, 0, values=1, axis=1)
        # Use uniform sample weights, if none are provided.
        if sample_weight is None:
            sample_weight = np.ones_like(y)
# Set initial weights.
self.W_ = np.zeros((n_weights, n_classes))
if n_samples == 0:
self.Alpha_ = A_norm
return self
        # Initialize the first expectation to negative infinity such that
        # |current - new| < tol is False in the first iteration.
        current_expectation = -np.inf
n_iter = 0
# Execute expectation maximization (EM) algorithm.
while n_iter < self.max_iter:
# -----------------------------E-STEP------------------------------
# Compute probabilistic predictions.
P = softmax(X @ self.W_, axis=1)
            # Estimate latent ground truth labels. `U[n, c]` denotes the
            # likelihood of the observed annotations of sample `X[n]` given
            # the true class `c`; it defaults to one in the first iteration.
            U = np.ones((n_samples, n_classes))
            if n_iter == 0:
                # Initialization via majority voting.
                Mu = compute_vote_vectors(
                    y=y,
                    classes=np.arange(n_classes),
                    missing_label=-1,
                    w=sample_weight,
                )
                Mu /= np.sum(Mu, axis=1, keepdims=True)
            else:
                # Use current model parameters to estimate the ground truth
                # labels.
                for c in range(n_classes):
                    for k in range(n_classes):
                        y_is_k = y == k
                        U[:, c] *= np.prod(
                            self.Alpha_[:, c, k] ** y_is_k, axis=1
                        )
                Mu = P * U
Mu_sum = Mu.sum(axis=1, keepdims=True)
Mu = np.divide(
Mu,
Mu_sum,
out=np.full_like(Mu, 1 / n_classes, dtype=float),
where=Mu_sum != 0,
)
# Compute expectation for current parameters and estimates
# of the ground truth labels.
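            # Up to constants, the expected log posterior is:
            #   sum_n sum_c Mu[n, c] * log(P[n, c] * U[n, c])
            #   - 0.5 * weights_prior * ||W||^2
            #   + sum_l sum_c sum_k (A[l, c, k] - 1) * log(Alpha_[l, c, k])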
prior_w = -0.5 * self.weights_prior * np.sum(self.W_**2)
prior_alpha = np.sum(
(A - 1) * np.log(self.Alpha_ + np.finfo(float).eps)
)
log_likelihood = np.sum(
Mu * np.log(P * U + np.finfo(float).eps)
)
new_expectation = log_likelihood + prior_w + prior_alpha
# Stop in the case of convergence.
if np.abs(new_expectation - current_expectation) < self.tol:
break
# Otherwise, update the current expectation value.
current_expectation = new_expectation
# -----------------------------M-STEP------------------------------
# Update the confusion matrices.
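            # For each annotator j, Alpha[j, c, k] is proportional to the
            # (weighted) expected number of times annotator j assigns label
            # k to a sample of estimated true class c, plus the Dirichlet
            # pseudo-counts A[j] - 1; each row is normalized afterwards.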
Alpha = np.zeros((self.n_annotators_, n_classes, n_classes))
for j in range(self.n_annotators_):
y_j = np.eye(n_classes)[y[is_lbld[:, j], j].astype(int)]
w_j = sample_weight[is_lbld[:, j], j].reshape(-1, 1)
Alpha[j] = (Mu[is_lbld[:, j]].T @ (w_j * y_j)) + A[j] - 1
Alpha_sum = Alpha.sum(axis=-1, keepdims=True)
self.Alpha_ = np.divide(
Alpha,
Alpha_sum,
out=np.full_like(Alpha, 1 / n_classes, dtype=float),
where=Alpha_sum != 0,
)
def error(w):
"""
Compute the cross-entropy loss (negative log-posterior) and
its gradient for a softmax-based classifier with L2
regularization.
This function calculates the error as the sum of the negative
log-likelihood (cross-entropy) of the predicted class
probabilities relative to the target distribution, and an L2
regularization penalty on the weights. It also computes the
gradient of this loss with respect to the weight vector, which
is returned in flattened form. This formulation is suitable for
use with optimization routines such as
`scipy.optimize.minimize`.
Parameters
----------
w : numpy.ndarray of shape (n_weights * n_classes,)
Flattened weight vector. It is reshaped to a
(n_weights, n_classes) weight matrix.
Returns
-------
loss : float
The computed cross-entropy error (negative log-posterior),
including the negative log-likelihood and the
L2 regularization penalty.
            grad : numpy.ndarray of shape (n_weights * n_classes,)
                The gradient of the loss with respect to the weight vector,
                flattened to a one-dimensional array.
"""
# Reshape weights as matrix.
W = w.reshape((n_weights, n_classes))
# Compute probabilistic predictions.
logits = X.dot(W)
p = softmax(logits, axis=-1)
# Compute loss for probabilistic predictions.
loss = -np.sum(Mu * np.log(p + np.finfo(float).eps))
# Add L2 penalty to loss.
if self.fit_intercept:
loss += 0.5 * self.weights_prior * np.sum(W[1:] ** 2)
else:
loss += 0.5 * self.weights_prior * np.sum(W**2)
# Compute L2 part of the gradient.
if self.fit_intercept:
reg_grad = np.zeros_like(W)
reg_grad[1:, :] = self.weights_prior * W[1:, :]
else:
reg_grad = self.weights_prior * W
# Compute final gradient.
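                # grad = X^T (p - Mu) + reg_grad, i.e., the softmax
                # cross-entropy gradient plus the L2 penalty gradient
                # (excluding the intercept row if `fit_intercept=True`).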
grad = X.T.dot(p - Mu) + reg_grad
return loss, grad.ravel()
def hessp(w, v):
"""
Compute the Hessian-vector product for the error function.
This function computes the product of the Hessian of the error
function with a given vector, which is useful for second-order
optimization routines (e.g. in scipy.optimize.minimize when
a Hessian-vector product is supplied via the 'hessp' argument).
The error function is defined via a softmax-based prediction
model with an added regularization term.
Parameters
----------
w : numpy.ndarray of shape (n_weights * n_classes,)
Flattened weight vector. It is reshaped to a
(n_weights, n_classes) weight matrix.
v : numpy.ndarray of shape (n_weights * n_classes,)
Flattened vector to be multiplied by the Hessian. It is
reshaped to a (n_weights, n_classes) matrix.
Returns
-------
Hv : numpy.ndarray of shape (n_weights * n_classes,)
The product of the Hessian (of the error function) with the
vector v, returned as a flat array.
"""
# Reshape weights as matrix.
W = w.reshape((n_weights, n_classes))
# Compute probabilistic predictions.
logits = X.dot(W)
probas = softmax(logits, axis=-1)
# Compute intermediate results.
V = v.reshape((n_weights, n_classes))
M = X.dot(V)
# For each sample X[i], compute:
# R[i,j] = p[i,j] * ( M[i,j] - sum_k p[i,k]*M[i,k] )
R = probas * (M - np.sum(probas * M, axis=1, keepdims=True))
if self.fit_intercept:
reg_Hv = np.zeros_like(V)
reg_Hv[1:, :] = self.weights_prior * V[1:, :]
else:
reg_Hv = self.weights_prior * V
Hv = X.T.dot(R) + reg_Hv
return Hv.ravel()
# Update weights of the logistic regression model.
with warnings.catch_warnings():
warning_msg = ".* does not use Hessian.* information.*"
warnings.filterwarnings("ignore", message=warning_msg)
warning_msg = ".* does not use gradient information.*"
warnings.filterwarnings("ignore", message=warning_msg)
res = minimize(
error,
x0=np.zeros((n_weights * n_classes)),
method=self.solver,
tol=self.tol,
jac=True,
hessp=hessp,
options=solver_dict,
)
self.W_ = res.x.reshape((n_weights, n_classes))
# Continue with next iteration.
n_iter += 1
return self
def predict(
self,
X,
extra_outputs=None,
):
"""Return class predictions for the test samples `X`.
By default, this method returns only the class predictions
`y_pred`. If `extra_outputs` is provided, a tuple is returned whose
first element is `y_pred` and whose remaining elements are the
requested additional forward outputs, in the order specified by
`extra_outputs`.
Parameters
----------
X : array-like of shape (n_samples, ...)
Test samples.
extra_outputs : None or str or sequence of str, default=None
Names of additional outputs to return next to `y_pred`. The names
must be a subset of the following keys:
- "logits" : Additionally return the class-membership logits
`L_class` for the samples in `X`.
            - "annotator_perf" : Additionally return the estimated
              annotator performance probabilities `P_perf` for each
              sample–annotator pair.
- "annotator_class" : Additionally return the annotator–class
probability estimates `P_annot` for each sample, class, and
annotator.
Returns
-------
y_pred : numpy.ndarray of shape (n_samples,)
Class predictions of the test samples.
        *extras : numpy.ndarray, optional
            Only returned if `extra_outputs` is not `None`. In that
            case, the method returns a tuple whose first element is
            `y_pred` and whose remaining elements correspond to the
            requested forward outputs in the order given by
            `extra_outputs`.
Potential outputs are:
- `L_class` : `np.ndarray` of shape `(n_samples, n_classes)`,
where `L_class[n, c]` is the logit for the class
`classes_[c]` of sample `X[n]`.
- `P_perf` : `np.ndarray` of shape `(n_samples, n_annotators)`,
where `P_perf[n, m]` refers to the estimated label
correctness probability (performance) of annotator `m` when
labeling sample `X[n]`.
- `P_annot` : `np.ndarray` of shape
`(n_samples, n_annotators, n_classes)`, where
`P_annot[n, m, c]` refers to the probability that annotator
`m` provides the class label `c` for sample `X[n]`.
"""
return SkactivemlClassifier.predict(
self,
X=X,
extra_outputs=extra_outputs,
)
def predict_proba(
self,
X,
extra_outputs=None,
):
"""Return class probability estimates for the test samples `X`.
By default, this method returns only the class probabilities `P`.
If `extra_outputs` is provided, a tuple is returned whose first
element is `P` and whose remaining elements are the requested
additional forward outputs, in the order specified by
`extra_outputs`.
Parameters
----------
X : array-like of shape (n_samples, ...)
Test samples.
extra_outputs : None or str or sequence of str, default=None
Names of additional outputs to return next to `P`. The names
must be a subset of the following keys:
- "logits" : Additionally return the class-membership logits
`L_class` for the samples in `X`.
            - "annotator_perf" : Additionally return the estimated
              annotator performance probabilities `P_perf` for each
              sample–annotator pair.
- "annotator_class" : Additionally return the annotator–class
probability estimates `P_annot` for each sample, class, and
annotator.
Returns
-------
P : numpy.ndarray of shape (n_samples, n_classes)
Class probabilities of the test samples. Classes are ordered
according to `self.classes_`.
*extras : numpy.ndarray, optional
Only returned if `extra_outputs` is not `None`. In that
case, the method returns a tuple whose first element is `P`
and whose remaining elements correspond to the requested
forward outputs in the order given by `extra_outputs`.
Potential outputs are:
- `L_class` : `np.ndarray` of shape `(n_samples, n_classes)`,
where `L_class[n, c]` is the logit for the class
`classes_[c]` of sample `X[n]`.
- `P_perf` : `np.ndarray` of shape `(n_samples, n_annotators)`,
where `P_perf[n, m]` refers to the estimated label
correctness probability (performance) of annotator `m` when
labeling sample `X[n]`.
- `P_annot` : `np.ndarray` of shape
`(n_samples, n_annotators, n_classes)`, where
`P_annot[n, m, c]` refers to the probability that annotator
`m` provides the class label `c` for sample `X[n]`.
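
        Examples
        --------
        A minimal sketch of requesting extra outputs (assuming `clf` is an
        already fitted `AnnotatorLogisticRegression` and `X` is a valid
        feature matrix):

        >>> P, P_perf, P_annot = clf.predict_proba(
        ...     X, extra_outputs=["annotator_perf", "annotator_class"]
        ... )  # doctest: +SKIP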
"""
# Check extra outputs format.
if extra_outputs is None:
extra_outputs = []
elif isinstance(extra_outputs, str):
extra_outputs = [extra_outputs]
elif isinstance(extra_outputs, Sequence) and not isinstance(
extra_outputs, bytes
):
extra_outputs = list(extra_outputs)
else:
raise TypeError(
"`extra_outputs` must be None, a string, or a sequence "
f"of strings, got {type(extra_outputs)}."
)
if len(set(extra_outputs)) != len(extra_outputs):
raise ValueError(
"`extra_outputs` must not contain duplicate names."
)
unknown = [
n for n in extra_outputs if n not in self._ALLOWED_EXTRA_OUTPUTS
]
if unknown:
raise ValueError(
f"Requested extra output(s) {unknown!r} are not defined; "
f"allowed names are {sorted(self._ALLOWED_EXTRA_OUTPUTS)!r}."
)
# Check test samples.
check_is_fitted(self)
X = check_array(X)
check_n_features(self, X, reset=False)
# Check whether a bias feature is missing.
if self.fit_intercept:
X = np.insert(X, 0, values=1, axis=1)
# Compute logits and normalize to probabilities.
if self.W_ is None:
L_class = np.zeros((len(X), len(self.classes_)))
else:
L_class = X @ self.W_
P_class = softmax(L_class, axis=1)
# Return only class probabilities as primary output.
if not extra_outputs:
return P_class
        # Return requested extra outputs next to the class probabilities.
out = [P_class]
for extra in extra_outputs:
if extra == "logits":
out.append(L_class)
if extra == "annotator_perf":
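                # P_perf[n, m] = sum_c P_class[n, c] * Alpha_[m, c, c],
                # i.e., the probability that annotator m provides the
                # correct label for sample X[n].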
diag_alpha = np.diagonal(self.Alpha_, axis1=1, axis2=2)
P_perf = np.einsum("ik,lk->il", P_class, diag_alpha)
out.append(P_perf)
if extra == "annotator_class":
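                # P_annot[n, m, c] = sum_k P_class[n, k] * Alpha_[m, k, c],
                # i.e., the probability that annotator m provides label c
                # for sample X[n].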
P_annot = np.einsum("ik,lkc->ilc", P_class, self.Alpha_)
out.append(P_annot)
return tuple(out)