Random Sampling#

Idea: Implementation of a wrapper class for pool-based active learning query strategies with a single annotator such that it transforms the query strategy for the single annotator into a query strategy for multiple annotators by choosing an annotator randomly or according to the parameter A_perf and setting the labeled matrix to a labeled vector by an aggregation function, e.g., majority voting. Here the single-annotator strategy is ‘RandomSampling’ and for the chosen sample 3 annotators are chosen at random to provide a label.

Google Colab Note: If the notebook fails to run after installing the needed packages, try to restart the runtime (Ctrl + M) under Runtime -> Restart session.
https://colab.research.google.com/assets/colab-badge.svg
Notebook Dependencies
Uncomment the following cell to install all dependencies for this tutorial.
# !pip install scikit-activeml

import numpy as np
from matplotlib import pyplot as plt, animation
from matplotlib.ticker import MaxNLocator
from sklearn.datasets import make_blobs

from skactiveml.utils import (
    MISSING_LABEL,
    majority_vote,
    is_labeled,
)
from skactiveml.visualization import (
    plot_utilities,
    plot_decision_boundary,
    mesh,
)

from skactiveml.classifier import ParzenWindowClassifier
from skactiveml.pool.multiannotator import SingleAnnotatorWrapper
from skactiveml.pool import RandomSampling

