I have been exploring various versions of the Variational Autoencoder. Originally I wanted to apply it to generating sequences of words (to see if I could train it to write a cover letter, actually), so I was particularly interested in variants designed to deal with sequences. Several papers described constructing a VAE with an LSTM as the encoder and decoder; other papers said this did not work. All the TensorFlow demo code seemed to use the MNIST data set, which really isn't sequence data at all, so I decided to explore things with some petting-zoo data that actually was a sequence, ideally data involving several different classes of sequences. I looked through the UCI Machine Learning Repository and settled on a dataset that seemed simplest to work with. It consists of 5000 examples of time series with 40 time steps each, in three different classes. So let's fetch the data.
import os

filename = "waveform-+noise.data"
if not os.path.isfile(filename):
    import urllib.request
    # Fetch the compressed file from the UCI repository, then uncompress it in place.
    urllib.request.urlretrieve(
        "https://archive.ics.uci.edu/ml/machine-learning-databases/waveform/waveform-+noise.data.Z",
        "waveform-+noise.data.Z")
    os.system('/bin/uncompress {fname}'.format(fname="waveform-+noise.data.Z"))
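Assuming the download and uncompress both succeeded, here is a quick sanity check on the file. The shapes in the comments are just what I expect from the dataset description, not something guaranteed by the code above:

import numpy as np

raw = np.genfromtxt(filename, delimiter=',')
print(raw.shape)               # expecting (5000, 41): 40 time steps plus a class label
print(np.unique(raw[:, -1]))   # expecting the three class labels: 0., 1., 2.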
My thoughts about the design of the model: take a bunch of multi-variate time series data and cook it down to a single-variable time series output. For example: many inputs to a nerve cell producing the axon output, or several economic indicators producing a stock price, etc. Many problems seem to have this shape, and the Variational Autoencoder design ought to be able to deal with such a structure. I also thought that using an RNN on the input and a multi-layer perceptron on the output might be interesting. The RNN might capture the structure of the time series and render it into a 'flat and square' representation at the variational layer. The MLP would then reproduce the original neuron output, etc., as a 'flat and square' representation. This might be something that is human comprehensible, if one could figure out how to display it. So, now on to coding.
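Before getting into the TensorFlow code, here is a rough NumPy sketch of the shape flow I have in mind. The sizes are just the hyperparameters chosen below, and the zero arrays are stand-ins for what the real layers will compute:

import numpy as np

batch, time_steps, features, latent_dim = 25, 40, 1, 2

x = np.zeros((batch, time_steps, features))    # encoder input: a batch of time series
mu = np.zeros((batch, latent_dim))             # RNN encoder output: mean of the code
logvar = np.zeros((batch, latent_dim))         # RNN encoder output: log variance of the code
eps = np.random.randn(batch, latent_dim)
z = mu + np.exp(0.5 * logvar) * eps            # reparameterization: sample the variational layer
y = np.zeros((batch, time_steps, 1))           # MLP decoder output: reconstructed time series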
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import tensorflow as tf
M = 5000                     # number of examples in the data set
data_length = 40             # time steps per example
class_dim = 3                # number of classes
rnn_neurons = 64             # GRU cell size in the encoder
dense_neurons = 256          # decoder hidden size (passed to the VAE below)
num_layers = 2               # stacked GRU layers in the encoder
latent_dim = 2               # size of the variational layer
default_batch_size = 25
time_steps = data_length
learning_rate = 1e-4
GV = 1e-7                    # small constant for clipping the decoder output
features = 1                 # start with univariate input
epochs = 20
def create_dataset(filename):
data = np.genfromtxt(filename, delimiter=',').astype(np.float32)
classes = data[:, -1].astype(np.int32)
data = data[:, :-1]
data = data/np.max(np.abs(data))
indexes = np.arange(data.shape[0])
while True:
samp = np.random.choice(indexes, default_batch_size)
X = data[samp].reshape(default_batch_size, time_steps, features)
cl = classes[samp]
yield X, X, cl
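A quick check that the generator produces batches with the shapes the model will expect (pull one batch and print the shapes):

gen = create_dataset(filename)
X, Y, cl = next(gen)
print(X.shape, Y.shape, cl.shape)   # expecting (25, 40, 1) (25, 40, 1) (25,)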
class EncoderRNN(object):
def __init__(self, x, n_steps, input_size, output_size,
cell_size, num_layers, default_batch_size):
self.x = x
self.n_steps = n_steps
self.input_size = input_size
self.output_size = output_size
self.cell_size = cell_size
self.num_layers = num_layers
self.default_batch_size = default_batch_size
# tf Graph input
with tf.name_scope('encoder_inputs'):
self.batch_size = tf.placeholder(dtype=tf.int32)
with tf.variable_scope('EncoderRNN'):
self.add_rnn()
with tf.variable_scope('out_hidden'):
self.add_encoder_output()
def add_rnn(self,):
def make_cell():
cell = tf.contrib.rnn.GRUCell(self.cell_size)
return cell
with tf.variable_scope("mu_network"):
mu_network = tf.nn.rnn_cell.MultiRNNCell(
[make_cell() for _ in range(self.num_layers)])
self.mu_cell_outputs, _ = tf.nn.dynamic_rnn(
cell=mu_network,
inputs=self.x,
dtype=tf.float32,
)
with tf.variable_scope("logvar_network"):
logvar_network = tf.nn.rnn_cell.MultiRNNCell(
[make_cell() for _ in range(self.num_layers)])
self.logvar_cell_outputs, _ = tf.nn.dynamic_rnn(
cell=logvar_network,
inputs=self.x,
dtype=tf.float32,
)
def add_encoder_output(self):
Ws_mu = self._weight_variable([self.cell_size,
self.output_size], name='ws_mu')
bs_mu = self._bias_variable([self.output_size, ], name='bs_mu')
Ws_logvar = self._weight_variable([self.cell_size,
self.output_size],
name='ws_logvar')
bs_logvar = self._bias_variable([self.output_size, ],
name='bs_logvar')
mu_out = tf.transpose(self.mu_cell_outputs, [1, 0, 2])
logvar_out = tf.transpose(self.logvar_cell_outputs, [1, 0, 2])
l_mu = tf.gather(mu_out, int(mu_out.get_shape()[0]) - 1)
l_logvar = tf.gather(logvar_out, int(logvar_out.get_shape()[0]) - 1)
with tf.name_scope('Wx_plus_b'):
# Outputs mu and log of variance
self.mu = tf.matmul(l_mu, Ws_mu) + bs_mu
self.logvar = tf.matmul(l_logvar, Ws_logvar) + bs_logvar
def _weight_variable(self, shape, name='weights'):
# Best for sigmoid
initializer = tf.contrib.layers.xavier_initializer()
# Best for relu
# initializer = tf.contrib.layers.variance_scaling_initializer()
return tf.get_variable(shape=shape, name=name, initializer=initializer)
def _bias_variable(self, shape, name='biases'):
# initializer = tf.constant_initializer(0.1)
initializer = tf.constant_initializer(0.)
return tf.get_variable(name=name, shape=shape, initializer=initializer)
# Should not need dropout on decoder. Have flattened all information to a
# code, don't want to throw any away.
class DecoderMLP(object):
def __init__(self, z, latent_dim, output_size, default_batch_size):
self.z = z
self.input_size = latent_dim
self.output_size = output_size
self.default_batch_size = default_batch_size
# tf Graph input
with tf.variable_scope('DecoderMLP'):
self.add_mlp()
def add_mlp(self):
# First hidden
w0 = self._weight_variable([self.input_size, self.output_size],
name='w0')
b0 = self._bias_variable([self.output_size, ], name='b0')
h0 = tf.matmul(self.z, w0) + b0
h0 = tf.nn.tanh(h0)
# Second hidden
w1 = self._weight_variable([self.output_size, self.output_size],
name='w1')
b1 = self._bias_variable([self.output_size, ], name='b1')
h1 = tf.matmul(h0, w1) + b1
h1 = tf.nn.tanh(h1)
# output layer-mu
wo = self._weight_variable([self.output_size, self.output_size],
name='wo')
bo = self._bias_variable([self.output_size], name='bo')
self.y = tf.nn.tanh(tf.matmul(h1, wo) + bo)
self.y = tf.reshape(self.y, [-1, time_steps, 1])
def _weight_variable(self, shape, name='weights'):
initializer = tf.contrib.layers.xavier_initializer()
return tf.get_variable(shape=shape, name=name, initializer=initializer)
def _bias_variable(self, shape, name='biases'):
initializer = tf.constant_initializer(0.)
return tf.get_variable(name=name, shape=shape, initializer=initializer)
class VAE(object):
def __init__(self, time_steps, features, latent_dim, encoder_hidden,
decoder_hidden, num_layers, default_batch_size,
learning_rate):
self.time_steps = time_steps
self.features = features
self.latent_dim = latent_dim
self.encoder_hidden = encoder_hidden
self.decoder_hidden = decoder_hidden
self.num_layers = num_layers
self.default_batch_size = default_batch_size
self.LR = learning_rate
with tf.name_scope('train_inputs'):
self.x = tf.placeholder("float", [None, self.time_steps,
self.features])
self.x_target = tf.placeholder("float", [None, self.time_steps,
1])
self.batch_size = tf.placeholder(dtype=tf.int32)
with tf.name_scope('test_inputs'):
self.x_encode = tf.placeholder("float", [1, self.time_steps,
self.features])
self.z_in = tf.placeholder("float", [1, self.latent_dim])
with tf.variable_scope('VAE'):
self.build_codec()
with tf.variable_scope('loss'):
self.build_loss()
with tf.name_scope('train'):
self.train_op = tf.train.RMSPropOptimizer(self.LR).minimize(
self.loss)
def build_codec(self):
self.encoder = EncoderRNN(self.x,
self.time_steps,
self.features,
self.latent_dim,
self.encoder_hidden,
self.num_layers,
self.default_batch_size)
self.mu = self.encoder.mu
self.logvar = self.encoder.logvar
epsilon = tf.random_normal(tf.shape(self.logvar), name='epsilon')
std_encoder = tf.exp(0.5 * self.logvar)
self.z = self.mu + tf.multiply(std_encoder, epsilon)
self.decoder = DecoderMLP(self.z, self.latent_dim, self.time_steps,
self.default_batch_size)
print("Built coder and decoder........................")
def build_loss(self):
print("Building loss functions........................")
self.y = tf.clip_by_value(self.decoder.y, GV, 1 - GV)
        self.BCE = tf.reduce_sum(tf.nn.sigmoid_cross_entropy_with_logits(
            logits=self.y, labels=self.x_target), axis=1)
self.KLD = -0.5 * tf.reduce_sum(
1 + self.logvar - tf.square(self.mu) - tf.exp(self.logvar),
axis=1)
self.KLD = tf.reduce_mean(self.KLD)
self.BCE = tf.reduce_mean(self.BCE)
self.MSE = tf.reduce_sum(tf.squared_difference(self.x_target, self.y),
1)
# A mostly bogus idea.
self.loss = tf.reduce_mean(self.KLD + self.MSE * self.BCE)
print("Yee Ha!!! Built it!")
def plot_training(epoch_set, avg_set, mse_set):
print("Training Plot")
plt.close('all')
plt.figure(figsize=(12, 6))
plt.subplot(1, 2, 1)
plt.title('Training Loss')
plt.plot(epoch_set, avg_set, linestyle='--', marker='o',
ms=5, markerfacecolor='None',
markeredgecolor='blue', markeredgewidth=2,
label='training loss')
plt.xlabel('epoch')
plt.ylabel('average cost')
plt.legend()
plt.subplot(1, 2, 2)
plt.title('Training MSE')
plt.plot(epoch_set, mse_set, linestyle='--', marker='o',
ms=5, markerfacecolor='None',
markeredgecolor='green', markeredgewidth=2,
label='training mse')
plt.xlabel('epoch')
plt.ylabel('mse')
plt.show()
def attribute_color(cl):
colors = {0:'tab:blue', 1:'tab:orange', 2:'tab:green', 3:'tab:red',
4:'tab:purple', 5:'tab:brown', 6:'tab:pink', 7:'tab:gray',
8:'tab:olive', 9:'tab:cyan'}
return colors.get(cl)
def plot_var_layer(var_out, classes):
# plt.close('all')
color_cl = list()
for cl in classes:
color_cl.append(attribute_color(cl))
plt.figure(figsize=(8, 7))
plt.scatter(var_out[:, 0], var_out[:, 1], c=color_cl)
plt.title("Variational Layer Classes")
# plt.colorbar(ticks=np.array(labels))
#
# plt.colorbar(m)
#
# plt.colorbar()
plt.grid()
plt.show()
def run_model():
# Graph input
print("batch_size:", default_batch_size, "time_steps:",
"latent_dim:", latent_dim, "rnn_neurons:", rnn_neurons,
"dense_neurons:", dense_neurons, "num_layers:", num_layers,
time_steps, "features:", features, "learning_rate:", learning_rate)
tf.reset_default_graph()
vae = VAE(time_steps, features, latent_dim, rnn_neurons,
dense_neurons, num_layers, default_batch_size,
learning_rate)
sess = tf.Session()
saver = tf.train.Saver()
init = tf.global_variables_initializer()
sess.run(init)
avg_set = []
epoch_set = []
mse_set = []
total_batch = M // default_batch_size
for epoch in range(epochs):
# _current_state = init_zero
data = create_dataset(filename)
avg_cost = 0.0
for i in range(total_batch):
data_x, data_y, _ = next(data)
_train, _loss, mse, kld, bce = sess.run(
[vae.train_op, vae.loss, vae.MSE, vae.KLD,
vae.BCE],
# [vae.train_op],
feed_dict={vae.x: data_x,
vae.x_target: data_y,
vae.batch_size: default_batch_size})
avg_cost += _loss
avg_cost = avg_cost / total_batch
mse = mse.sum() / default_batch_size
avg_set.append(avg_cost)
epoch_set.append(epoch + 1)
mse_set.append(mse)
print("epoch %d: loss: %03.4f mse: %03.4f kld: %03.4f bce: %03.4f"
% (epoch, _loss, mse, kld, bce))
var_out = []
classes = []
# _current_state = init_zero
for i in range(200):
data_x, data_y, cl = next(data)
classes.append(cl)
_z = sess.run(
[vae.z], feed_dict={vae.x: data_x,
vae.batch_size: default_batch_size})
var_out.append(_z[0])
var_out = np.array(var_out)
var_out = var_out.reshape(-1, 2)
classes = np.array(classes)
classes = classes.reshape(-1)
sess.close()
plot_training(epoch_set, avg_set, mse_set)
plot_var_layer(var_out, classes)
# return mse, var_out
run_model()
Now let's see whether the model can run with multi-variate input. We'll decompose the signals above into several sinusoids using a Fourier series and see whether the model can still separate the classes.
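As a reminder of what the decomposition computes: the Fourier series of a signal $x(t)$ of period $T$, with $\omega = 2\pi/T$, is

$$x(t) = \frac{a_0}{2} + \sum_{n=1}^{\infty}\big(a_n\cos(n\omega t) + b_n\sin(n\omega t)\big), \qquad a_n = \frac{2}{T}\int_{-T/2}^{T/2} x(t)\cos(n\omega t)\,dt, \quad b_n = \frac{2}{T}\int_{-T/2}^{T/2} x(t)\sin(n\omega t)\,dt.$$

The code below approximates the integrals with sums over the 40 samples, keeps only the first few band terms $a_n\cos(n\omega t) + b_n\sin(n\omega t)$ (dropping the constant term), and feeds each band to the model as a separate input feature.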
time_steps = 40
features = 7
class_dim = 3
default_batch_size = 1
N = time_steps
fr_bands = features
T = 1.0
def fourier_series(x, T=1.0, N=time_steps, fr_bands=10):
t2 = np.linspace(-T/2, T/2, N+1)
t = t2[1:]
dt = T / N
omega = 2.0 * np.pi / T
a0 = x.sum()
F = np.zeros((fr_bands, N))
# x = x - a0
for i in range(fr_bands):
a = (2. / T) * (x * np.cos((i + 1) * omega * t) * dt).sum()
b = (2. / T) * (x * np.sin((i + 1) * omega * t) * dt).sum()
f = a * np.cos((i + 1) * omega * t) \
+ b * np.sin((i + 1) * omega * t)
F[i] = f
# x = x - f
return F
def create_dataset(filename):
data = np.genfromtxt(filename, delimiter=',').astype(np.float32)
classes = data[:, -1].astype(np.int32)
data = data[:, :-1]
data = data/np.max(np.abs(data))
indexes = np.arange(data.shape[0])
while True:
samp = np.random.choice(indexes, default_batch_size)
Y = data[samp]
X = np.zeros((default_batch_size, time_steps, features))
for b in range(default_batch_size):
X[b, :, :] = fourier_series(Y[b], N=time_steps,
fr_bands=features).T
cl = classes[samp]
Y = Y.reshape(default_batch_size, time_steps, 1)
yield X, Y, cl
Now let's have a look at what the data and the reconstruction look like.
data = create_dataset(filename)
x, y, cl = next(data)
y = y.reshape(time_steps, ).astype('float64')
t2 = np.linspace(-T/2, T/2, N+1)
t = t2[1:]
# plt.close('all')
plt.plot(t, y, 'r', label="Orig. Sig.")
for i in range(fr_bands):
plt.plot(t, x[0, :, i], 'k--', alpha=0.2, label="sine {:d}".format(i))
plt.plot(t, x[0, :, :].sum(axis=1), 'g', label="Recon. Sig.")
plt.title("Fourier Series {:d} bands".format(fr_bands))
plt.legend()
plt.show()
This might not be enough resolution, but it is probably already near the limit of what the RAM and CPU on my machine can deal with.
# Reset batch size
default_batch_size = 5
run_model()
It runs successfully with multi-variate time series input, and it can still separate the 3 classes as I had hoped. So now the challenge is to try this model with more realistic data, and to try to output an image of the Decoder's hidden layers to see if they capture a flattened version of the structure of the time series fed to the Encoder. At present, I don't have a clear idea of how to produce useful images or other representations of the multi-layer perceptron inside the Decoder, but I will leave that to the next blog post, as this is already a lot to work through for one post.