Here's a Convolutional Neural Network I've written over the last two days for recognising handwritten digits from the MNIST dataset, after a lot of research into how to turn the mathematical equations into code.
""" Convolutional Neural Network """
import numpy as np
import sklearn.datasets
import random
import math
from skimage.measure import block_reduce
from scipy.signal import convolve
import time
import pickle  # used by ConvolutionalNeuralNet.dump
def reLU(z): # activation function
return z * (z > 0)
""" ------------------------------------------------------------------------------- """
class ConvPoolLayer:
    def __init__(self, in_dim, filter_dim, pool_dim=None, conv_stride=1):
        self.in_dim = in_dim
        # spatial size of the (valid-mode) convolution output
        conv_h = (in_dim[-2] - filter_dim[-2]) // conv_stride + 1
        conv_w = (in_dim[-1] - filter_dim[-1]) // conv_stride + 1
        if pool_dim:
            self.out_dim = (filter_dim[0],
                            int(round(float(conv_h) / pool_dim[-2])),
                            int(round(float(conv_w) / pool_dim[-1])))
        else:
            self.out_dim = (filter_dim[0], conv_h, conv_w)
        self.filter_dim = filter_dim
        self.pool_dim = pool_dim
        self.W = (np.random.randn(*filter_dim) * np.sqrt(2.0 / sum(filter_dim))).astype(np.float32)
        self.B = np.zeros((conv_w, 1)).astype(np.float32)
def feedforward(self, x, W, b, step):
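        # valid-mode convolution of the whole mini-batch with each filter,
        # bias add and ReLU, then optional pooling via block_reduce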
self.x = x.reshape(step, self.in_dim[-2], self.in_dim[-1])
activation = reLU(np.array([convolve(self.x, w, mode='valid') for w in W]) + b.reshape(1, -1, 1))
if self.pool_dim:
return block_reduce(activation, block_size=tuple([1] + list(self.pool_dim)), func=np.max)
else:
return activation
def backpropagate(self, delta, W, index):
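        # upsample the pooled deltas back to the convolution output size, then
        # compute the filter and bias gradients from the stored input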
delta = delta.reshape(len(W), 1, int((np.prod(delta.shape) // len(W)) ** 0.5), -1)
if self.pool_dim:
delta = delta.repeat(self.pool_dim[-2], axis=2).repeat(self.pool_dim[-1], axis=3) # may have to change this for maxpooling
dw = np.array([np.rot90(convolve(self.x[index].reshape(1, self.in_dim[-2], self.in_dim[-1]), np.rot90(d, 2), mode='valid'), 2) for d in delta])
db = np.sum(np.array([np.sum(d, axis=(1, )).reshape(-1, 1) for d in delta]), axis=0)
return None, dw, db
class FullyConnectedLayer:
def __init__(self, in_size, out_size):
self.in_size = in_size
self.out_size = out_size
        self.W = (np.random.randn(out_size, in_size) * np.sqrt(2.0 / (in_size + out_size))).astype(np.float32)
self.B = np.zeros((out_size, 1)).astype(np.float32)
def feedforward(self, x, w, b, step):
self.x = x.reshape(step, -1)
activation = reLU(np.dot(w, self.x.T) + b).T
return activation
def backpropagate(self, delta, w, index):
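        # dW is the outer product of the incoming delta with the stored input;
        # the delta passed back is masked by the ReLU derivative of this layer's input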
dw = np.multiply(delta, self.x[index])
db = delta
delta = np.dot(w.T, delta) * (self.x[index].reshape(-1, 1) > 0)
return delta, dw, db
class SoftmaxLayer:
def __init__(self, in_size, out_size):
self.in_size = in_size
self.out_size = out_size
        self.W = (np.random.randn(out_size, in_size) * np.sqrt(2.0 / (in_size + out_size))).astype(np.float32)
self.B = np.zeros((out_size, 1)).astype(np.float32)
def feedforward(self, x, w, b, step):
self.x = x.reshape(step, -1)
return reLU(np.dot(w, self.x.T) + b).T
def backpropagate(self, t, y, w, index):
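        # t holds this layer's activations; softmax is applied here and
        # softmax(t) - y is used as the delta (the (t > 0) mask is always true after exp)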
t = np.exp(t)
t /= np.sum(t)
delta = (t - y) * (t > 0)
dw = np.multiply(delta, self.x[index])
db = delta
delta = np.dot(w.T, delta) * (self.x[index].reshape(-1, 1) > 0)
return delta, dw, db
""" ------------------------------------------------------------------------------- """
class ConvolutionalNeuralNet:
def __init__(self, layers, learning_rate=0.01, reg_lambda=0.05):
self.layers = []
self.W = []
self.B = []
for l in layers:
if l['type'].lower() == 'conv':
self.layers.append(ConvPoolLayer(**l['args']))
elif l['type'].lower() == 'fc':
self.layers.append(FullyConnectedLayer(**l['args']))
else:
self.layers.append(SoftmaxLayer(**l['args']))
self.layers[-1].layer_type = l['type']
self.W.append(self.layers[-1].W)
self.B.append(self.layers[-1].B)
        self.W = np.array(self.W, dtype=object)
        self.B = np.array(self.B, dtype=object)
self.num_layers = len(layers)
self.learning_rate = learning_rate
self.reg_lambda = reg_lambda
def __feedforward(self, x):
for i in range(len(self.layers)):
x = self.layers[i].feedforward(x, self.W[i], self.B[i], step=1)
return x
def __backpropagation(self, inputs, targets, is_val=False):
# forward pass
step = len(inputs)
for i in range(len(self.layers)):
inputs = self.layers[i].feedforward(inputs, self.W[i], self.B[i], step)
# backward pass
        weight_gradients = np.array([np.zeros(w.shape) for w in self.W], dtype=object)
        bias_gradients = np.array([np.zeros(b.shape) for b in self.B], dtype=object)
for i in range(len(targets)):
delta, dw, db = self.layers[-1].backpropagate(inputs[i].reshape(-1, 1), targets[i], self.W[-1], index=i)
weight_gradients[-1] += dw
bias_gradients[-1] += db
            for j in range(2, self.num_layers + 1):
delta, dw, db = self.layers[-j].backpropagate(delta, self.W[-j], index=i)
weight_gradients[-j] += dw
bias_gradients[-j] += db
if is_val:
weight_gradients += self.reg_lambda * weight_gradients
self.W += -self.learning_rate * weight_gradients
self.B += -self.learning_rate * bias_gradients
def train(self, training_data, validation_data, epochs=10):
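        # mini-batch updates over the training data, an extra update pass over the
        # validation data (is_val=True), then the learning rate is decayed by 35% each epoch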
acc = 0
step, val_step = 25, 25
inputs = [data[0] for data in training_data]
targets = [data[1] for data in training_data]
val_inputs = [x[0] for x in validation_data]
val_targets = [x[1] for x in validation_data]
        for i in range(epochs):
            for j in range(0, len(inputs), step):
                self.__backpropagation(np.array(inputs[j : j + step]), targets[j : j + step])
            if validation_data:
                for j in range(0, len(val_inputs), val_step):
                    self.__backpropagation(np.array(val_inputs[j : j + val_step]), val_targets[j : j + val_step], is_val=True)
print("{} epoch(s) done".format(i + 1))
# new_acc = CN.test(test_data)
# acc = new_acc
# print "Accuracy:", str(acc) + "%"
self.learning_rate -= self.learning_rate * 0.35
print("Training done.")
def test(self, test_data):
test_results = [(np.argmax(self.__feedforward(x[0])), np.argmax(x[1])) for x in test_data]
return float(sum([int(x == y) for (x, y) in test_results])) / len(test_data) * 100
    def dump(self, file):
        with open(file, "wb") as f:
            pickle.dump(self, f)
if __name__ == "__main__":
global test_data
def transform_target(y):
t = np.zeros((10, 1))
t[int(y)] = 1.0
return t
total = 5000
training = int(total * 0.70)
val = int(total * 0.15)
test = int(total * 0.15)
mnist = sklearn.datasets.fetch_mldata('MNIST original', data_home='./data')
data = list(zip(mnist.data, mnist.target))
random.shuffle(data)
data = data[:total]
data = [(x[0].astype(bool).astype(int).reshape(-1,), transform_target(x[1])) for x in data]
train_data = data[:training]
val_data = data[training:training + val]
test_data = data[training + val:]
print "Data fetched"
CN = ConvolutionalNeuralNet(layers=[{ 'type': 'conv',
'args':
{ 'in_dim' : (1, 28, 28),
'filter_dim' : (1, 1, 3, 3), # number of filters, (z, x, y) dims
'pool_dim' : (1, 2, 2),
},
},
{ 'type': 'fc',
'args':
{
'in_size' : 1 * 169,
'out_size' : 50,
}
},
{ 'type': 'softmax',
'args':
{
'in_size' : 50,
'out_size' : 10,
}
}, ], learning_rate=0.01, reg_lambda=0.05)
s = time.time()
CN.train(train_data, val_data, epochs=3)
e = time.time()
print "Network trained"
print "Accuracy:", str(CN.test(test_data)) + "%"
print "Time taken: ", (e - s)
I have not used Theano or any other frameworks. My goals are:
- Optimise the feedforward and back-propagation functions for all layers
- Make the network robust under all training conditions (not sure if it is yet)
- Replace the existing scipy.signal.convolve and skimage.measure.block_reduce with faster implementations, if possible (one direction is sketched after this list).
- Replace mean pooling with max pooling (I've kept mean pooling for now since it is easier to back-propagate; a sketch of max-pooling back-propagation is also below).
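For those last two points, here is a rough sketch of what I have in mind, not drop-in code for the classes above (conv2d_im2col, maxpool_forward, maxpool_backward and their argument shapes are illustrative names and assumptions only): an im2col-style convolution that replaces scipy.signal.convolve with one dense product, and max pooling that records which element of each window won so the gradient can be routed back to it.

import numpy as np
from numpy.lib.stride_tricks import sliding_window_view  # needs numpy >= 1.20

def conv2d_im2col(x, w):
    """Valid-mode 2-D cross-correlation via im2col + one matrix product.
    x: (H, W) input image, w: (kh, kw) filter.
    Note: unlike scipy.signal.convolve this does not flip the kernel,
    which is what CNNs conventionally compute anyway."""
    kh, kw = w.shape
    patches = sliding_window_view(x, (kh, kw))   # (H-kh+1, W-kw+1, kh, kw)
    return np.einsum('ijkl,kl->ij', patches, w)

def maxpool_forward(conv_out, pool=2):
    """conv_out: (channels, H, W) with H and W divisible by pool."""
    c, h, w = conv_out.shape
    windows = conv_out.reshape(c, h // pool, pool, w // pool, pool)
    pooled = windows.max(axis=(2, 4))
    # mask marks the winning position(s) inside every pooling window;
    # ties send the gradient to all maxima in that window
    mask = windows == pooled[:, :, None, :, None]
    return pooled, mask

def maxpool_backward(delta_pooled, mask, pool=2):
    """Route each pooled gradient back to the argmax position of its window."""
    c, ph, pw = delta_pooled.shape
    upsampled = mask * delta_pooled[:, :, None, :, None]
    return upsampled.reshape(c, ph * pool, pw * pool)

The mask would be stored during the forward pass and reused in the backward pass, replacing the plain repeat upsampling used for mean pooling; the im2col idea extends to a whole mini-batch and several filters by flattening all the patches into one matrix and doing a single dot product with all filters at once.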
I have also assumed, for the sake of simplicity, that:
- The first layer is always a convolutional layer
- There is only one convolutional layer in the network
Previously, I allowed more than one convolutional layer, but I was not confident that back-propagation between convolutional layers was correct, and the more generic code also ran slower.
Comments and suggestions welcome.