"""
Active Learning with Cost Embedding (CostEmbeddingAL)
This module is modified from
https://github.com/ntucllab/libact/blob/master/libact.
Copyright (c) 2014, National Taiwan University
"""
import warnings
import numpy as np
from joblib import Parallel, delayed
from sklearn import clone
from sklearn.base import BaseEstimator, RegressorMixin
from sklearn.isotonic import IsotonicRegression
from sklearn.metrics import euclidean_distances
from sklearn.neighbors import NearestNeighbors
from sklearn.svm import SVR
from sklearn.utils import check_array, check_symmetric
from ..base import SingleAnnotatorPoolQueryStrategy
from ..utils import (
simple_batch,
check_classifier_params,
MISSING_LABEL,
check_scalar,
check_random_state,
check_X_y,
is_labeled,
ExtLabelEncoder,
)
class CostEmbeddingAL(SingleAnnotatorPoolQueryStrategy):
"""Active Learning with Cost Embedding (ALCE).
Cost sensitive multi-class algorithm.
Assume each class has at least one sample in the labeled pool.
This implementation is based on libact.
Parameters
----------
classes: array-like of shape(n_classes,)
base_regressor : sklearn regressor, optional (default=None)
cost_matrix: array-like of shape (n_classes, n_classes),
optional (default=None)
Cost matrix with `cost_matrix[i,j]` defining the cost of predicting
class j for a sample with the actual class i. Only supported for least
confident variant.
missing_label: str or numeric, optional (default=MISSING_LABEL)
Specifies the symbol that represents a missing label.
    random_state : int or np.random.RandomState, optional
    (default=None)
        Random state for candidate selection.
    embed_dim : int, optional (default=None)
        Dimension of the class embedding. If None, `embed_dim = n_classes`.
mds_params : dict, optional (default=None)
For further information, see
https://scikit-learn.org/stable/modules/generated/sklearn.manifold.MDS.html
nn_params : dict, optional (default=None)
For further information, see
https://scikit-learn.org/stable/modules/generated/sklearn.neighbors.NearestNeighbors.html
References
----------
    [1] Kuan-Hao Huang and Hsuan-Tien Lin. "A Novel Uncertainty Sampling
    Algorithm for Cost-Sensitive Multiclass Active Learning." In Proceedings
    of the IEEE International Conference on Data Mining (ICDM), 2016.
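    Examples
    --------
    A minimal usage sketch; the feature values and labels below are purely
    illustrative:
    >>> import numpy as np
    >>> qs = CostEmbeddingAL(classes=[0, 1], random_state=0)
    >>> X = np.array([[1.0], [2.0], [3.0], [4.0]])
    >>> y = np.array([0, 1, MISSING_LABEL, MISSING_LABEL])
    >>> query_idx = qs.query(X, y)
    >>> query_idx.shape
    (1,)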
"""
def __init__(
self,
classes,
base_regressor=None,
cost_matrix=None,
embed_dim=None,
mds_params=None,
nn_params=None,
missing_label=MISSING_LABEL,
random_state=None,
):
super().__init__(
missing_label=missing_label, random_state=random_state
)
self.classes = classes
self.base_regressor = base_regressor
self.cost_matrix = cost_matrix
self.embed_dim = embed_dim
self.missing_label = missing_label
self.random_state = random_state
self.mds_params = mds_params
self.nn_params = nn_params
    def query(
self,
X,
y,
sample_weight=None,
candidates=None,
batch_size=1,
return_utilities=False,
):
"""Query the next instance to be labeled.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Training data set, usually complete, i.e., including the labeled
and unlabeled samples.
        y : array-like of shape (n_samples,)
            Labels of the training data set (possibly including unlabeled
            ones indicated by self.missing_label).
sample_weight: array-like of shape (n_samples,), optional
(default=None)
Weights of training samples in `X`.
candidates : None or array-like of shape (n_candidates), dtype=int or
array-like of shape (n_candidates, n_features),
optional (default=None)
If candidates is None, the unlabeled samples from (X,y) are
considered as candidates.
If candidates is of shape (n_candidates) and of type int,
candidates is considered as the indices of the samples in (X,y).
If candidates is of shape (n_candidates, n_features), the
candidates are directly given in candidates (not necessarily
contained in X). This is not supported by all query strategies.
batch_size : int, optional (default=1)
The number of samples to be selected in one AL cycle.
return_utilities : bool, optional (default=False)
If True, also return the utilities based on the query strategy.
Returns
-------
        query_indices : numpy.ndarray of shape (batch_size,)
            The query indices indicate for which candidate sample a label
            is to be queried, e.g., `query_indices[0]` indicates the first
            selected sample.
If candidates is None or of shape (n_candidates), the indexing
refers to samples in X.
If candidates is of shape (n_candidates, n_features), the indexing
refers to samples in candidates.
utilities : numpy.ndarray of shape (batch_size, n_samples) or
numpy.ndarray of shape (batch_size, n_candidates)
The utilities of samples after each selected sample of the batch,
e.g., `utilities[0]` indicates the utilities used for selecting
the first sample (with index `query_indices[0]`) of the batch.
Utilities for labeled samples will be set to np.nan.
If candidates is None or of shape (n_candidates), the indexing
refers to samples in X.
If candidates is of shape (n_candidates, n_features), the indexing
refers to samples in candidates.
"""
# Check standard parameters.
(
X,
y,
candidates,
batch_size,
return_utilities,
) = super()._validate_data(
X=X,
y=y,
candidates=candidates,
batch_size=batch_size,
return_utilities=return_utilities,
reset=True,
)
# Obtain candidates plus mapping.
X_cand, mapping = self._transform_candidates(candidates, X, y)
util_cand = _alce(
X_cand,
X,
y,
self.base_regressor,
self.cost_matrix,
self.classes,
self.embed_dim,
sample_weight,
self.missing_label,
self.random_state_,
self.mds_params,
self.nn_params,
)
if mapping is None:
utilities = util_cand
else:
utilities = np.full(len(X), np.nan)
utilities[mapping] = util_cand
return simple_batch(
utilities,
self.random_state_,
batch_size=batch_size,
return_utilities=return_utilities,
)
def _alce(
X_cand,
X,
y,
base_regressor,
cost_matrix,
classes,
embed_dim,
sample_weight,
missing_label,
random_state,
mds_params,
nn_params,
):
"""Compute the alce score for the candidate instances.
Parameters
----------
X_cand: array-like, shape (n_candidates, n_features)
Unlabeled candidate samples.
X: array-like, shape (n_samples, n_features)
Complete data set.
y: array-like, shape (n_samples)
Labels of the data set.
base_regressor: RegressorMixin
Regressor used for the embedding.
cost_matrix: array-like, shape (n_classes, n_classes)
Cost matrix with cost_matrix[i,j] defining the cost of predicting class
j for a sample with the true class i.
classes: array-like, shape (n_classes)
Array of class labels.
embed_dim: int
Dimension of the embedding.
    sample_weight : array-like, shape (n_samples)
        Weights of the training samples in X.
missing_label : scalar | string | np.nan | None
Value to represent a missing label.
random_state : int | np.random.RandomState
        Random state for candidate selection.
mds_params : dict
For further information, see
https://scikit-learn.org/stable/modules/generated/sklearn.manifold.MDS.html
nn_params : dict
For further information, see
https://scikit-learn.org/stable/modules/generated/sklearn.neighbors.NearestNeighbors.html
Returns
-------
utilities: np.ndarray, shape (n_candidates)
The utilities of all candidate instances.
"""
# Check base regressor
if base_regressor is None:
base_regressor = SVR()
if not isinstance(base_regressor, RegressorMixin):
raise TypeError("'base_regressor' must be an sklearn regressor")
check_classifier_params(classes, missing_label, cost_matrix)
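    # If no cost matrix is given, fall back to the zero-one loss, i.e.,
    # uniform misclassification costs.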
if cost_matrix is None:
cost_matrix = 1 - np.eye(len(classes))
    if np.count_nonzero(cost_matrix) == 0:
        raise ValueError(
            "The cost matrix must contain at least one positive number."
        )
# Check the given data
X, y, X_cand, sample_weight, sample_weight_cand = check_X_y(
X,
y,
X_cand,
sample_weight,
force_all_finite=False,
missing_label=missing_label,
)
labeled = is_labeled(y, missing_label=missing_label)
y = ExtLabelEncoder(classes, missing_label).fit_transform(y)
X = X[labeled]
y = y[labeled].astype(int)
sample_weight = sample_weight[labeled]
# If all samples are unlabeled, the strategy randomly selects an instance
if len(X) == 0:
warnings.warn(
"There are no labeled instances. The strategy selects "
"one random instance."
)
return np.ones(len(X_cand))
# Check embedding dimension
embed_dim = len(classes) if embed_dim is None else embed_dim
check_scalar(embed_dim, "embed_dim", int, min_val=1)
# Update mds parameters
mds_params_default = {
"metric": False,
"n_components": embed_dim,
"n_uq": len(classes),
"max_iter": 300,
"eps": 1e-6,
"dissimilarity": "precomputed",
"n_init": 8,
"n_jobs": 1,
"random_state": random_state,
}
if mds_params is not None:
        if not isinstance(mds_params, dict):
            raise TypeError("'mds_params' must be a dictionary or None")
mds_params_default.update(mds_params)
mds_params = mds_params_default
# Update nearest neighbor parameters
nn_params = {} if nn_params is None else nn_params
    if not isinstance(nn_params, dict):
        raise TypeError("'nn_params' must be a dictionary or None")
regressors = [clone(base_regressor) for _ in range(embed_dim)]
n_classes = len(classes)
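    # Embed 2 * n_classes points: the first n_classes points represent the
    # ground-truth classes, the last n_classes points represent possible
    # predictions. Their cross-block dissimilarities are given by the cost
    # matrix, so the distance between a class point and a prediction point
    # reflects the cost of that prediction for that class.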
dissimilarities = np.zeros((2 * n_classes, 2 * n_classes))
dissimilarities[:n_classes, n_classes:] = cost_matrix
dissimilarities[n_classes:, :n_classes] = cost_matrix.T
mds = MDSP(**mds_params)
embedding = mds.fit(dissimilarities).embedding_
class_embed = embedding[:n_classes, :]
nn = NearestNeighbors(n_neighbors=1, **nn_params)
nn.fit(embedding[n_classes:, :])
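    # Fit one regressor per embedding dimension on the labeled samples and
    # predict the embedding coordinates of each candidate. A candidate's
    # utility is the distance from its predicted embedding to the nearest
    # prediction point, which approximates the cost of the least costly
    # prediction; larger distances indicate higher uncertainty.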
pred_embed = np.zeros((len(X_cand), embed_dim))
for i in range(embed_dim):
regressors[i].fit(X, class_embed[y, i], sample_weight)
pred_embed[:, i] = regressors[i].predict(X_cand)
dist, _ = nn.kneighbors(pred_embed)
utilities = dist[:, 0]
return utilities
"""
Multi-dimensional Scaling Partial (MDSP)
This module is modified from
https://github.com/scikit-learn/scikit-learn/blob/14031f6/sklearn/manifold/mds.py
by Kuan-Hao Huang.
"""
# author: Nelle Varoquaux <nelle.varoquaux@gmail.com>
# Licence: BSD
def _smacof_single_p(
similarities,
n_uq,
metric=True,
n_components=2,
init=None,
max_iter=300,
verbose=0,
eps=1e-3,
random_state=None,
):
"""
Computes multidimensional scaling using SMACOF algorithm.
Parameters
----------
n_uq
similarities: symmetric ndarray, shape [n * n]
similarities between the points
metric: boolean, optional, default: True
compute metric or nonmetric SMACOF algorithm
n_components: int, optional, default: 2
number of dimension in which to immerse the similarities
overwritten if initial array is provided.
init: {None or ndarray}, optional
if None, randomly chooses the initial configuration
if ndarray, initialize the SMACOF algorithm with this array
max_iter: int, optional, default: 300
Maximum number of iterations of the SMACOF algorithm for a single run
verbose: int, optional, default: 0
level of verbosity
    eps: float, optional, default: 1e-3
        relative tolerance w.r.t. stress to declare convergence
random_state: integer or numpy.RandomState, optional
The generator used to initialize the centers. If an integer is
given, it fixes the seed. Defaults to the global numpy random
number generator.
Returns
-------
X: ndarray (n_samples, n_components), float
coordinates of the n_samples points in a n_components-space
stress_: float
The final value of the stress (sum of squared distance of the
disparities and the distances for all constrained points)
n_iter : int
Number of iterations run.
"""
similarities = check_symmetric(similarities, raise_exception=True)
n_samples = similarities.shape[0]
random_state = check_random_state(random_state)
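    # Weight only cross-block pairs: dissimilarities within the first n_uq
    # points and within the remaining points are ignored by the stress.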
W = np.ones((n_samples, n_samples))
W[:n_uq, :n_uq] = 0.0
W[n_uq:, n_uq:] = 0.0
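    # V is the weighted Laplacian of W; its pseudo-inverse Vp is obtained
    # by adding the rank-one matrix e e^T / n before inverting (a standard
    # identity for Laplacians). Vp is the constant factor of the Guttman
    # transform below.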
V = -W
V[np.arange(len(V)), np.arange(len(V))] = W.sum(axis=1)
e = np.ones((n_samples, 1))
Vp = (
np.linalg.inv(V + np.dot(e, e.T) / n_samples)
- np.dot(e, e.T) / n_samples
)
sim_flat = similarities.ravel()
sim_flat_w = sim_flat[sim_flat != 0]
if init is None:
# Randomly choose initial configuration
X = random_state.rand(n_samples * n_components)
X = X.reshape((n_samples, n_components))
else:
# overrides the parameter p
n_components = init.shape[1]
if n_samples != init.shape[0]:
raise ValueError(
"init matrix should be of shape (%d, %d)"
% (n_samples, n_components)
)
X = init
old_stress = None
ir = IsotonicRegression()
for it in range(max_iter):
# Compute distance and monotonic regression
dis = euclidean_distances(X)
if metric:
disparities = similarities
else:
dis_flat = dis.ravel()
# similarities with 0 are considered as missing values
dis_flat_w = dis_flat[sim_flat != 0]
# Compute the disparities using a monotonic regression
disparities_flat = ir.fit_transform(sim_flat_w, dis_flat_w)
disparities = dis_flat.copy()
disparities[sim_flat != 0] = disparities_flat
disparities = disparities.reshape((n_samples, n_samples))
disparities *= np.sqrt(
(n_samples * (n_samples - 1) / 2) / (disparities**2).sum()
)
disparities[similarities == 0] = 0
# Compute stress
_stress = (
W.ravel() * ((dis.ravel() - disparities.ravel()) ** 2)
).sum()
_stress /= 2
# Update X using the Guttman transform
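        # Guard against division by zero in the ratio below.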
dis[dis == 0] = 1e-5
ratio = disparities / dis
_B = -W * ratio
_B[np.arange(len(_B)), np.arange(len(_B))] += (W * ratio).sum(axis=1)
X = np.dot(Vp, np.dot(_B, X))
dis = np.sqrt((X**2).sum(axis=1)).sum()
if verbose >= 2:
print("it: %d, stress %s" % (it, _stress))
if old_stress is not None:
if (old_stress - _stress / dis) < eps:
if verbose:
print(f"breaking at iteration {it} with stress {_stress}")
break
old_stress = _stress / dis
return X, _stress, it + 1
def smacof_p(
similarities,
n_uq,
metric=True,
n_components=2,
init=None,
n_init=8,
n_jobs=1,
max_iter=300,
verbose=0,
eps=1e-3,
random_state=None,
return_n_iter=False,
):
"""
Computes multidimensional scaling using SMACOF (Scaling by Majorizing a
Complicated Function) algorithm
The SMACOF algorithm is a multidimensional scaling algorithm: it minimizes
a objective function, the *stress*, using a majorization technique. The
Stress Majorization, also known as the Guttman Transform, guarantees a
monotone convergence of Stress, and is more powerful than traditional
techniques such as gradient descent.
The SMACOF algorithm for metric MDS can summarized by the following steps:
1. Set an initial start configuration, randomly or not.
2. Compute the stress
3. Compute the Guttman Transform
4. Iterate 2 and 3 until convergence.
The nonmetric algorithm adds a monotonic regression steps before computing
the stress.
Parameters
----------
    similarities : symmetric ndarray, shape (n_samples, n_samples)
        similarities between the points
    n_uq : int
        number of points in the first block; only dissimilarities between
        the first n_uq points and the remaining points enter the stress
        (partial MDS)
    metric : boolean, optional, default: True
        compute metric or nonmetric SMACOF algorithm
    n_components : int, optional, default: 2
        number of dimensions in which to immerse the similarities;
        overridden if an initial array is provided.
init : {None or ndarray of shape (n_samples, n_components)}, optional
if None, randomly chooses the initial configuration
if ndarray, initialize the SMACOF algorithm with this array
    n_init : int, optional, default: 8
        Number of times the smacof_p algorithm will be run with different
        initialisations. The final result will be the best output of the
        n_init consecutive runs in terms of stress.
n_jobs : int, optional, default: 1
The number of jobs to use for the computation. This works by breaking
down the pairwise matrix into n_jobs even slices and computing them in
parallel.
If -1 all CPUs are used. If 1 is given, no parallel computing code is
used at all, which is useful for debugging. For n_jobs below -1,
(n_cpus + 1 + n_jobs) are used. Thus for n_jobs = -2, all CPUs but one
are used.
max_iter : int, optional, default: 300
Maximum number of iterations of the SMACOF algorithm for a single run
verbose : int, optional, default: 0
level of verbosity
    eps : float, optional, default: 1e-3
        relative tolerance w.r.t. stress to declare convergence
random_state : integer or numpy.RandomState, optional
The generator used to initialize the centers. If an integer is
given, it fixes the seed. Defaults to the global numpy random
number generator.
return_n_iter : bool
Whether or not to return the number of iterations.
Returns
-------
X : ndarray (n_samples,n_components)
Coordinates of the n_samples points in a n_components-space
stress : float
The final value of the stress (sum of squared distance of the
disparities and the distances for all constrained points)
n_iter : int
The number of iterations corresponding to the best stress.
Returned only if `return_n_iter` is set to True.
Notes
-----
"Modern Multidimensional Scaling - Theory and Applications" Borg, I.;
Groenen P. Springer Series in Statistics (1997)
"Nonmetric multidimensional scaling: a numerical method" Kruskal, J.
Psychometrika, 29 (1964)
"Multidimensional scaling by optimizing goodness of fit to a nonmetric
hypothesis" Kruskal, J. Psychometrika, 29, (1964)
"""
similarities = check_array(similarities)
random_state = check_random_state(random_state)
if hasattr(init, "__array__"):
init = np.asarray(init).copy()
        if n_init != 1:
warnings.warn(
"Explicit initial positions passed: "
"performing only one init of the MDS instead of %d" % n_init
)
n_init = 1
best_pos, best_stress = None, None
if n_jobs == 1:
for it in range(n_init):
pos, stress, n_iter_ = _smacof_single_p(
similarities,
n_uq,
metric=metric,
n_components=n_components,
init=init,
max_iter=max_iter,
verbose=verbose,
eps=eps,
random_state=random_state,
)
if best_stress is None or stress < best_stress:
best_stress = stress
best_pos = pos.copy()
best_iter = n_iter_
else:
seeds = random_state.randint(np.iinfo(np.int32).max, size=n_init)
results = Parallel(n_jobs=n_jobs, verbose=max(verbose - 1, 0))(
delayed(_smacof_single_p)(
similarities,
n_uq,
metric=metric,
n_components=n_components,
init=init,
max_iter=max_iter,
verbose=verbose,
eps=eps,
random_state=seed,
)
for seed in seeds
)
positions, stress, n_iters = zip(*results)
best = np.argmin(stress)
best_stress = stress[best]
best_pos = positions[best]
best_iter = n_iters[best]
if return_n_iter:
return best_pos, best_stress, best_iter
else:
return best_pos, best_stress
class MDSP(BaseEstimator):
"""Multidimensional scaling
Parameters
----------
metric : boolean, optional, default: True
compute metric or nonmetric SMACOF (Scaling by Majorizing a
Complicated Function) algorithm
    n_components : int, optional, default: 2
        number of dimensions in which to immerse the similarities;
        overridden if an initial array is provided.
    n_uq : int, optional, default: 1
        number of points in the first block; only dissimilarities between
        the first n_uq points and the remaining points enter the stress
        (partial MDS).
    n_init : int, optional, default: 4
        Number of times the smacof_p algorithm will be run with different
        initialisations. The final result will be the best output of the
        n_init consecutive runs in terms of stress.
max_iter : int, optional, default: 300
Maximum number of iterations of the SMACOF algorithm for a single run
verbose : int, optional, default: 0
level of verbosity
    eps : float, optional, default: 1e-3
        relative tolerance w.r.t. stress to declare convergence
n_jobs : int, optional, default: 1
The number of jobs to use for the computation. This works by breaking
down the pairwise matrix into n_jobs even slices and computing them in
parallel.
If -1 all CPUs are used. If 1 is given, no parallel computing code is
used at all, which is useful for debugging. For n_jobs below -1,
(n_cpus + 1 + n_jobs) are used. Thus for n_jobs = -2, all CPUs but one
are used.
random_state : integer or numpy.RandomState, optional
The generator used to initialize the centers. If an integer is
given, it fixes the seed. Defaults to the global numpy random
number generator.
dissimilarity : string
Which dissimilarity measure to use.
Supported are 'euclidean' and 'precomputed'.
Attributes
----------
    embedding_ : array-like, shape [n_samples, n_components]
        Stores the position of the dataset in the embedding space
stress_ : float
The final value of the stress (sum of squared distance of the
disparities and the distances for all constrained points)
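    Examples
    --------
    A minimal sketch embedding a precomputed block dissimilarity matrix;
    the values are purely illustrative:
    >>> import numpy as np
    >>> D = np.array([[0.0, 0.0, 1.0, 2.0],
    ...               [0.0, 0.0, 2.0, 1.0],
    ...               [1.0, 2.0, 0.0, 0.0],
    ...               [2.0, 1.0, 0.0, 0.0]])
    >>> mds = MDSP(n_components=2, n_uq=2, dissimilarity="precomputed",
    ...            random_state=0)
    >>> mds.fit(D).embedding_.shape
    (4, 2)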
References
----------
"Modern Multidimensional Scaling - Theory and Applications" Borg, I.;
Groenen P. Springer Series in Statistics (1997)
"Nonmetric multidimensional scaling: a numerical method" Kruskal, J.
Psychometrika, 29 (1964)
"Multidimensional scaling by optimizing goodness of fit to a nonmetric
hypothesis" Kruskal, J. Psychometrika, 29, (1964)
"""
def __init__(
self,
n_components=2,
n_uq=1,
metric=True,
n_init=4,
max_iter=300,
verbose=0,
eps=1e-3,
n_jobs=1,
random_state=None,
dissimilarity="euclidean",
):
self.n_components = n_components
self.n_uq = n_uq
self.dissimilarity = dissimilarity
self.metric = metric
self.n_init = n_init
self.max_iter = max_iter
self.eps = eps
self.verbose = verbose
self.n_jobs = n_jobs
self.random_state = random_state
def fit(self, X, y=None, init=None):
""" Compute the position of the points in the embedding space.
Parameters
----------
X : array, shape=[n_samples, n_features], or [n_samples, n_samples] \
if dissimilarity='precomputed'
Input data.
        init : {None or ndarray, shape (n_samples, n_components)}, optional
            If None, randomly chooses the initial configuration;
            if ndarray, initializes the SMACOF algorithm with this array.
"""
self.fit_transform(X, init=init)
return self
def fit_transform(self, X, y=None, init=None):
""" Fit the data from X, and returns the embedded coordinates.
Parameters
----------
X : array, shape=[n_samples, n_features], or [n_samples, n_samples] \
if dissimilarity='precomputed'
Input data.
        init : {None or ndarray, shape (n_samples, n_components)}, optional
            If None, randomly chooses the initial configuration;
            if ndarray, initializes the SMACOF algorithm with this array.
"""
X = check_array(X)
if X.shape[0] == X.shape[1] and self.dissimilarity != "precomputed":
            warnings.warn(
                "The MDS API has changed. `fit` now constructs a"
                " dissimilarity matrix from data. To use a custom "
                "dissimilarity matrix, set "
                "`dissimilarity='precomputed'`."
            )
if self.dissimilarity == "precomputed":
self.dissimilarity_matrix_ = X
elif self.dissimilarity == "euclidean":
self.dissimilarity_matrix_ = euclidean_distances(X)
else:
raise ValueError(
"Proximity must be 'precomputed' or 'euclidean'."
" Got %s instead" % str(self.dissimilarity)
)
self.embedding_, self.stress_, self.n_iter_ = smacof_p(
self.dissimilarity_matrix_,
self.n_uq,
metric=self.metric,
n_components=self.n_components,
init=init,
n_init=self.n_init,
n_jobs=self.n_jobs,
max_iter=self.max_iter,
verbose=self.verbose,
eps=self.eps,
random_state=self.random_state,
return_n_iter=True,
)
return self.embedding_