random_state = np.random.RandomState(0)
rng = np.random.default_rng(seed=0)
# Build a dataset.
X_all, y_true_all = make_blobs(
    n_samples=400,
    n_features=2,
    centers=[[0, 1], [-3, 0.5], [-1, -1], [2, 1], [1, -0.5]],
    cluster_std=0.7,
    random_state=random_state,
)
X, X_test = X_all[: len(X_all) // 2], X_all[len(X_all) // 2 :]
y_true_all = y_true_all % 2
y_true, y_true_test = (
    y_true_all[: len(X_all) // 2],
    y_true_all[len(X_all) // 2 :],
)
n_annotators = 5
y_annot = np.zeros(shape=(len(X), n_annotators), dtype=int)
annotator_error_prob = np.linspace(0.0, 0.3, num=n_annotators)
for i, p in enumerate(annotator_error_prob):
    y_noise = rng.binomial(1, p, len(X))
    y_annot[:, i] = y_noise ^ y_true
y = np.full(shape=y_annot.shape, fill_value=MISSING_LABEL)
y_mv = majority_vote(y, missing_label=MISSING_LABEL, random_state=random_state)
# Initialise the classifier.
clf = ParzenWindowClassifier(classes=[0, 1], random_state=random_state)

# Initialise the query strategy.
qs = SingleAnnotatorWrapper(strategy=RandomSampling(random_state=random_state), random_state=random_state)

# Preparation for plotting.
fig = plt.figure(figsize=(7, 5))
ax1 = plt.subplot2grid((3, 1), (0, 0), rowspan=2)
ax2 = plt.subplot2grid((3, 1), (2, 0), rowspan=1)
feature_bound = [[min(X[:, 0]), min(X[:, 1])], [max(X[:, 0]), max(X[:, 1])]]
artists = []

# Active learning cycle:
n_cycles = 20
for c in range(n_cycles):
    # Fit the classifier with current labels.
    clf.fit(X, y_mv)

    # Fit the annotation performance model
    if np.all(np.any(is_labeled(y), axis=0)):
        A_perf_clf = np.sum(
            np.where(is_labeled(y), y_annot == clf.predict(X)[:, None], 0),
            axis=0,
        ) / np.sum(is_labeled(y), axis=0)
    else:
        A_perf_clf = None

    A_perf_clf_individual = np.full(n_annotators, np.nan)
    has_labels = np.any(is_labeled(y), axis=0)
    A_perf_clf_individual[has_labels] = np.sum(
        np.where(
            is_labeled(y)[:, has_labels],
            y_annot[:, has_labels] == clf.predict(X)[:, None],
            0,
        ),
        axis=0,
    ) / np.sum(is_labeled(y)[:, has_labels], axis=0)

    # Query the next sample(s).
    query_idx = qs.query(X=X, y=y, batch_size=3, n_annotators_per_sample=3)

    # Capture the current plot state.
    coll_old = list(ax1.collections) + list(ax2.collections)
    title = ax1.text(
        0.5,
        1.05,
        f"Decision boundary after acquiring {c} labels\n"
        f"Test Accuracy: {clf.score(X_test, y_true_test):.4f}",
        size=plt.rcParams["axes.titlesize"],
        ha="center",
        transform=ax1.transAxes,
    )

    y_mv = majority_vote(y, random_state=0)
    is_labeled_sample = np.any(is_labeled(y), axis=1)
    is_correctly_labeled_sample = is_labeled_sample & (y_mv == y_true)
    is_wrongly_labeled_sample = is_labeled_sample & (y_mv != y_true)

    axes = [ax1, ax2]
    # axes = plot_annotator_utilities(ma_qs, X=X, y=y, clf=clf, axes=axes, feature_bound=bound)
    X_mesh, Y_mesh, mesh_samples = mesh(feature_bound, 25)
    _, utilities = qs.query(
        X=X, y=y, batch_size=3, n_annotators_per_sample=3, return_utilities=True, candidates=mesh_samples
    )
    ax1.contourf(
        X_mesh,
        Y_mesh,
        np.mean(utilities[0], axis=1).reshape(X_mesh.shape),
        **{"cmap": "Greens", "alpha": 0.75},
    )
    # for a in range(n_annotators):
    plot_decision_boundary(clf, ax=ax1, feature_bound=feature_bound)
    ax1.scatter(
        X[~is_labeled_sample, 0],
        X[~is_labeled_sample, 1],
        c=y_true[~is_labeled_sample],
        cmap="coolwarm",
        marker=".",
        zorder=2,
        s=10,
    )
    ax1.scatter(
        X[is_correctly_labeled_sample, 0],
        X[is_correctly_labeled_sample, 1],
        c=y_mv[is_correctly_labeled_sample],
        cmap="coolwarm",
        marker="o",
        s=20,
        zorder=100,
        vmin=0,
        vmax=1,
    )
    ax1.scatter(
        X[is_wrongly_labeled_sample, 0],
        X[is_wrongly_labeled_sample, 1],
        c=y_mv[is_wrongly_labeled_sample],
        cmap="coolwarm",
        marker="x",
        s=20,
        zorder=100,
        vmin=0,
        vmax=1,
    )
    ax1.scatter(
        X[is_labeled_sample, 0],
        X[is_labeled_sample, 1],
        c="grey",
        alpha=0.8,
        marker=".",
        edgecolors="black",
        s=300,
    )
    ax1.set_xlabel("Feature 1")
    ax1.set_ylabel("Feature 2")

    requests_per_annotator = np.sum(is_labeled(y), axis=0)
    bar_labels = ax2.bar(
        np.arange(n_annotators),
        requests_per_annotator,
        width=0.4,
        color="grey",
    )

    ax2.set_xlabel("Annotators")
    ax2.set_xticks(
        np.arange(n_annotators),
        [f"(AP={1-ep})" for ep in annotator_error_prob],
    )
    ax2.set_ylabel("Requested Labels")
    text_elements = []
    for i in range(n_annotators):
        if not np.isnan(A_perf_clf_individual[i]):
            text = ax2.text(
                i,
                requests_per_annotator[i] + 0.1,
                r"($\widehat{\text{AP}}$=" + f"{A_perf_clf_individual[i]:.2})",
                horizontalalignment="center",
                color="black",
                fontsize=10,
            )
            text_elements.append(text)
    ax2.yaxis.set_major_locator(MaxNLocator(integer=True, nbins=5))

    coll_new = list(ax1.collections) + list(ax2.collections)
    coll_new.append(title)
    artists.append(
        [x for x in coll_new if x not in coll_old]
        + bar_labels.get_children()
        + text_elements
    )

    # Update labels based on query.
    y[query_idx[:, 0], query_idx[:, 1]] = y_annot[
        query_idx[:, 0], query_idx[:, 1]
    ]
lower_y_limit, upper_y_limit = ax2.get_ylim()
ax2.set_ylim((lower_y_limit, upper_y_limit * 1.2))
ani = animation.ArtistAnimation(fig, artists, interval=1000, blit=True)
../../../_images/pool_multi_annotator_legend1.png

Total running time of the script: (0 minutes 7.512 seconds)

Gallery generated by Sphinx-Gallery