Interval Estimation Threshold#

Idea: Interval Estimation Threshold (IEThresh) selects samples based on ‘Uncertainty Sampling’. The selected samples are labeled by the annotators whose estimated annotation performances are equal or greater than an adaptive threshold. The strategy assumes all annotators to be available and is not defined otherwise. To deal with this case nonetheless value-annotator pairs are first ranked according to the amount of annotators available for the given value in candidates and are than ranked according to IntervalEstimationThreshold.

Google Colab Note: If the notebook fails to run after installing the needed packages, try to restart the runtime (Ctrl + M) under Runtime -> Restart session.
https://colab.research.google.com/assets/colab-badge.svg
Notebook Dependencies
Uncomment the following cell to install all dependencies for this tutorial.
# !pip install scikit-activeml

import numpy as np
from matplotlib import pyplot as plt, animation
from matplotlib.ticker import MaxNLocator
from sklearn.datasets import make_blobs

from skactiveml.utils import (
    MISSING_LABEL,
    majority_vote,
    is_labeled,
)
from skactiveml.visualization import (
    plot_utilities,
    plot_decision_boundary,
    mesh,
)

from skactiveml.classifier import ParzenWindowClassifier
from skactiveml.pool.multiannotator import IntervalEstimationThreshold
from skactiveml.pool.multiannotator import IntervalEstimationAnnotModel

