Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
Aniket025
GitHub Repository: Aniket025/Medical-Prescription-OCR
Path: blob/master/Model-1/GapClassifier-BiRNN-Attention.ipynb
427 views
Kernel: Python 3

CNN with Bidirectional RNN - Char Classification

Using TensorFlow

TODO

Try tweaking POS_SPAN... (change it completely). Implement the CNN inside the model.
import numpy as np import pandas as pd import matplotlib.pyplot as plt import math import tensorflow as tf import tensorflow.contrib.seq2seq as seq2seq import cv2 import time %matplotlib notebook # Increase size of plots plt.rcParams['figure.figsize'] = (9.0, 5.0) # Helpers from ocr.helpers import implt from ocr.mlhelpers import TrainingPlot, DataSet from ocr.imgtransform import coordinates_remap from ocr.datahelpers import loadWordsData, correspondingShuffle from ocr.tfhelpers import Graph, create_cell tf.reset_default_graph() sess = tf.InteractiveSession() print("OpenCV: " + cv2.__version__) print("Numpy: " + np.__version__) print("TensorFlow: " + tf.__version__)
OpenCV: 3.1.0 Numpy: 1.13.1 TensorFlow: 1.3.0

Loading Images

# Load the word images together with their gaplines (requested through
# loadGaplines=True). The second returned value is not used in this notebook.
images, _, gaplines = loadWordsData(['data/words/'], loadGaplines=True)
Loading words... Number of Images: 1008

Settings

# ---------------------------------------------------------------------------
# Labels and padding
# ---------------------------------------------------------------------------
PAD = 0          # Value for padding images (also used to pad target sequences)
POS = 1          # Label of a positive position
NEG = 0          # Label of a negative position
POS_SPAN = 1     # Number of positive values around true position (5 is too high)
POS_WEIGHT = 10  # Weighting of positive values in loss computation

# ---------------------------------------------------------------------------
# Sliding window
# ---------------------------------------------------------------------------
slider_size = (60, 30)  # Height is set to 60 by data and width should be even
slider_step = 2         # Number of pixels the slider moves per step
N_INPUT = 1800          # Size of sequence input vector, will depend on CNN

# ---------------------------------------------------------------------------
# Model
# ---------------------------------------------------------------------------
num_buckets = 10
n_classes = 2            # Number of different outputs

rnn_layers = 2           # 4 - 2 - 256
rnn_residual_layers = 1  # HAVE TO be smaller than encoder_layers
rnn_units = 128

attention_size = 64

# ---------------------------------------------------------------------------
# Training
# ---------------------------------------------------------------------------
learning_rate = 3e-4     # 1e-4
dropout = 0.3            # Fraction of dropped out data (keep_prob = 1 - dropout)
train_set = 0.8          # Fraction of the data used for training

TRAIN_STEPS = 500000     # Number of training steps!
TEST_ITER = 150
LOSS_ITER = 50
SAVE_ITER = 2000
BATCH_SIZE = 32
# EPOCH = 2000           # "Number" of batches in epoch

# Pre-trained graph used by the image preprocessing function below
graph = Graph('models/gap-clas/CNN-CG', operation='h_conv2_flat')


def imgPreprocessor(x):
    """Run one flattened slider window through the pre-trained graph.

    The input is reshaped to a single row of 1800 values, evaluated by
    ``graph`` and the first returned item is cut to ``N_INPUT`` entries,
    because nothing beyond that size is needed.
    """
    single_row = np.reshape(x, (1, 1800))
    first_result = graph.run(single_row)[0]
    return first_result[:N_INPUT]
INFO:tensorflow:Restoring parameters from models/gap-clas/CNN-CG

Dataset

# Shuffle images and gaplines with the same permutation
images, gaplines = correspondingShuffle([images, gaplines])

# Add border and offset gaplines - RUN ONLY ONCE
# Half of the slider width is added on the left and right side of every image
# and the gaplines are shifted by the same amount.
border = int(slider_size[1] / 2)
for i in range(len(images)):
    images[i] = cv2.copyMakeBorder(
        images[i],
        0, 0, border, border,
        cv2.BORDER_CONSTANT,
        value=0)
    gaplines[i] += border

# Image standardization same as tf.image.per_image_standardization:
# zero mean and unit deviation, where the deviation is bounded from below
# by 1 / sqrt(number of pixels).
for i, img in enumerate(images):
    adjusted_std = max(np.std(img), 1.0 / math.sqrt(img.size))
    images[i] = (img - np.mean(img)) / adjusted_std

# Split data on train and test dataset
div = int(train_set * len(images))

trainImages, testImages = images[0:div], images[div:]
trainGaplines, testGaplines = gaplines[0:div], gaplines[div:]

