GitHub Repository: aimacode/aima-python
Path: blob/master/deep_learning4e.py
"""Deep learning. (Chapter 20)"""

import random
import statistics

import numpy as np
from keras import Sequential, optimizers
from keras.layers import Embedding, SimpleRNN, Dense
from keras.preprocessing import sequence

from utils4e import (conv1D, gaussian_kernel, element_wise_product, vector_add, random_weights,
                     scalar_vector_product, map_vector, mean_squared_error_loss)


class Node:
    """
    A single unit of a layer in a neural network
    :param weights: weights between parent nodes and current node
    :param value: value of current node
    """

    def __init__(self, weights=None, value=None):
        self.value = value
        self.weights = weights or []


class Layer:
    """
    A layer in a neural network based on a computational graph.
    :param size: number of units in the current layer
    """

    def __init__(self, size):
        self.nodes = np.array([Node() for _ in range(size)])

    def forward(self, inputs):
        """Define the operation to get the output of this layer"""
        raise NotImplementedError


class Activation:
    """Base class for an activation function and its derivative."""

    def function(self, x):
        raise NotImplementedError

    def derivative(self, x):
        raise NotImplementedError

    def __call__(self, x):
        return self.function(x)


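# Note: the derivative() methods of Sigmoid and Tanh below are written in terms of the
# activation's *output* value (what forward() stores in node.value), not its input;
# BackPropagation relies on this convention when it calls layer.activation.derivative(node.value).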
class Sigmoid(Activation):

    def function(self, x):
        return 1 / (1 + np.exp(-x))

    def derivative(self, value):
        return value * (1 - value)


class ReLU(Activation):

    def function(self, x):
        return max(0, x)

    def derivative(self, value):
        return 1 if value > 0 else 0


class ELU(Activation):

    def __init__(self, alpha=0.01):
        self.alpha = alpha

    def function(self, x):
        return x if x > 0 else self.alpha * (np.exp(x) - 1)

    def derivative(self, value):
        return 1 if value > 0 else self.alpha * np.exp(value)


class LeakyReLU(Activation):

    def __init__(self, alpha=0.01):
        self.alpha = alpha

    def function(self, x):
        return max(x, self.alpha * x)

    def derivative(self, value):
        return 1 if value > 0 else self.alpha


class Tanh(Activation):

    def function(self, x):
        return np.tanh(x)

    def derivative(self, value):
        return 1 - (value ** 2)


class SoftMax(Activation):

    def function(self, x):
        return np.exp(x) / np.sum(np.exp(x))

    def derivative(self, x):
        return np.ones_like(x)


class SoftPlus(Activation):

    def function(self, x):
        return np.log(1. + np.exp(x))

    def derivative(self, x):
        return 1. / (1. + np.exp(-x))


class Linear(Activation):

    def function(self, x):
        return x

    def derivative(self, x):
        return np.ones_like(x)


class InputLayer(Layer):
    """1D input layer. Layer size is the same as input vector size."""

    def __init__(self, size=3):
        super().__init__(size)

    def forward(self, inputs):
        """Take each value of the inputs to each unit in the layer."""
        assert len(self.nodes) == len(inputs)
        for node, inp in zip(self.nodes, inputs):
            node.value = inp
        return inputs


class OutputLayer(Layer):
    """1D softmax output layer, as described in Section 19.3.2."""

    def __init__(self, size=3):
        super().__init__(size)

    def forward(self, inputs, activation=SoftMax):
        assert len(self.nodes) == len(inputs)
        res = activation().function(inputs)
        for node, val in zip(self.nodes, res):
            node.value = val
        return res


class DenseLayer(Layer):
    """
    1D dense layer in a neural network.
    :param in_size: (int) input vector size
    :param out_size: (int) output vector size
    :param activation: (Activation object) activation function
    """

    def __init__(self, in_size=3, out_size=3, activation=Sigmoid):
        super().__init__(out_size)
        self.out_size = out_size
        self.inputs = None
        self.activation = activation()
        # initialize weights
        for node in self.nodes:
            node.weights = random_weights(-0.5, 0.5, in_size)

    def forward(self, inputs):
        self.inputs = inputs
        res = []
        # get the output value of each unit
        for unit in self.nodes:
            val = self.activation.function(np.dot(unit.weights, inputs))
            unit.value = val
            res.append(val)
        return res


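# Minimal usage sketch for DenseLayer above (illustrative values only): a DenseLayer(3, 2)
# maps a length-3 input vector to 2 sigmoid activations, each in (0, 1).
#   layer = DenseLayer(in_size=3, out_size=2)
#   out = layer.forward([1.0, 0.5, -0.2])  # list of 2 floats

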
class ConvLayer1D(Layer):
    """
    1D convolution layer in a neural network.
    :param kernel_size: convolution kernel size
    """

    def __init__(self, size=3, kernel_size=3):
        super().__init__(size)
        # init each convolution kernel as a gaussian kernel
        for node in self.nodes:
            node.weights = gaussian_kernel(kernel_size)

    def forward(self, features):
        # each node in the layer takes a channel in the features
        assert len(self.nodes) == len(features)
        res = []
        # compute the convolution output of each channel, store it in node.value
        for node, feature in zip(self.nodes, features):
            out = conv1D(feature, node.weights)
            res.append(out)
            node.value = out
        return res


class MaxPoolingLayer1D(Layer):
    """
    1D max pooling layer in a neural network.
    :param kernel_size: max pooling area size
    """

    def __init__(self, size=3, kernel_size=3):
        super().__init__(size)
        self.kernel_size = kernel_size
        self.inputs = None

    def forward(self, features):
        assert len(self.nodes) == len(features)
        res = []
        self.inputs = features
        # do max pooling for each channel in features
        for i in range(len(self.nodes)):
            feature = features[i]
            # get the max value in each sliding window of kernel_size values
            out = [max(feature[j:j + self.kernel_size])
                   for j in range(len(feature) - self.kernel_size + 1)]
            res.append(out)
            self.nodes[i].value = out
        return res


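# Illustrative sketch for MaxPoolingLayer1D above: with one channel and kernel_size=2,
#   MaxPoolingLayer1D(size=1, kernel_size=2).forward([[1, 3, 2, 5]])  # -> [[3, 3, 5]]

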
class BatchNormalizationLayer(Layer):
    """Batch normalization layer."""

    def __init__(self, size, eps=0.001):
        super().__init__(size)
        self.eps = eps
        # self.weights = [gamma (scale), beta (shift)]
        self.weights = [0, 0]
        self.inputs = None

    def forward(self, inputs):
        # mean value of inputs
        mu = sum(inputs) / len(inputs)
        # standard deviation of inputs
        std = statistics.stdev(inputs)
        self.inputs = inputs
        res = []
        # get normalized value of each input
        for i in range(len(self.nodes)):
            val = [(inputs[i] - mu) * self.weights[0] / np.sqrt(self.eps + std ** 2) + self.weights[1]]
            res.append(val)
            self.nodes[i].value = val
        return res


def init_examples(examples, idx_i, idx_t, o_units):
    """Init examples from dataset.examples."""

    inputs, targets = {}, {}
    for i, e in enumerate(examples):
        # input values of e
        inputs[i] = [e[j] for j in idx_i]

        if o_units > 1:
            # one-hot representation of e's target
            t = [0 for _ in range(o_units)]
            t[e[idx_t]] = 1
            targets[i] = t
        else:
            # target value of e
            targets[i] = [e[idx_t]]

    return inputs, targets


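# Illustrative sketch for init_examples above (hypothetical example row): with idx_i=[0, 1],
# idx_t=2 and o_units=3, a single example [5.1, 3.5, 2] yields
#   inputs == {0: [5.1, 3.5]} and targets == {0: [0, 0, 1]}.

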
def stochastic_gradient_descent(dataset, net, loss, epochs=1000, l_rate=0.01, batch_size=1, verbose=False):
    """
    Stochastic (mini-batch) gradient descent to update the learnable parameters of a network.
    :return: the updated network
    """
    examples = dataset.examples  # init data

    for e in range(epochs):
        total_loss = 0
        random.shuffle(examples)
        weights = [[node.weights for node in layer.nodes] for layer in net]

        for batch in get_batch(examples, batch_size):
            inputs, targets = init_examples(batch, dataset.inputs, dataset.target, len(net[-1].nodes))
            # compute gradients of weights
            gs, batch_loss = BackPropagation(inputs, targets, weights, net, loss)
            # update weights with gradient descent
            weights = [x + y for x, y in zip(weights, [np.array(tg) * -l_rate for tg in gs])]
            total_loss += batch_loss

            # write the updated weights back into the network after each batch
            for i in range(len(net)):
                if weights[i].size != 0:
                    for j in range(len(weights[i])):
                        net[i].nodes[j].weights = weights[i][j]

        if verbose:
            print("epoch:{}, total_loss:{}".format(e + 1, total_loss))

    return net


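# The adam() optimizer below applies the update rule of [Figure 19.6]; written out for a
# single gradient component g at step t, with rho = (rho1, rho2):
#   s <- rho1 * s + (1 - rho1) * g
#   r <- rho2 * r + (1 - rho2) * g * g
#   s_hat = s / (1 - rho1 ** t),  r_hat = r / (1 - rho2 ** t)
#   w <- w - l_rate * s_hat / (np.sqrt(r_hat) + delta)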
def adam(dataset, net, loss, epochs=1000, rho=(0.9, 0.999), delta=1 / 10 ** 8,
         l_rate=0.001, batch_size=1, verbose=False):
    """
    [Figure 19.6]
    Adam optimizer to update the learnable parameters of a network.
    Required parameters are similar to gradient descent.
    :return: the updated network
    """
    examples = dataset.examples

    # init s, r and t
    s = [[[0] * len(node.weights) for node in layer.nodes] for layer in net]
    r = [[[0] * len(node.weights) for node in layer.nodes] for layer in net]
    t = 0

    # repeat until the epoch budget is exhausted
    for e in range(epochs):
        # total loss of each epoch
        total_loss = 0
        random.shuffle(examples)
        weights = [[node.weights for node in layer.nodes] for layer in net]

        for batch in get_batch(examples, batch_size):
            t += 1
            inputs, targets = init_examples(batch, dataset.inputs, dataset.target, len(net[-1].nodes))

            # compute gradients of weights
            gs, batch_loss = BackPropagation(inputs, targets, weights, net, loss)

            # update s, r, s_hat and r_hat
            s = vector_add(scalar_vector_product(rho[0], s),
                           scalar_vector_product((1 - rho[0]), gs))
            r = vector_add(scalar_vector_product(rho[1], r),
                           scalar_vector_product((1 - rho[1]), element_wise_product(gs, gs)))
            s_hat = scalar_vector_product(1 / (1 - rho[0] ** t), s)
            r_hat = scalar_vector_product(1 / (1 - rho[1] ** t), r)

            # rescale r_hat
            r_hat = map_vector(lambda x: 1 / (np.sqrt(x) + delta), r_hat)

            # delta weights
            delta_theta = scalar_vector_product(-l_rate, element_wise_product(s_hat, r_hat))
            weights = vector_add(weights, delta_theta)
            total_loss += batch_loss

            # write the updated weights back into the network after each batch
            for i in range(len(net)):
                if weights[i]:
                    for j in range(len(weights[i])):
                        net[i].nodes[j].weights = weights[i][j]

        if verbose:
            print("epoch:{}, total_loss:{}".format(e + 1, total_loss))

    return net


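# BackPropagation() below implements the usual delta recursion: the error signal entering the
# top layer is (prediction - target); at each layer i, working backwards, delta_i is the incoming
# error times g'(out_i), the error passed down to layer i-1 is theta_i^T delta_i, and the gradient
# for layer i's weights is the outer product of delta_i with that layer's stored inputs.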
def BackPropagation(inputs, targets, theta, net, loss):
    """
    The back-propagation algorithm for multilayer networks, run over a single batch
    to calculate the gradients of theta.
    :param inputs: a batch of inputs in an array. Each input is an iterable object
    :param targets: a batch of targets in an array. Each target is an iterable object
    :param theta: parameters to be updated
    :param net: a list of predefined layer objects representing their linear sequence
    :param loss: a predefined loss function taking array of inputs and targets
    :return: gradients of theta, loss of the input batch
    """

    assert len(inputs) == len(targets)
    o_units = len(net[-1].nodes)
    n_layers = len(net)
    batch_size = len(inputs)

    gradients = [[[] for _ in layer.nodes] for layer in net]
    total_gradients = [[[0] * len(node.weights) for node in layer.nodes] for layer in net]

    batch_loss = 0

    # iterate over each example in batch
    for e in range(batch_size):
        i_val = inputs[e]
        t_val = targets[e]

        # forward pass and compute batch loss
        for i in range(1, n_layers):
            layer_out = net[i].forward(i_val)
            i_val = layer_out
        batch_loss += loss(t_val, layer_out)

        # initialize delta
        delta = [[] for _ in range(n_layers)]

        previous = np.array([layer_out[i] - t_val[i] for i in range(o_units)])
        h_layers = n_layers - 1

        # backward pass
        for i in range(h_layers, 0, -1):
            layer = net[i]
            derivative = np.array([layer.activation.derivative(node.value) for node in layer.nodes])
            delta[i] = previous * derivative
            # pass to layer i-1 in the next iteration
            previous = np.matmul([delta[i]], theta[i])[0]
            # compute gradient of layer i
            gradients[i] = [scalar_vector_product(d, net[i].inputs) for d in delta[i]]

        # add gradient of current example to batch gradient
        total_gradients = vector_add(total_gradients, gradients)

    return total_gradients, batch_loss


def get_batch(examples, batch_size=1):
    """Split examples into multiple batches"""
    for i in range(0, len(examples), batch_size):
        yield examples[i: i + batch_size]


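# Sketch: list(get_batch([1, 2, 3, 4, 5], batch_size=2)) yields [[1, 2], [3, 4], [5]];
# the final batch may be smaller than batch_size.

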
class NeuralNetworkLearner:
    """
    Simple dense multilayer neural network.
    :param hidden_layer_sizes: sizes of the hidden layers, given as a list of ints
    """

    def __init__(self, dataset, hidden_layer_sizes, l_rate=0.01, epochs=1000, batch_size=10,
                 optimizer=stochastic_gradient_descent, loss=mean_squared_error_loss, verbose=False, plot=False):
        self.dataset = dataset
        self.l_rate = l_rate
        self.epochs = epochs
        self.batch_size = batch_size
        self.optimizer = optimizer
        self.loss = loss
        self.verbose = verbose
        self.plot = plot

        input_size = len(dataset.inputs)
        output_size = len(dataset.values[dataset.target])

        # initialize the network
        raw_net = [InputLayer(input_size)]
        # add hidden layers
        hidden_input_size = input_size
        for h_size in hidden_layer_sizes:
            raw_net.append(DenseLayer(hidden_input_size, h_size))
            hidden_input_size = h_size
        raw_net.append(DenseLayer(hidden_input_size, output_size))
        self.raw_net = raw_net

    def fit(self, X, y):
        self.learned_net = self.optimizer(self.dataset, self.raw_net, loss=self.loss, epochs=self.epochs,
                                          l_rate=self.l_rate, batch_size=self.batch_size, verbose=self.verbose)
        return self

    def predict(self, example):
        n_layers = len(self.learned_net)

        layer_input = example
        layer_out = example

        # get the output of each layer by forward passing
        for i in range(1, n_layers):
            layer_out = self.learned_net[i].forward(np.array(layer_input).reshape((-1, 1)))
            layer_input = layer_out

        return layer_out.index(max(layer_out))


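# Minimal usage sketch for NeuralNetworkLearner above (assumes the DataSet class from
# learning4e in this repository; the iris values are purely illustrative):
#   from learning4e import DataSet
#   iris = DataSet(name='iris')
#   iris.classes_to_numbers()  # targets must be integer class indices
#   nnl = NeuralNetworkLearner(iris, hidden_layer_sizes=[4], epochs=100)
#   nnl.fit(None, None)  # training data is read from self.dataset, so X and y are unused
#   nnl.predict([5.1, 3.5, 1.4, 0.2])  # -> index of the predicted class

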
class PerceptronLearner:
    """
    Simple perceptron neural network.
    """

    def __init__(self, dataset, l_rate=0.01, epochs=1000, batch_size=10, optimizer=stochastic_gradient_descent,
                 loss=mean_squared_error_loss, verbose=False, plot=False):
        self.dataset = dataset
        self.l_rate = l_rate
        self.epochs = epochs
        self.batch_size = batch_size
        self.optimizer = optimizer
        self.loss = loss
        self.verbose = verbose
        self.plot = plot

        input_size = len(dataset.inputs)
        output_size = len(dataset.values[dataset.target])

        # initialize the network, add dense layer
        self.raw_net = [InputLayer(input_size), DenseLayer(input_size, output_size)]

    def fit(self, X, y):
        self.learned_net = self.optimizer(self.dataset, self.raw_net, loss=self.loss, epochs=self.epochs,
                                          l_rate=self.l_rate, batch_size=self.batch_size, verbose=self.verbose)
        return self

    def predict(self, example):
        layer_out = self.learned_net[1].forward(np.array(example).reshape((-1, 1)))
        return layer_out.index(max(layer_out))


def keras_dataset_loader(dataset, max_length=500):
    """
    Helper function to load keras datasets.
    :param dataset: a keras dataset, i.e. a pair of (data, targets) tuples for training and validation
    :param max_length: max length of each input sequence
    """
    # init dataset
    (X_train, y_train), (X_val, y_val) = dataset
    if max_length > 0:
        X_train = sequence.pad_sequences(X_train, maxlen=max_length)
        X_val = sequence.pad_sequences(X_val, maxlen=max_length)
    # hold out the first 10 training examples as a small test split
    return (X_train[10:], y_train[10:]), (X_val, y_val), (X_train[:10], y_train[:10])


def SimpleRNNLearner(train_data, val_data, epochs=2, verbose=False):
    """
    RNN example for text sentiment analysis.
    :param train_data: a tuple of (training data, targets)
        Training data: ndarray taking training examples, while each example is coded by embedding
        Targets: ndarray taking targets of each example. Each target is mapped to an integer
    :param val_data: a tuple of (validation data, targets)
    :param epochs: number of epochs
    :param verbose: verbosity mode
    :return: a keras model
    """

    total_inputs = 5000
    input_length = 500

    # init data
    X_train, y_train = train_data
    X_val, y_val = val_data

    # init the sequential network (embedding layer, rnn layer, dense layer)
    model = Sequential()
    model.add(Embedding(total_inputs, 32, input_length=input_length))
    model.add(SimpleRNN(units=128))
    model.add(Dense(1, activation='sigmoid'))
    model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])

    # train the model
    model.fit(X_train, y_train, validation_data=(X_val, y_val), epochs=epochs, batch_size=128, verbose=verbose)

    return model


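# Minimal usage sketch for SimpleRNNLearner above (assumes keras's built-in IMDB dataset;
# num_words matches total_inputs and max_length matches input_length used in the model):
#   from keras.datasets import imdb
#   train, val, test = keras_dataset_loader(imdb.load_data(num_words=5000), max_length=500)
#   model = SimpleRNNLearner(train, val, epochs=2)
#   model.evaluate(*val)

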
def AutoencoderLearner(inputs, encoding_size, epochs=200, verbose=False):
    """
    Simple example of an autoencoder that learns to reproduce its input.
    :param inputs: a batch of input data in np.ndarray type
    :param encoding_size: int, the size of the encoding layer
    :param epochs: number of epochs
    :param verbose: verbosity mode
    :return: a keras model
    """

    # init data
    input_size = len(inputs[0])

    # init model
    model = Sequential()
    model.add(Dense(encoding_size, input_dim=input_size, activation='relu', kernel_initializer='random_uniform',
                    bias_initializer='ones'))
    model.add(Dense(input_size, activation='relu', kernel_initializer='random_uniform', bias_initializer='ones'))

    # update model with sgd
    sgd = optimizers.SGD(lr=0.01)
    model.compile(loss='mean_squared_error', optimizer=sgd, metrics=['accuracy'])

    # train the model
    model.fit(inputs, inputs, epochs=epochs, batch_size=10, verbose=verbose)

    return model
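

# Minimal usage sketch for AutoencoderLearner above (illustrative random data):
#   import numpy as np
#   data = np.random.rand(100, 10)
#   model = AutoencoderLearner(data, encoding_size=4, epochs=50)
#   reconstructed = model.predict(data)  # same shape as data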