random_state = np.random.RandomState(0)
rng = np.random.default_rng(seed=0)
# Build a dataset.
X_all, y_true_all = make_blobs(
    n_samples=400,
    n_features=2,
    centers=[[0, 1], [-3, 0.5], [-1, -1], [2, 1], [1, -0.5]],
    cluster_std=0.7,
    random_state=random_state,
)
X, X_test = X_all[: len(X_all) // 2], X_all[len(X_all) // 2 :]
y_true_all = y_true_all % 2
y_true, y_true_test = (
    y_true_all[: len(X_all) // 2],
    y_true_all[len(X_all) // 2 :],
)
n_annotators = 5
y_annot = np.zeros(shape=(len(X), n_annotators), dtype=int)
annotator_error_prob = np.linspace(0.0, 0.3, num=n_annotators)
for i, p in enumerate(annotator_error_prob):
    y_noise = rng.binomial(1, p, len(X))
    y_annot[:, i] = y_noise ^ y_true
y = np.full(shape=y_annot.shape, fill_value=MISSING_LABEL)
y_mv = majority_vote(y, missing_label=MISSING_LABEL, random_state=random_state)
# Initialise the classifier.
clf = ParzenWindowClassifier(classes=[0, 1], random_state=random_state)

# Initialise the query strategy.
qs = IntervalEstimationThreshold(random_state=random_state)

# Preparation for plotting.
fig = plt.figure(figsize=(7, 5))
ax1 = plt.subplot2grid((3, 1), (0, 0), rowspan=2)
ax2 = plt.subplot2grid((3, 1), (2, 0), rowspan=1)
feature_bound = [[min(X[:, 0]), min(X[:, 1])], [max(X[:, 0]), max(X[:, 1])]]
artists = []

# Active learning cycle:
n_cycles = 20
for c in range(n_cycles):
    # Fit the classifier with current labels.
    clf.fit(X, y_mv)

    # Fit the annotation performance model
    if np.all(np.any(is_labeled(y), axis=0)):
        A_perf_clf = np.sum(
            np.where(is_labeled(y), y_annot == clf.predict(X)[:, None], 0),
            axis=0,
        ) / np.sum(is_labeled(y), axis=0)
    else:
        A_perf_clf = None

    A_perf_clf_individual = np.full(n_annotators, np.nan)
    has_labels = np.any(is_labeled(y), axis=0)
    A_perf_clf_individual[has_labels] = np.sum(
        np.where(
            is_labeled(y)[:, has_labels],
            y_annot[:, has_labels] == clf.predict(X)[:, None],
            0,
        ),
        axis=0,
    ) / np.sum(is_labeled(y)[:, has_labels], axis=0)

    # Query the next sample(s).
    query_idx = qs.query(X=X, y=y, clf=clf, fit_clf=False, batch_size=3)

    # Capture the current plot state.
    coll_old = list(ax1.collections) + list(ax2.collections)
    title = ax1.text(
        0.5,
        1.05,
        f"Decision boundary after acquiring {c} labels\n"
        f"Test Accuracy: {clf.score(X_test, y_true_test):.4f}",
        size=plt.rcParams["axes.titlesize"],
        ha="center",
        transform=ax1.transAxes,
    )

    y_mv = majority_vote(y, random_state=0)
    is_labeled_sample = np.any(is_labeled(y), axis=1)
    is_correctly_labeled_sample = is_labeled_sample & (y_mv == y_true)
    is_wrongly_labeled_sample = is_labeled_sample & (y_mv != y_true)

    axes = [ax1, ax2]
    # axes = plot_annotator_utilities(ma_qs, X=X, y=y, clf=clf, axes=axes, feature_bound=bound)
    X_mesh, Y_mesh, mesh_samples = mesh(feature_bound, 25)
    _, utilities = qs.query(
        X=X, y=y, clf=clf, fit_clf=False, batch_size=3, return_utilities=True, candidates=mesh_samples
    )
    ax1.contourf(
        X_mesh,
        Y_mesh,
        np.mean(utilities[0], axis=1).reshape(X_mesh.shape),
        **{"cmap": "Greens", "alpha": 0.75},
    )
    # for a in range(n_annotators):
    plot_decision_boundary(clf, ax=ax1, feature_bound=feature_bound)
    ax1.scatter(
        X[~is_labeled_sample, 0],
        X[~is_labeled_sample, 1],
        c=y_true[~is_labeled_sample],
        cmap="coolwarm",
        marker=".",
        zorder=2,
        s=10,
    )
    ax1.scatter(
        X[is_correctly_labeled_sample, 0],
        X[is_correctly_labeled_sample, 1],
        c=y_mv[is_correctly_labeled_sample],
        cmap="coolwarm",
        marker="o",
        s=20,
        zorder=100,
        vmin=0,
        vmax=1,
    )
    ax1.scatter(
        X[is_wrongly_labeled_sample, 0],
        X[is_wrongly_labeled_sample, 1],
        c=y_mv[is_wrongly_labeled_sample],
        cmap="coolwarm",
        marker="x",
        s=20,
        zorder=100,
        vmin=0,
        vmax=1,
    )
    ax1.scatter(
        X[is_labeled_sample, 0],
        X[is_labeled_sample, 1],
        c="grey",
        alpha=0.8,
        marker=".",
        edgecolors="black",
        s=300,
    )
    ax1.set_xlabel("Feature 1")
    ax1.set_ylabel("Feature 2")

    requests_per_annotator = np.sum(is_labeled(y), axis=0)
    bar_labels = ax2.bar(
        np.arange(n_annotators),
        requests_per_annotator,
        width=0.4,
        color="grey",
    )

    ax2.set_xlabel("Annotators")
    ax2.set_xticks(
        np.arange(n_annotators),
        [f"(AP={1-ep})" for ep in annotator_error_prob],
    )
    ax2.set_ylabel("Requested Labels")
    text_elements = []
    for i in range(n_annotators):
        if not np.isnan(A_perf_clf_individual[i]):
            text = ax2.text(
                i,
                requests_per_annotator[i] + 0.1,
                r"($\widehat{\text{AP}}$=" + f"{A_perf_clf_individual[i]:.2})",
                horizontalalignment="center",
                color="black",
                fontsize=10,
            )
            text_elements.append(text)
    ax2.yaxis.set_major_locator(MaxNLocator(integer=True, nbins=5))

    coll_new = list(ax1.collections) + list(ax2.collections)
    coll_new.append(title)
    artists.append(
        [x for x in coll_new if x not in coll_old]
        + bar_labels.get_children()
        + text_elements
    )

    # Update labels based on query.
    y[query_idx[:, 0], query_idx[:, 1]] = y_annot[
        query_idx[:, 0], query_idx[:, 1]
    ]
lower_y_limit, upper_y_limit = ax2.get_ylim()
ax2.set_ylim((lower_y_limit, upper_y_limit * 1.2))
ani = animation.ArtistAnimation(fig, artists, interval=1000, blit=True)
../../../_images/pool_multi_annotator_legend1.png

Total running time of the script: (0 minutes 5.776 seconds)

Gallery generated by Sphinx-Gallery