Stream-based Active Learning - Getting Started

In this notebook, we show how stream-based active learning strategies are used and compare them to one another. We showcase the methods available in the skactiveml.stream package.

[1]:
import numpy as np
import sklearn.datasets
import matplotlib as mlp
import matplotlib.pyplot as plt
from collections import deque
from scipy.ndimage import gaussian_filter1d
from skactiveml.classifier import ParzenWindowClassifier
from skactiveml.stream import StreamRandomSampling, PeriodicSampling
from skactiveml.stream import (
    FixedUncertainty,
    VariableUncertainty,
    Split,
    StreamProbabilisticAL,
    StreamDensityBasedAL,
    CognitiveDualQueryStrategyRan,
    CognitiveDualQueryStrategyFixUn,
    CognitiveDualQueryStrategyRanVarUn,
    CognitiveDualQueryStrategyVarUn,
)
from skactiveml.utils import call_func
mlp.rcParams["figure.facecolor"] = "white"

Initialize Stream Parameters

Before the experiments can start, we need to construct a random data set. The cell below specifies the necessary parameters: the number of instances used to train the initial classifier (init_train_length), the length of the data stream (stream_length), and the size of the sliding window that limits the available training data (training_size). Additionally, the parameter fit_clf decides whether the query strategy needs X and y to refit the classifier; it is set to False here, so the classifier is fitted in the experiment loop instead.

[2]:
# number of instances used to train the initial classifier
init_train_length = 10
# the length of the data stream
stream_length = 5000
# the size of the sliding window that limits the training data
training_size = 1000
# whether the query strategy should refit the classifier with X and y
fit_clf = False
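
The sliding window described above is implemented later in the notebook with collections.deque and its maxlen argument. As a minimal sketch of that behaviour (the window size of 3 and the integer values are illustrative assumptions, not the notebook's settings), appending to a full deque silently discards the oldest entry:

```python
from collections import deque

# Hypothetical small window for illustration; the notebook uses training_size = 1000.
window = deque(maxlen=3)

for value in range(5):
    # Once the deque holds maxlen items, each append drops the oldest one.
    window.append(value)

# Only the three most recent values remain in the window.
print(list(window))  # [2, 3, 4]
```

With training_size = 1000, the classifier therefore only ever sees the 1000 most recent instances of the stream.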

Random Seed Generation

To make the experiments repeatable, we use a single random_state object to generate all other random seeds, so only one random seed needs to be specified explicitly. The get_randomseed function draws a new random seed from the random_state object.

[3]:
# random state that is used to generate random seeds
random_state = np.random.RandomState(0)

def get_randomseed(random_state):
    return random_state.randint(2**31-1)
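
As a quick check of the repeatability claim, the following sketch draws seeds from two independent RandomState objects that share the same initial seed. The names rs_a, rs_b, seeds_a, and seeds_b are introduced here for illustration only:

```python
import numpy as np

def get_randomseed(random_state):
    # Same helper as in the notebook: draw one integer seed.
    return random_state.randint(2**31 - 1)

# Two generators created with the same explicit seed.
rs_a = np.random.RandomState(0)
rs_b = np.random.RandomState(0)

seeds_a = [get_randomseed(rs_a) for _ in range(3)]
seeds_b = [get_randomseed(rs_b) for _ in range(3)]

# Both generators yield the same sequence of derived seeds.
print(seeds_a == seeds_b)  # True
```

Because the derived seeds depend on the order in which they are drawn, reordering the cells or adding another call to get_randomseed changes the seeds of everything created afterwards.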

Generate And Initialize Data Set

The next block initializes the data set used in the experiments. We use scikit-learn to generate a random data set of clustered instances whose size is the pre-defined stream length plus the initial training length. The data set consists of multiple parts. X contains the feature vectors of the instances, and y contains the class of each instance, derived from the cluster index modulo 2. For models that need at least some initial training data, we set aside samples to train an initial model. These are denoted by the suffix _init, while all data used within the active learning cycle are denoted by the suffix _stream. In this notebook, we evaluate the performance of each query strategy using prequential evaluation: each instance is first used to test the classifier and only afterwards becomes available for training. If a hold-out test data set is used, it should be initialized here as well.

[4]:
X, y_centers = sklearn.datasets.make_blobs(
    n_samples=init_train_length + stream_length,
    centers=30,
    random_state=get_randomseed(random_state),
    shuffle=True)
y = y_centers % 2
X_init = X[:init_train_length, :]
y_init = y[:init_train_length]
X_stream = X[init_train_length:, :]
y_stream = y[init_train_length:]
[5]:
plt.scatter(X[:, 0], X[:, 1], c=y)
plt.xlabel('Feature 1')
plt.ylabel('Feature 2')
plt.title('Data set');
plt.show()
[Figure: scatter plot of the data set, Feature 1 vs. Feature 2, colored by class]
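
The prequential (test-then-train) evaluation mentioned above can be sketched without any library. The toy label stream and the majority-class predictor below are assumptions made for illustration; unlike the notebook's experiment loop, this sketch reveals every label to the model instead of only the queried ones:

```python
# Hypothetical toy label stream (not the notebook's data).
labels = [0, 0, 1, 0, 1, 1, 1, 0, 1, 1]

seen = {0: 0, 1: 0}  # how often each label has been observed so far
hits = []            # one entry per stream instance

for y_t in labels:
    # 1) Test: predict the majority label seen so far (ties go to class 0).
    y_pred = 1 if seen[1] > seen[0] else 0
    hits.append(y_pred == y_t)
    # 2) Train: only now is the true label made available to the model.
    seen[y_t] += 1

prequential_accuracy = sum(hits) / len(hits)
print(prequential_accuracy)  # 0.4
```

Since every prediction is made before the corresponding label is used, no separate hold-out set is needed to obtain an accuracy estimate over the stream.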

Initialize Query Strategies

Next, we initialize the classifier and the query strategies that we want to compare. To guarantee that the classifier is not affected by previous runs, we use a factory function that creates a new classifier for each experiment run. Since every query strategy comes with a default budget manager, we use it for the sake of simplicity.

[6]:
clf_factory = lambda: ParzenWindowClassifier(classes=[0,1], random_state=get_randomseed(random_state))
query_strategies = {
    'StreamRandomSampling': StreamRandomSampling(random_state=get_randomseed(random_state)),
    'PeriodicSampler': PeriodicSampling(random_state=get_randomseed(random_state)),
    # 'FixedUncertainty': FixedUncertainty(random_state=get_randomseed(random_state)),
    # 'VariableUncertainty': VariableUncertainty(random_state=get_randomseed(random_state)),
    'Split': Split(random_state=get_randomseed(random_state)),
    'DBALStream': StreamDensityBasedAL(random_state=get_randomseed(random_state)),
    # 'CogDQSRan': CognitiveDualQueryStrategyRan(random_state=get_randomseed(random_state), force_full_budget=True),
    # 'CogDQSFixUn': CognitiveDualQueryStrategyFixUn(random_state=get_randomseed(random_state), force_full_budget=True),
    # 'CogDQSVarUn': CognitiveDualQueryStrategyVarUn(random_state=get_randomseed(random_state), force_full_budget=True),
    'CogDQSRanVarUn': CognitiveDualQueryStrategyRanVarUn(random_state=get_randomseed(random_state), force_full_budget=True),
    'StreamProbabilisticAL': StreamProbabilisticAL(random_state=get_randomseed(random_state)),
}
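
To illustrate why a factory function is used for the classifier, here is a minimal sketch with a hypothetical ToyModel class standing in for a real classifier (ToyModel, model_factory, first, and second are not part of skactiveml). Each call to the factory returns a fresh object, so state accumulated in one run cannot leak into the next:

```python
import numpy as np

random_state = np.random.RandomState(0)

class ToyModel:
    """Hypothetical stand-in for a classifier that accumulates state."""

    def __init__(self, seed):
        self.seed = seed
        self.n_fitted = 0

    def fit(self, n_samples):
        # Remember how many samples this particular object has seen.
        self.n_fitted += n_samples
        return self

# The factory draws a new seed and builds a new object on every call.
model_factory = lambda: ToyModel(seed=random_state.randint(2**31 - 1))

first = model_factory().fit(100)
second = model_factory()

print(first.n_fitted)   # 100
print(second.n_fitted)  # 0
print(first is second)  # False
```

The query strategies in the dictionary above, in contrast, are each created once, which is sufficient here because every strategy is used in exactly one run.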

Start Active Learning Cycle

After all variables are initialized, we can start the experiment. The experiment loop below goes through all query strategies defined in query_strategies. For each experiment run, the average accuracy and the number of acquired labels are printed. Lastly, the accuracy over time is plotted, smoothed with a Gaussian filter.

[7]:
for query_strategy_name, query_strategy in query_strategies.items():
    clf = clf_factory()
    # initializing the training data
    X_train = deque(maxlen=training_size)
    X_train.extend(X_init)
    y_train = deque(maxlen=training_size)
    y_train.extend(y_init)
    # train the model with the initially available data
    clf.fit(X_train, y_train)
    # initialize the list that stores the result of the classifier's prediction
    correct_classifications = []
    count = 0
    for t, (x_t, y_t) in enumerate(zip(X_stream, y_stream)):
        # create stream samples
        X_cand = x_t.reshape([1, -1])
        y_cand = y_t
        # train the classifier
        clf.fit(X_train, y_train)
        correct_classifications.append(clf.predict(X_cand)[0] == y_cand)
        # check whether to sample the instance or not
        # call_func passes only the arguments a strategy accepts; StreamRandomSampling and PeriodicSampling do not need a classifier
        sampled_indices, utilities = call_func(query_strategy.query, candidates=X_cand, clf=clf, return_utilities=True, fit_clf=fit_clf)
        # create budget_manager_param_dict for BalancedIncrementalQuantileFilter used by StreamProbabilisticAL
        budget_manager_param_dict = {"utilities": utilities}
        # update the query strategy and budget_manager to calculate the right budget
        call_func(query_strategy.update, candidates=X_cand, queried_indices=sampled_indices, budget_manager_param_dict=budget_manager_param_dict)
        # count the number of queries
        count += len(sampled_indices)
        # add X_cand to X_train
        X_train.append(x_t)
        # add label or missing_label to y_train
        y_train.append(y_cand if len(sampled_indices) > 0 else clf.missing_label)
    # calculate and show the average accuracy
    print("Query Strategy: ", query_strategy_name, ", Avg Accuracy: ", np.mean(correct_classifications), ", Acquisition count:", count)
    plt.plot(gaussian_filter1d(np.array(correct_classifications, dtype=float), 50), label=query_strategy_name)
plt.legend();
Query Strategy:  StreamRandomSampling , Avg Accuracy:  0.7642 , Acquisition count: 498
Query Strategy:  PeriodicSampler , Avg Accuracy:  0.7764 , Acquisition count: 500
Query Strategy:  Split , Avg Accuracy:  0.7416 , Acquisition count: 522
Query Strategy:  DBALStream , Avg Accuracy:  0.7828 , Acquisition count: 500
Query Strategy:  CogDQSRanVarUn , Avg Accuracy:  0.7834 , Acquisition count: 521
Query Strategy:  StreamProbabilisticAL , Avg Accuracy:  0.7966 , Acquisition count: 498
[Figure: smoothed accuracy over time for each query strategy]
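
To put the acquisition counts into perspective, the short sketch below converts them into the fraction of the stream that was labeled. The counts are copied from the printed results above; the observation that all strategies end up near 10% is an inference from this output, since the notebook does not state the budget of the default budget managers:

```python
stream_length = 5000

# Acquisition counts copied from the printed results.
acquisitions = {
    "StreamRandomSampling": 498,
    "PeriodicSampler": 500,
    "Split": 522,
    "DBALStream": 500,
    "CogDQSRanVarUn": 521,
    "StreamProbabilisticAL": 498,
}

# Fraction of stream instances for which a label was acquired.
rates = {name: count / stream_length for name, count in acquisitions.items()}

for name, rate in rates.items():
    print(f"{name}: {rate:.2%} of the stream labeled")
```

Because all strategies spend a similar share of labels, the differences in average accuracy can be attributed mainly to which instances were selected rather than to how many.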