print("Training images:", div)
print("Testing images:", len(images) - div)
Training images: 806 Testing images: 202
class BucketDataIterator():
    """Iterator for feeding the seq2seq model during training.

    Every image is cut into a sequence of slider windows and gets a target
    sequence of the same length (NEG everywhere, POS around gaplines).
    The samples are sorted by sequence length and split into buckets, so
    that one batch contains sequences of similar length.

    Relies on the module-level constants NEG, POS, PAD and N_INPUT; the
    method ``next_feed`` additionally uses the module-level placeholders
    ``inputs``, ``targets``, ``length``, ``keep_prob`` and the ``dropout``
    setting.
    """

    def __init__(self,
                 images,
                 gaplines,
                 gap_span,
                 num_buckets=5,
                 slider=(60, 30),
                 slider_step=2,
                 imgprocess=lambda x: x,
                 train=True):
        """Build sequences, targets and buckets.

        Args:
            images: array of 2D images (height x width).
            gaplines: array with one array of gapline x-positions per image.
            gap_span: number of POS labels written around every gapline.
            num_buckets: number of length-sorted buckets.
            slider: (height, width) of the sliding window.
            slider_step: horizontal step of the sliding window in pixels.
            imgprocess: function applied to every flattened window.
            train: if True, ``next_feed`` applies dropout.
        """
        self.train = train

        # Number of slider positions per image == sequence length
        length = [(image.shape[1] - slider[1]) // slider_step
                  for image in images]

        # Creating indices from gaplines
        indices = gaplines - int(slider[1] / 2)
        indices = indices // slider_step

        # Split images to sequence of vectors
        # + targets seq of labels per image in images seq
        images_seq = np.empty(len(images), dtype=object)
        targets_seq = np.empty(len(images), dtype=object)
        for i, img in enumerate(images):
            images_seq[i] = [
                imgprocess(
                    img[:, loc * slider_step: loc * slider_step + slider[1]]
                    .flatten())
                for loc in range(length[i])]

            targets_seq[i] = np.ones((length[i])) * NEG
            for offset in range(gap_span):
                # Alternate around the true position: 0, -1, +1, -2, +2, ...
                shift = -((offset + 1) // 2) if offset % 2 else offset // 2
                # Clip EVERY index into the valid range. Clamping only the
                # first and last index lets other out-of-range positions
                # raise IndexError or wrap around via negative indexing.
                ind = np.clip(indices[i] + shift, 0, length[i] - 1)
                targets_seq[i][ind] = POS

        # Create pandas dataFrame and sort it by images seq length (length)
        # in_length == out_length
        self.dataFrame = pd.DataFrame({'length': length,
                                       'images': images_seq,
                                       'targets': targets_seq
                                       }).sort_values('length').reset_index(drop=True)

        bsize = int(len(images) / num_buckets)
        self.num_buckets = num_buckets

        # Create buckets by slicing parts by indexes;
        # the last bucket takes all remaining samples
        self.buckets = []
        for bucket in range(num_buckets - 1):
            self.buckets.append(
                self.dataFrame.iloc[bucket * bsize: (bucket + 1) * bsize])
        self.buckets.append(self.dataFrame.iloc[(num_buckets - 1) * bsize:])

        self.buckets_size = [len(bucket) for bucket in self.buckets]

        # cursor[i] will be the cursor for the ith bucket
        self.cursor = np.array([0] * num_buckets)
        self.bucket_order = np.random.permutation(num_buckets)
        self.bucket_cursor = 0
        self.shuffle()
        print("Iterator created.")

    def shuffle(self, idx=None):
        """Shuffle idx bucket or each bucket separately and reset cursors."""
        for i in [idx] if idx is not None else range(self.num_buckets):
            self.buckets[i] = self.buckets[i].sample(frac=1).reset_index(drop=True)
            self.cursor[i] = 0

    def next_batch(self, batch_size):
        """Create the next training batch of (at most) batch_size samples.

        Returns:
            input_seq: float32 array (max_length, batch, N_INPUT), time major,
                zero padded.
            targets: float32 array (batch, max_length), padded with PAD.
            lengths: array with the sequence length of every sample.
        """
        i_bucket = self.bucket_order[self.bucket_cursor]
        # Increment cursor and shuffle in case of new round
        self.bucket_cursor = (self.bucket_cursor + 1) % self.num_buckets
        if self.bucket_cursor == 0:
            self.bucket_order = np.random.permutation(self.num_buckets)

        if self.cursor[i_bucket] + batch_size > self.buckets_size[i_bucket]:
            self.shuffle(i_bucket)

        # Handle too big batch sizes
        if (batch_size > self.buckets_size[i_bucket]):
            batch_size = self.buckets_size[i_bucket]

        res = self.buckets[i_bucket].iloc[self.cursor[i_bucket]:
                                          self.cursor[i_bucket] + batch_size]
        self.cursor[i_bucket] += batch_size

        lengths = res['length'].values
        sample_targets = res['targets'].values

        # PAD input sequence and output
        # Pad sequences with <PAD> to same length
        max_length = max(res['length'])

        input_seq = np.zeros((batch_size, max_length, N_INPUT), dtype=np.float32)
        for i, img in enumerate(res['images']):
            input_seq[i][:lengths[i]] = img
        # (batch, time, vec) -> (time, batch, vec)
        input_seq = input_seq.swapaxes(0, 1)

        # Need to pad according to the maximum length output sequence
        targets = np.ones([batch_size, max_length], dtype=np.float32) * PAD
        for i in range(batch_size):
            targets[i, :lengths[i]] = sample_targets[i]

        return input_seq, targets, lengths

    def next_feed(self, size):
        """Create feed dictionary directly usable for model training.

        Dropout (keep_prob = 1 - dropout) is applied only when the iterator
        was created with train=True.
        """
        (inputs_, targets_, length_) = self.next_batch(size)
        return {
            inputs: inputs_,
            targets: targets_,
            length: length_,
            keep_prob: (1.0 - dropout) if self.train else 1.0
        }
# Create iterators for feeding BiRNN.
# The image preprocessing function is intentionally left disabled, the
# iterators feed raw flattened slider windows.
train_iterator = BucketDataIterator(
    trainImages,
    trainGaplines,
    POS_SPAN,
    num_buckets=num_buckets,
    slider=slider_size,
    slider_step=slider_step,
    # imgprocess=imgPreprocessor,
    train=True)

test_iterator = BucketDataIterator(
    testImages,
    testGaplines,
    POS_SPAN,
    num_buckets=2,
    slider=slider_size,
    slider_step=slider_step,
    # imgprocess=imgPreprocessor,
    train=False)
Iterator created. Iterator created.

Create classifier

Inputs

# Input placehodlers # N_INPUT -> size of vector representing one image in sequence # Inputs shape (max_seq_length, batch_size, vec_size) - time major inputs = tf.placeholder(shape=(None, None, N_INPUT), dtype=tf.float32, name='inputs') length = tf.placeholder(shape=(None,), dtype=tf.int32, name='length') # required for training, not required for testing and application targets = tf.placeholder(shape=(None, None), dtype=tf.int64, name='targets') # Dropout value keep_prob = tf.placeholder(tf.float32, name='keep_prob') sequence_size, batch_size, _ = tf.unstack(tf.shape(inputs))

Standardization + CNN

def CNN(img): pass # inpts = tf.map_fn( # lambda seq: tf.map_fn( # lambda img: tf.reshape( # tf.image.per_image_standardization(tf.reshape(img, [60, 30, 1])), [-1]), seq), # inputs, # dtype=tf.float32) inpts = inputs

Bi-RNN

# Cells cell_fw = create_cell(rnn_units, rnn_layers, rnn_residual_layers, is_dropout=True, keep_prob=keep_prob) cell_bw = create_cell(rnn_units, rnn_layers, rnn_residual_layers, is_dropout=True, keep_prob=keep_prob)
# Bidirectional RNN enc_outputs, enc_state = tf.nn.bidirectional_dynamic_rnn( cell_fw = cell_fw, cell_bw = cell_bw, inputs = inpts, sequence_length = length, dtype = tf.float32, time_major = True) encoder_outputs = tf.concat(enc_outputs, -1) if rnn_layers == 1: encoder_state = enc_state else: encoder_state = [] for layer_id in range(rnn_layers): encoder_state.append(enc_state[0][layer_id]) # forward encoder_state.append(enc_state[1][layer_id]) # backward encoder_state = tuple(encoder_state)

Attention

# attention_states: size [batch_size, max_time, num_units] attention_states = tf.transpose(encoder_outputs, [1, 0, 2]) # Create an attention mechanism attention_mechanism = tf.contrib.seq2seq.LuongAttention( rnn_units, attention_states, memory_sequence_length=length) final_cell = create_cell(rnn_units, 2*rnn_layers, 2*rnn_residual_layers, is_dropout=True, keep_prob=keep_prob) final_cell = seq2seq.AttentionWrapper( final_cell, attention_mechanism, attention_layer_size=attention_size) final_initial_state = final_cell.zero_state(batch_size, tf.float32).clone( cell_state=encoder_state) attention_output, _ = tf.nn.dynamic_rnn( cell=final_cell, inputs=attention_states, sequence_length=length, initial_state=final_initial_state, dtype = tf.float32)
# pred = tf.matmul(outputs, W) # pred = tf.scan(lambda a, x: tf.matmul(x, W), outputs, infer_shape=False) pred = tf.layers.dense(inputs=attention_output, units=2, name='pred') prediction = tf.argmax(pred, axis=-1, name='prediction')

Optimizer

# Define loss and optimizer
# loss = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(logits=pred, labels=targets), name='loss')

# Mask of valid (non-padded) positions, shape (batch_size, max_seq_length).
# Padded targets have the value PAD, which equals NEG, so without the mask
# every padded position would count as a regular negative sample in both
# the loss and the accuracy.
valid_steps = tf.sequence_mask(length, maxlen=sequence_size, dtype=tf.float32)

# Positive targets get weight POS_WEIGHT + 1, negative targets 1, padding 0
weights = (tf.cast(targets, tf.float32) * POS_WEIGHT + 1.0) * valid_steps
loss = tf.reduce_mean(tf.losses.sparse_softmax_cross_entropy(
    logits=pred, labels=targets, weights=weights), name='loss')
train_step = tf.train.AdamOptimizer(learning_rate=learning_rate).minimize(loss, name='train_step')

# Evaluate model
correct_pred = tf.equal(prediction, targets)
# Ratio of correct predictions among the valid positions only; the maximum
# guards against division by zero for a batch without valid positions
accuracy = (tf.reduce_sum(tf.cast(correct_pred, tf.float32) * valid_steps)
            / tf.maximum(tf.reduce_sum(valid_steps), 1.0))
# accuracy = tf.reduce_mean(prediction * targets) # Testing for only zero predictions

Training

sess.run(tf.global_variables_initializer())
saver = tf.train.Saver()

# Create plot for live stats plotting
trainPlot = TrainingPlot(TRAIN_STEPS, TEST_ITER, LOSS_ITER)

try:
    for i_batch in range(TRAIN_STEPS):
        fd = train_iterator.next_feed(BATCH_SIZE)
        train_step.run(fd)

        if i_batch % LOSS_ITER == 0:
            # Plotting loss
            tmpLoss = loss.eval(fd)
            trainPlot.updateCost(tmpLoss, i_batch // LOSS_ITER)

        if i_batch % TEST_ITER == 0:
            # Plotting accuracy
            fd_test = test_iterator.next_feed(BATCH_SIZE)
            accTest = accuracy.eval(fd_test)
            accTrain = accuracy.eval(fd)
            trainPlot.updateAcc(accTest, accTrain, i_batch // TEST_ITER)

        if i_batch % SAVE_ITER == 0:
            # Periodic checkpoint
            saver.save(sess, 'models/gap-clas/RNN-A/Bi-RNN')

#         if i_batch % EPOCH == 0:
#             fd_test = test_iterator.next_feed(BATCH_SIZE)
#             print('batch %r - loss: %r' % (i_batch, sess.run(loss, fd_test)))
#             predict_, target_ = sess.run([pred, targets], fd_test)
#             for i, (inp, pred) in enumerate(zip(target_, predict_)):
#                 print('    expected  > {}'.format(inp))
#                 print('    predicted > {}'.format(pred))
#                 break
#             print()

except KeyboardInterrupt:
    # Manual stop: keep the current weights
    saver.save(sess, 'models/gap-clas/RNN-A/Bi-RNN')
    print('Training interrupted, model saved.')
else:
    # Training ran through all steps: store the final weights as well,
    # otherwise everything learned after the last periodic checkpoint
    # (the last multiple of SAVE_ITER) would be lost.
    saver.save(sess, 'models/gap-clas/RNN-A/Bi-RNN')

# Final evaluation on a bigger test batch
fd_test = test_iterator.next_feed(2*BATCH_SIZE)
accTest = accuracy.eval(fd_test)
print("Training finished with accuracy:", accTest)
<IPython.core.display.Javascript object>
Training interrupted, model saved. Training finished with accuracy: 0.962598
% matplotlib inline # sess.close() # gr = Graph('models/gap-clas/RNN-A/Bi-RNN', # operation='prediction') num_examples = 5 # Shuffle test images testImages = testImages[np.random.permutation(len(testImages))] imgs = testImages[:num_examples] # Split images to sequence of vectors length = [(image.shape[1]-slider_size[1])//slider_step for image in imgs] images_seq = np.empty(num_examples, dtype=object) for i, img in enumerate(imgs): images_seq[i] = np.array([img[:, loc * slider_step: loc * slider_step + slider_size[1]].flatten() for loc in range(length[i])], dtype=np.float32) # Create predictions using trained model test_pred = [] for i, inpt in enumerate(images_seq): img = imgs[i].copy() # img = cv2.cvtColor(imgs[i].astype(np.float32), cv2.COLOR_GRAY2RGB) pr = prediction.eval({ 'inputs:0': [inpt], 'length:0': [len(inpt)], 'keep_prob:0': 1.0}) for pos, g in enumerate(pr[0]): if g == 1: cv2.line(img, ((int)(15 + pos*slider_step), 0), ((int)(15 + pos*slider_step), slider_size[0]), 1, 1) implt(img, 'gray', t=str(i))