Implementing an RNN for spam prediction

In this section, we will see how to implement an RNN in TensorFlow to predict spam/ham from texts.

Data description and preprocessing

The popular spam dataset from the UCI ML repository will be used, which can be downloaded from

The dataset contains texts from several emails, some of which were marked as spam. Here we will train a model that will learn to distinguish between spam and non-spam emails using only the text of the email. Let's get started by importing the required libraries and model:

import os
import re
import io
import requests
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from zipfile import ZipFile
from tensorflow.python.framework import ops
import warnings

Additionally, we can stop printing the warning produced by TensorFlow if you want:

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

Now, let's create the TensorFlow session for the graph:

sess = tf.Session()

The next task is to set the RNN parameters:

epochs = 300
batch_size = 250
max_sequence_length = 25
rnn_size = 10
embedding_size = 50
min_word_frequency = 10
learning_rate = 0.0001
dropout_keep_prob = tf.placeholder(tf.float32)

Let's manually download the dataset and store it in a text_data.txt file in the temp directory. First, we set the path:

data_dir = 'temp'
data_file = 'text_data.txt'
if not os.path.exists(data_dir):

Now, we directly download the dataset in zipped format:

if not os.path.isfile(os.path.join(data_dir, data_file)):
    zip_url = ''
    r = requests.get(zip_url)
    z = ZipFile(io.BytesIO(r.content))
    file ='SMSSpamCollection')

