This page was generated from examples/ad_advvae_mnist.ipynb.

Adversarial VAE detection on MNIST

Method

The adversarial VAE detector is first trained on a batch of unlabeled, normal (not adversarial) data. The loss, however, differs from traditional VAE training: it minimizes the KL-divergence between a classifier's predictions on the original and on the reconstructed data. When an adversarial instance is fed to the VAE, the KL-divergence between the predictions on the adversarial example and on its reconstruction is large, because the reconstruction does not contain the adversarial artefacts and therefore yields a very different prediction distribution. As a result, the adversarial instance is flagged.
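
Concretely, the adversarial score of an instance is the KL-divergence between the classifier's prediction distribution on the instance and on its VAE reconstruction. A minimal NumPy sketch of this score (illustrative only; kl_score is a hypothetical helper, the detector computes the score internally):

import numpy as np

def kl_score(p, q, eps=1e-10):
    # KL-divergence between the class probabilities p = model(x) and
    # q = model(vae(x)); a large value flags x as adversarial.
    p, q = np.clip(p, eps, 1.), np.clip(q, eps, 1.)
    return np.sum(p * np.log(p / q), axis=-1)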

Dataset

MNIST consists of 60,000 training and 10,000 test images of the handwritten digits 0 to 9, each 28 by 28 pixels in grayscale.

[1]:
import logging
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
tf.keras.backend.clear_session()
from tensorflow.keras.layers import Activation, Conv2D, Conv2DTranspose, Dense, Dropout
from tensorflow.keras.layers import Flatten, Input, InputLayer, Reshape, MaxPooling2D
from tensorflow.keras.models import Model
from tensorflow.keras.utils import to_categorical

# Adversarial attack method. The latest release of the `cleverhans` package does
# not support TensorFlow 2 yet, so we need to install from the master branch:
# pip install git+https://github.com/tensorflow/cleverhans.git#egg=cleverhans
from cleverhans.future.tf2.attacks import projected_gradient_descent

from alibi_detect.models.losses import loss_adv_vae
from alibi_detect.ad import AdversarialVAE
from alibi_detect.utils.saving import save_detector, load_detector, save_tf_model, load_tf_model
from alibi_detect.utils.visualize import plot_instance_score

logger = tf.get_logger()
logger.setLevel(logging.ERROR)

Load MNIST data

[2]:
train, test = tf.keras.datasets.mnist.load_data()
X_train, y_train = train
X_test, y_test = test

X_train = X_train.reshape(-1, 28, 28, 1).astype('float32') / 255
X_test = X_test.reshape(-1, 28, 28, 1).astype('float32') / 255
y_train = to_categorical(y_train, 10)
y_test = to_categorical(y_test, 10)
print(X_train.shape, y_train.shape, X_test.shape, y_test.shape)
(60000, 28, 28, 1) (60000, 10) (10000, 28, 28, 1) (10000, 10)

Create and train MNIST model

The pretrained outlier and adversarial detectors used in the example notebooks can be found here. You can either manually download the relevant files from the model_mnist folder to, e.g., a local directory my_dir, or, if you have the Google Cloud SDK installed, download the whole folder as follows:

!gsutil cp -r gs://seldon-models/alibi-detect/model_mnist my_dir
[3]:
load_mnist_model = False
[4]:
filepath = './model_mnist/'  # change to directory where model is downloaded
if load_mnist_model:
    model = load_tf_model(filepath)
else:
    # define model
    inputs = Input(shape=X_train.shape[1:])
    x = Conv2D(64, 2, padding='same', activation='relu')(inputs)
    x = MaxPooling2D(pool_size=2)(x)
    x = Dropout(.3)(x)

    x = Conv2D(32, 2, padding='same', activation='relu')(x)
    x = MaxPooling2D(pool_size=2)(x)
    x = Dropout(.3)(x)

    x = Flatten()(x)
    x = Dense(256, activation='relu')(x)
    x = Dropout(.5)(x)
    logits = Dense(10, name='logits')(x)
    outputs = Activation('softmax', name='softmax')(logits)
    model = Model(inputs=inputs, outputs=outputs)
    model.compile(loss='categorical_crossentropy',
                  optimizer='adam',
                  metrics=['accuracy'])

    # train model
    model.fit(X_train,
              y_train,
              epochs=5,
              batch_size=256,
              verbose=0,
              validation_data=(X_test, y_test)
              )

    # save model
    save_tf_model(model, filepath)

Evaluate model:

[5]:
# evaluate model
results = model.evaluate(X_test, y_test, batch_size=256, verbose=0)
print('Test loss: {:.4f} -- accuracy: {:.4f}'.format(results[0], results[1]))
Test loss: 0.0419 -- accuracy: 0.9865

Train Adversarial VAE for MNIST

You can again either manually download the relevant files from the ad_vae_mnist folder to, e.g., a local directory my_dir, or use gsutil:

!gsutil cp -r gs://seldon-models/alibi-detect/ad_vae_mnist my_dir
[6]:
load_adversarial_detector = False
[7]:
filepath = './ad_vae_mnist/'  # change to directory where model is downloaded
if load_adversarial_detector:  # load pretrained adversarial detector
    ad = load_detector(filepath)
else:  # define model, initialize, train and save adversarial detector
    latent_dim = 50

    encoder_net = tf.keras.Sequential(
      [
          InputLayer(input_shape=(28, 28, 1)),
          Conv2D(64, 4, strides=2, padding='same', activation=tf.nn.relu),
          Conv2D(128, 4, strides=2, padding='same', activation=tf.nn.relu),
          Conv2D(512, 4, strides=2, padding='same', activation=tf.nn.relu)
      ])

    decoder_net = tf.keras.Sequential(
      [
          InputLayer(input_shape=(latent_dim,)),
          Dense(7*7*32, activation=tf.nn.relu),
          Reshape(target_shape=(7, 7, 32)),
          Conv2DTranspose(64, 3, strides=2, padding='same', activation=tf.nn.relu),
          Conv2DTranspose(32, 3, strides=2, padding='same', activation=tf.nn.relu),
          Conv2DTranspose(1, 3, strides=1, padding='same', activation='sigmoid')
      ])

    # initialize adversarial detector
    ad = AdversarialVAE(threshold=.5,  # threshold for adversarial score
                        model=model,
                        encoder_net=encoder_net,  # can also pass VAE model instead
                        decoder_net=decoder_net,  # of separate encoder and decoder
                        latent_dim=latent_dim,
                        samples=2,  # nb of samples drawn by VAE
                        beta=0.  # weight on KL-divergence loss term of latent space
                       )

    # train
    ad.fit(X_train,
           loss_fn=loss_adv_vae,
           w_model=1.,  # weight on KL-divergence loss term of model predictions
           w_recon=0.,  # weight on elbo loss term
           epochs=2,
           verbose=True)

    # save the trained adversarial detector
    save_detector(ad, filepath)
938/938 [=] - 302s 322ms/step - loss: 0.2373
938/938 [=] - 245s 261ms/step - loss: 0.0259
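
The objective optimized by loss_adv_vae during fitting roughly combines three weighted terms, as sketched below (schematic only; see alibi_detect.models.losses.loss_adv_vae for the exact implementation):

# Schematic form of the training objective (illustrative, not the library code):
#
#   loss(x) = w_model * KL( model(x) || model(vae(x)) )
#           + w_recon * reconstruction_loss(x, vae(x))
#           + beta    * KL( q(z | x) || N(0, I) )
#
# With w_model=1., w_recon=0. and beta=0. as above, training reduces to matching
# the classifier's predictions on the reconstructions to those on the originals.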

Generate adversarial instances

The cleverhans adversarial attack methods assume that the model outputs logits, so we will create a modified model by simply removing the softmax output layer:

[8]:
model_logits = Model(inputs=model.inputs, outputs=model.layers[-2].output)

Select observations for which we will create adversarial instances:

[9]:
ids = np.arange(5)
X_to_adv = X_test[ids]
print(X_to_adv.shape)
(5, 28, 28, 1)

Launch the adversarial attack. The attack follows the Basic Iterative Method (Kurakin et al., 2016) when rand_init is set to 0, and the method of Madry et al. (2017) when rand_minmax is larger than 0:

[10]:
X_adv = projected_gradient_descent(model_logits,
                                   X_to_adv,
                                   eps=2.,
                                   eps_iter=1.,
                                   nb_iter=10,
                                   norm=2,
                                   clip_min=X_train.min(),
                                   clip_max=X_train.max(),
                                   rand_init=None,
                                   rand_minmax=.3,
                                   targeted=False,
                                   sanity_checks=False
                                  ).numpy()
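
Under the hood, each iteration of the attack takes a gradient step that increases the classification loss and projects the accumulated perturbation back onto the allowed eps-ball. A minimal sketch of a single untargeted L2 step (illustrative only; pgd_step is a hypothetical helper, not the cleverhans implementation):

def pgd_step(model_logits, x, x_orig, y, eps, eps_iter):
    x = tf.convert_to_tensor(x)
    # gradient of the cross-entropy loss w.r.t. the current adversarial input
    with tf.GradientTape() as tape:
        tape.watch(x)
        loss = tf.keras.losses.categorical_crossentropy(y, model_logits(x), from_logits=True)
    grad = tape.gradient(loss, x)
    # normalised L2 gradient step
    grad_norm = tf.norm(tf.reshape(grad, (grad.shape[0], -1)), axis=1) + 1e-12
    x_new = x + eps_iter * grad / tf.reshape(grad_norm, (-1, 1, 1, 1))
    # project the total perturbation back onto the L2 ball of radius eps
    delta = x_new - x_orig
    delta_norm = tf.norm(tf.reshape(delta, (delta.shape[0], -1)), axis=1) + 1e-12
    factor = tf.minimum(1., eps / delta_norm)
    return x_orig + delta * tf.reshape(factor, (-1, 1, 1, 1))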

Visualize adversarial instances with model predictions:

[11]:
y_pred = np.argmax(model(X_to_adv).numpy(), axis=-1)
y_pred_adv = np.argmax(model(X_adv).numpy(), axis=-1)

n_rows = X_to_adv.shape[0]
n_cols = 2
figsize = (10, 20)
img_shape = (28, 28)

fig, axes = plt.subplots(nrows=n_rows, ncols=n_cols, figsize=figsize)

n_subplot = 1
for i in range(n_rows):
    plt.subplot(n_rows, n_cols, n_subplot)
    plt.axis('off')
    if i == 0:
        plt.title('Original'
                  '\nPred: {}'.format(y_pred[i]))
    else:
        plt.title('Pred: {}'.format(y_pred[i]))
    plt.imshow(X_to_adv[i].reshape(img_shape))
    n_subplot += 1

    plt.subplot(n_rows, n_cols, n_subplot)
    plt.axis('off')
    if i == 0:
        plt.title('Adversarial'
                  '\nPred: {}'.format(y_pred_adv[i]))
    else:
        plt.title('Pred: {}'.format(y_pred_adv[i]))
    plt.imshow(X_adv[i].reshape(img_shape))
    n_subplot += 1
plt.show()
[Figure: original vs. adversarial instances, each with the model's predicted class]

Detect adversarial instances

[12]:
X = np.concatenate([X_to_adv, X_adv], axis=0)
print(X.shape)
(10, 28, 28, 1)
[13]:
ad_preds = ad.predict(X, return_instance_score=True)
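
The scores are returned under ad_preds['data']['instance_score']. Comparing them against the threshold set when initializing the detector gives a binary adversarial flag, e.g. (a small sketch):

scores = ad_preds['data']['instance_score']
flags = (scores > ad.threshold).astype(int)  # 1 = flagged as adversarial
print(flags)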

Plot adversarial score for each instance:

[14]:
labels = ['Normal', 'Adversarial']
target = np.array([0 if i < X_to_adv.shape[0] else 1 for i in range(X.shape[0])])
plot_instance_score(ad_preds, target, labels, ad.threshold)
[Figure: adversarial scores for the normal and adversarial instances, with the detector threshold]

The adversarial detector easily separates the normal from the adversarial instances, which are predicted as classes different from the original. We can plot the individual instances again, together with their adversarial scores:

[15]:
scores = ad_preds['data']['instance_score']

fig, axes = plt.subplots(nrows=n_rows, ncols=n_cols, figsize=figsize)

n_subplot = 1
for i in range(n_rows):
    plt.subplot(n_rows, n_cols, n_subplot)
    plt.axis('off')
    if i == 0:
        plt.title('Original'
                  '\nPred: {}'
                  '\nAdv score: {:.4f}'.format(y_pred[i], scores[i]))
    else:
        plt.title('Pred: {}'
                  '\nAdv score: {:.4f}'.format(y_pred[i], scores[i]))
    plt.imshow(X_to_adv[i].reshape(img_shape))
    n_subplot += 1

    plt.subplot(n_rows, n_cols, n_subplot)
    plt.axis('off')
    if i == 0:
        plt.title('Adversarial'
                  '\nPred: {}'
                  '\nAdv score: {:.4f}'.format(y_pred_adv[i], scores[i + n_rows]))
    else:
        plt.title('Pred: {}'
                  '\nAdv score: {:.4f}'.format(y_pred_adv[i], scores[i + n_rows]))
    plt.imshow(X_adv[i].reshape(img_shape))
    n_subplot += 1
plt.show()
[Figure: original and adversarial instances with their predicted classes and adversarial scores]