We still need to format the data:

    text_data = file.decode()
    text_data = text_data.encode('ascii',errors='ignore')
    text_data = text_data.decode().split('

Now, store in it the directory mentioned earlier in a text file:

    with open(os.path.join(data_dir, data_file), 'w') as file_conn:
        for text in text_data:
    text_data = []
    with open(os.path.join(data_dir, data_file), 'r') as file_conn:
        for row in file_conn:
    text_data = text_data[:-1]

Let's split the words that have a word length of at least 2:

text_data = [x.split('	') for x in text_data if len(x)>=1]
[text_data_target, text_data_train] = [list(x) for x in zip(*text_data)]

Now we create a text cleaning function:

def clean_text(text_string):
    text_string = re.sub(r'([^sw]|_|[0-9])+', '', text_string)
    text_string = " ".join(text_string.split())
    text_string = text_string.lower()

We call the preceding method to clean the text:

text_data_train = [clean_text(x) for x in text_data_train]

Now we need to do one of the most important tasks, which is creating word embedding – changing text into numeric vectors:

vocab_processor = tf.contrib.learn.preprocessing.VocabularyProcessor(max_sequence_length, min_frequency=min_word_frequency)
text_processed = np.array(list(vocab_processor.fit_transform(text_data_train)))

Now let's shuffle to make the dataset balance:

text_processed = np.array(text_processed)
text_data_target = np.array([1 if x=='ham' else 0 for x in text_data_target])
shuffled_ix = np.random.permutation(np.arange(len(text_data_target)))
x_shuffled = text_processed[shuffled_ix]
y_shuffled = text_data_target[shuffled_ix]

Now that we have shuffled the data, we can split the data into a training and testing set:

ix_cutoff = int(len(y_shuffled)*0.75)
x_train, x_test = x_shuffled[:ix_cutoff], x_shuffled[ix_cutoff:]
y_train, y_test = y_shuffled[:ix_cutoff], y_shuffled[ix_cutoff:]
vocab_size = len(vocab_processor.vocabulary_)
print("Vocabulary size: {:d}".format(vocab_size))
print("Training set size: {:d}".format(len(y_train)))
print("Test set size: {:d}".format(len(y_test)))

The following is the output of the preceding code:

Vocabulary size: 933
Training set size: 4180
Test set size: 1394

Before we start training, let's create placeholders for our TensorFlow graph:

x_data = tf.placeholder(tf.int32, [None, max_sequence_length])
y_output = tf.placeholder(tf.int32, [None])

Let's create the embedding:

embedding_mat = tf.get_variable("embedding_mat", shape=[vocab_size, embedding_size], dtype=tf.float32, initializer=None, regularizer=None, trainable=True, collections=None)
embedding_output = tf.nn.embedding_lookup(embedding_mat, x_data)

Now it's time to construct our RNN. The following code defines the RNN cell:

cell = tf.nn.rnn_cell.BasicRNNCell(num_units = rnn_size)
output, state = tf.nn.dynamic_rnn(cell, embedding_output, dtype=tf.float32)
output = tf.nn.dropout(output, dropout_keep_prob)

Now let's define the way to get the output from our RNN sequence:

output = tf.transpose(output, [1, 0, 2])
last = tf.gather(output, int(output.get_shape()[0]) - 1)

Next, we define the weights and the biases for the RNN:

weight = bias = tf.get_variable("weight", shape=[rnn_size, 2], dtype=tf.float32, initializer=None, regularizer=None, trainable=True, collections=None)
bias = tf.get_variable("bias", shape=[2], dtype=tf.float32, initializer=None, regularizer=None, trainable=True, collections=None)

The logits output is then defined. It uses both the weight and the bias from the preceding code:

logits_out = tf.nn.softmax(tf.matmul(last, weight) + bias)

Now we define the losses for each prediction so that later on, they can contribute to the loss function:

losses = tf.nn.sparse_softmax_cross_entropy_with_logits_v2(logits=logits_out, labels=y_output)

We then define the loss function:

loss = tf.reduce_mean(losses)

We now define the accuracy of each prediction:

accuracy = tf.reduce_mean(tf.cast(tf.equal(tf.argmax(logits_out, 1), tf.cast(y_output, tf.int64)), tf.float32))

We then create the training_op with RMSPropOptimizer:

optimizer = tf.train.RMSPropOptimizer(learning_rate)
train_step = optimizer.minimize(loss)

Now let's initialize all the variables using the global_variables_initializer() method:

init_op = tf.global_variables_initializer()

Additionally, we can create some empty lists to keep track of the training loss, testing loss, training accuracy, and the testing accuracy in each epoch:

train_loss = []
test_loss = []
train_accuracy = []
test_accuracy = []

Now we are ready to perform the training, so let's get started. The workflow of the training goes as follows:

  • Shuffle the training data
  • Select the training set and calculate generations
  • Run training step for each batch
  • Run loss and accuracy of training
  • Run the evaluation steps.

The following codes includes all of the afore-mentioned steps:

    shuffled_ix = np.random.permutation(np.arange(len(x_train)))
    x_train = x_train[shuffled_ix]
    y_train = y_train[shuffled_ix]
    num_batches = int(len(x_train)/batch_size) + 1

    for i in range(num_batches):
        min_ix = i * batch_size
        max_ix = np.min([len(x_train), ((i+1) * batch_size)])
        x_train_batch = x_train[min_ix:max_ix]
        y_train_batch = y_train[min_ix:max_ix]
        train_dict = {x_data: x_train_batch, y_output: 
y_train_batch, dropout_keep_prob:0.5}, feed_dict=train_dict)
        temp_train_loss, temp_train_acc =[loss, 
                         accuracy], feed_dict=train_dict)
    test_dict = {x_data: x_test, y_output: y_test,  
    temp_test_loss, temp_test_acc =[loss, accuracy], 
    print('Epoch: {}, Test Loss: {:.2}, Test Acc: {:.2}'.format(epoch+1, temp_test_loss, temp_test_acc))
Overall accuracy on test set (%): {}'.format(np.mean(temp_test_acc)*100.0))

The following is the output of the preceding code:

Epoch: 1, Test Loss: 0.68, Test Acc: 0.82
Epoch: 2, Test Loss: 0.68, Test Acc: 0.82
Epoch: 3, Test Loss: 0.67, Test Acc: 0.82

Epoch: 997, Test Loss: 0.36, Test Acc: 0.96
Epoch: 998, Test Loss: 0.36, Test Acc: 0.96
Epoch: 999, Test Loss: 0.35, Test Acc: 0.96
Epoch: 1000, Test Loss: 0.35, Test Acc: 0.96
Overall accuracy on test set (%): 96.19799256324768 

Well done! The accuracy of the RNN is above 96%, which is outstanding. Now let's observe how the loss propagates across each iteration and over time:

epoch_seq = np.arange(1, epochs+1)
plt.plot(epoch_seq, train_loss, 'k--', label='Train Set')
plt.plot(epoch_seq, test_loss, 'r-', label='Test Set')
plt.title('RNN training/test loss')
plt.legend(loc='upper left')
Figure 13: a) RNN training and test loss per epoch b) test accuracy per epoch

We also plot the accuracy over time:

plt.plot(epoch_seq, train_accuracy, 'k--', label='Train Set')
plt.plot(epoch_seq, test_accuracy, 'r-', label='Test Set')
plt.title('Test accuracy')
plt.legend(loc='upper left')

The next application uses time series data for predictive modeling. We will also see how to develop more complex RNNs called LSTM networks.

