Easy Steps to Create a Simple Spam Classifier

Beginner-Friendly Approach to Spam Classification

This project is perfect for beginner Machine Learning enthusiasts like myself, as it offers a great opportunity to learn the basics of Web Scraping and NLP (Natural Language Processing). Since we will only be skimming the surface, you don’t need to be well-versed in Deep Learning concepts to get started with this one.

Understanding the Problem Statement:

The goal is to develop an automated system that accurately distinguishes spam from ham (legitimate) emails, improving email security and user experience. The same methods can be applied to plain text messages with minor changes.

Data Collection:

We will use the Apache SpamAssassin public corpus, which provides compressed archives of spam and ham emails. For this, we need the following three modules:

  1. urllib – for accessing and downloading data from the given URL.

  2. tarfile – for extracting the downloaded archives.

  3. os – for creating the directories where we will store the downloaded data.

One thing to keep in mind: the download and extraction should not run again every time we re-run the program, since that wastes time and gets in the way while debugging other sections of the code. The function below therefore skips any archive that has already been downloaded or extracted.

import os
import urllib.request
import tarfile

DOWNLOAD_ROOT = "http://spamassassin.apache.org/old/publiccorpus/"
HAM_URL1 = DOWNLOAD_ROOT + "20030228_easy_ham.tar.bz2"
HAM_URL2 = DOWNLOAD_ROOT + "20030228_hard_ham.tar.bz2"
SPAM_URL = DOWNLOAD_ROOT + "20030228_spam.tar.bz2"
SPAM_PATH = os.path.join("datasets", "spam")

def fetch_spam_data(ham_url1=HAM_URL1, ham_url2 = HAM_URL2,  spam_url=SPAM_URL, spam_path=SPAM_PATH):
    if not os.path.isdir(spam_path):
        os.makedirs(spam_path)

    datasets = [("easy_ham.tar.bz2", ham_url1, "easy_ham"), ("hard_ham.tar.bz2", ham_url2, "hard_ham"), ("spam.tar.bz2", spam_url, "spam")]

    for filename, url, extract_dir in datasets:
        path = os.path.join(spam_path, filename)
        extract_path = os.path.join(spam_path, extract_dir)

        if not os.path.isdir(extract_path):
            if not os.path.isfile(path):
                urllib.request.urlretrieve(url, path)

            with tarfile.open(path) as tar_bz2_file:
                tar_bz2_file.extractall(path=spam_path)
                print(f"Extracted {filename} to {extract_path}")
        else:
            print(f"Already extracted: {extract_path}")

We will store this in a file named 'load.py' for now. On to the main task: the raw email files are not in a form a model can use directly, so we have to process them first.

Let’s proceed step by step. First, we define the paths for the easy_ham, hard_ham, and spam directories. Then we store the filenames in regular Python lists using list comprehensions.

import os
import load as ld

ld.fetch_spam_data()

HAM_DIR1 = os.path.join(ld.SPAM_PATH, "hard_ham")
HAM_DIR2 = os.path.join(ld.SPAM_PATH, "easy_ham")
SPAM_DIR = os.path.join(ld.SPAM_PATH, "spam")

hard_ham_filenames = [name for name in sorted(os.listdir(HAM_DIR1)) if len(name) > 20]
easy_ham_filenames = [name for name in sorted(os.listdir(HAM_DIR2)) if len(name) > 20]
spam_filenames = [name for name in sorted(os.listdir(SPAM_DIR)) if len(name) > 20]
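The `len(name) > 20` test is a cheap way to skip entries that are not messages. The sketch below illustrates the idea on a temporary directory. It assumes message files have long names (an index, a dot, and a 32-character hash) and that any extra file has a short name such as `cmds`; the file names and the `list_message_files` helper are made up for illustration.

```python
import os
import tempfile

def list_message_files(directory, min_name_length=20):
    """Return sorted file names longer than min_name_length characters."""
    return [name for name in sorted(os.listdir(directory))
            if len(name) > min_name_length]

# Hypothetical file names, only meant to mimic the layout of the corpus.
example_names = [
    "00002.aaaabbbbccccddddeeeeffff00001111",
    "00001.0123456789abcdef0123456789abcdef",
    "cmds",
]

with tempfile.TemporaryDirectory() as tmp:
    for name in example_names:
        # Create empty placeholder files.
        open(os.path.join(tmp, name), "w").close()
    message_files = list_message_files(tmp)

print(message_files)
```

Only the two long names survive the filter, and `sorted()` keeps the order stable across runs.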

Now we will define a function in a separate file named ‘modules.py’ that loads these emails for us based on a given filename. The load_email() function takes directory_name as a parameter, along with the filename and the path where the directory is located. These are needed to build the path passed to open() for reading the file.

#In modules.py
import email
import email.parser
import email.policy
import os

from load import SPAM_PATH

def load_email(directory_name, filename, spam_path=SPAM_PATH):
    # Open in binary mode: BytesParser takes care of decoding the message
    with open(os.path.join(spam_path, directory_name, filename), "rb") as f:
        return email.parser.BytesParser(policy=email.policy.default).parse(f)
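To get a feel for what the parser returns, here is a minimal sketch that parses a hand-written message from memory instead of from disk. The addresses and text are invented for the example; the parser and policy are the same ones used in load_email().

```python
import email.parser
import email.policy

# A tiny, made-up message in raw RFC 5322 form.
raw_message = (
    b"From: alice@example.com\r\n"
    b"To: bob@example.com\r\n"
    b"Subject: Lunch on Friday?\r\n"
    b'Content-Type: text/plain; charset="utf-8"\r\n'
    b"\r\n"
    b"Are you free at noon?\r\n"
)

parser = email.parser.BytesParser(policy=email.policy.default)
message = parser.parsebytes(raw_message)

# Headers are read like dictionary keys; the body comes from get_content().
print(message["Subject"])
print(message.get_content_type())
print(message.get_content().strip())
```

The returned object is an EmailMessage, which is why the later functions can call methods such as get_content_type() and iter_parts() on it.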

Now we will use this function in list comprehensions to build two more Python lists. We store the easy and hard hams together so that the ham class covers both kinds of examples. Note that this does not balance the classes: ham still outnumbers spam by a wide margin in this corpus.

import modules as md

ham_emails = [md.load_email(directory_name = "hard_ham",  filename=name) for name in hard_ham_filenames] + [md.load_email(directory_name = "easy_ham", filename=name) for name in easy_ham_filenames]
spam_emails = [md.load_email(directory_name = "spam", filename=name) for name in spam_filenames]

Some Basic EDA

Now we have the emails, but we need to write another function to convert them into text so we can pass them to the model. To do this, we need to analyze the structure of all the emails. We will need a function that can go through the entire email and tell us its structure. Since many emails are going to be multipart, we will use recursion for exploring all parts.

#In modules.py
from collections import Counter

def get_email_structure(email):
    if isinstance(email, str):
        return email
    payload = email.get_payload()
    if isinstance(payload, list):
        return "multipart({})".format(", ".join([
            get_email_structure(sub_email)
            for sub_email in payload
        ]))
    else:
        return email.get_content_type()

def structures_counter(emails):
    structures = Counter()
    for email in emails:
        structure = get_email_structure(email)
        structures[structure] += 1
    return structures
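As a quick sanity check, the sketch below runs the same structure function on two messages built in memory with the standard library's EmailMessage class. The messages are invented for the example, and the parameter is renamed to `message` here only to avoid shadowing the `email` module.

```python
from collections import Counter
from email.message import EmailMessage

def get_email_structure(message):
    """Describe a message as a content type or a nested multipart(...) string."""
    if isinstance(message, str):
        return message
    payload = message.get_payload()
    if isinstance(payload, list):
        return "multipart({})".format(", ".join(
            get_email_structure(sub_message) for sub_message in payload
        ))
    return message.get_content_type()

# A plain-text message.
plain = EmailMessage()
plain.set_content("Hello")

# A message with a plain-text part and an HTML alternative.
mixed = EmailMessage()
mixed.set_content("Hello")
mixed.add_alternative("<p>Hello</p>", subtype="html")

structures = Counter(get_email_structure(m) for m in [plain, plain, mixed])
print(structures.most_common())
```

The multipart message is reported as `multipart(text/plain, text/html)`, the same shape of label that shows up in the counts for the real corpus.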

Now we can use this for some simple EDA (Exploratory Data Analysis) through the following code.

import pandas as pd

ham_structures = pd.DataFrame({'content-type': [md.get_email_structure(email) for email in ham_emails]})
spam_structures = pd.DataFrame({'content-type': [md.get_email_structure(email) for email in spam_emails]})

print(ham_structures['content-type'].value_counts())

The output of the above code might look something like this:

Email structure (content-type)                    Count
text/plain                                         2489
text/html                                           118
multipart(text/plain, application/pgp-signature)     66
multipart(text/plain, text/html)                     51
multipart(text/plain, text/plain)                     5
multipart(text/plain)                                 3
Others                                               18

This shows that most of the content we will be dealing with is either 'text/plain' or 'text/html', even for multipart emails. Other types of content in our dataset, such as images, GIFs, or PDF attachments, can be skipped for now. We will focus on extracting HTML and text from the given emails.

Data Preprocessing

#In modules.py

def decode_email_part(part):
    charset = part.get_content_charset() or 'utf-8'
    payload = part.get_payload(decode=True)

    if payload is None:
        return ""

    try:
        part_content = payload.decode(charset)
    except (LookupError, UnicodeDecodeError):
        part_content = payload.decode('utf-8', errors='replace')

    return part_content

def get_html_from_email(part):
    if part.get_content_type() == 'text/html':
        return decode_email_part(part)
    elif part.get_content_type().startswith('multipart/'):
        for subpart in part.iter_parts():
            html_content = get_html_from_email(subpart)
            if html_content:
                return html_content
    return None

def get_text_from_email(part):
    if part.get_content_type() == 'text/plain':
        return decode_email_part(part)
    elif part.get_content_type().startswith('multipart/'):
        for subpart in part.iter_parts():
            text_content = get_text_from_email(subpart)
            if text_content:
                return text_content
    return None
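The fallback in decode_email_part() matters because old emails often declare a wrong or unknown character set. Here is a small sketch of both paths, using two made-up messages whose body contains the single byte 0xE9 ("é" in ISO-8859-1). The `build_message` helper and the charset name `not-a-real-charset` are only for illustration.

```python
from email import message_from_bytes, policy

def decode_email_part(part):
    """Decode a part with its declared charset, falling back to lenient UTF-8."""
    charset = part.get_content_charset() or "utf-8"
    payload = part.get_payload(decode=True)
    if payload is None:
        return ""
    try:
        return payload.decode(charset)
    except (LookupError, UnicodeDecodeError):
        return payload.decode("utf-8", errors="replace")

def build_message(charset):
    """Build a one-part text message that declares the given charset."""
    raw = b"".join([
        b'Content-Type: text/plain; charset="' + charset.encode("ascii") + b'"\r\n',
        b"Content-Transfer-Encoding: 8bit\r\n",
        b"\r\n",
        b"caf\xe9 menu\r\n",
    ])
    return message_from_bytes(raw, policy=policy.default)

# The declared charset is valid, so the byte decodes cleanly.
good_text = decode_email_part(build_message("iso-8859-1")).strip()

# Unknown charset: decode() raises LookupError, so the fallback is used and
# the undecodable byte becomes the Unicode replacement character.
fallback_text = decode_email_part(build_message("not-a-real-charset")).strip()

print(repr(good_text))
print(repr(fallback_text))
```

Losing one character to a replacement mark is a reasonable trade here, since the alternative would be to drop the whole email.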

To parse HTML content and extract its text easily, we will use a web scraping library called Beautiful Soup (with the lxml parser, which has to be installed separately). The function below is the final one, and it is the one we will call from the main program.

#In modules.py
from bs4 import BeautifulSoup

def email_to_text(email):
    def parse_html(content):
        try:
            return BeautifulSoup(content, 'lxml').get_text().strip()
        except Exception:
            return None

    content_type = email.get_content_type()
    if content_type == 'text/html':
        return parse_html(decode_email_part(email))
    elif content_type == 'text/plain':
        return decode_email_part(email).strip()
    elif content_type.startswith('multipart/'):
        html_content = get_html_from_email(email)
        if html_content:
            return parse_html(html_content)
        text_content = get_text_from_email(email)
        if text_content:
            return text_content.strip()

    return None

#In main.py
import numpy as np
import pandas as pd

ham_text = [md.email_to_text(email) for email in ham_emails]
spam_text = [md.email_to_text(email) for email in spam_emails]

ham_text = pd.DataFrame(np.array(ham_text), columns=['content'])
spam_text = pd.DataFrame(np.array(spam_text), columns=['content'])

That completes half of our data preprocessing task. Next, we need to take into account the subject of each email and the ‘To’ and ‘From’ addresses, as they can give us a decent idea about the authenticity of the mail. To extract this metadata, we need another function.

#In modules.py
import pandas as pd

def extract_email_metadata(email_array):
    email_data = []

    for email in email_array:
        # Header lookups return None when a header is missing
        subject = email['Subject']
        from_ = email['From']
        to = email['To']

        email_data.append([subject, from_, to])

    return pd.DataFrame(email_data, columns=['subject', 'from', 'to'])

Next, we need a little manipulation to join the content with the metadata. We will also create a new feature named ‘isSpam’ to label each email (as an additional exercise, you can try to do this in fewer lines of code).

ham_meta_df = md.extract_email_metadata(ham_emails)
spam_meta_df = md.extract_email_metadata(spam_emails)

X_ham = pd.DataFrame(ham_text, columns=['content'])
X_spam = pd.DataFrame(spam_text, columns=['content'])
y_ham = pd.DataFrame([0] * len(ham_text), columns=['isSpam'])
y_spam = pd.DataFrame([1] * len(spam_text), columns=['isSpam'])

ham_df = pd.concat([X_ham, y_ham], axis=1)
spam_df = pd.concat([X_spam, y_spam], axis=1)

mail_df1 = pd.concat([ham_df, spam_df], ignore_index=True)
mail_df2 = pd.concat([ham_meta_df, spam_meta_df], ignore_index=True)

mail_df = pd.concat([mail_df1.reset_index(drop=True), mail_df2.reset_index(drop=True)], axis=1)

X = mail_df[['content', 'subject', 'from', 'to']]
y = mail_df['isSpam']

Splitting the dataset:

from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

Another common step when preparing data for language tasks is stemming. Stemming reduces a group of related words to their common base form (or stem). For example, consult, consultant, consultation, and consultative all lead to the same stem, consult. For this, we will use the Snowball Stemmer from the popular nltk (Natural Language Toolkit) library; it is an improved version of the Porter Stemmer.

import nltk 

class StemmingTokenizer:
    def __init__(self):
        self.stemmer = nltk.SnowballStemmer(language='english')

    def __call__(self, doc):
        tokens = nltk.word_tokenize(doc)
        return [self.stemmer.stem(token) for token in tokens]
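To see the stemmer on its own, here is a minimal sketch that skips tokenization and stems a few words directly. The word list is just an example. Note that nltk.word_tokenize() in the class above also needs NLTK's tokenizer data, which is downloaded once (for example with nltk.download('punkt'); the exact resource name depends on the NLTK version).

```python
from nltk.stem.snowball import SnowballStemmer

stemmer = SnowballStemmer("english")

# Different surface forms of the same verb.
words = ["running", "runs", "run"]
stems = [stemmer.stem(word) for word in words]

print(stems)
```

All three forms collapse to the same stem, so the vectorizer later treats them as a single feature instead of three.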

You may have noticed that even after all this, our dataset still contains words, not numbers, and our ML model won't work with that. This brings us to the final step in our preprocessing pipeline – the Tf-Idf Vectorizer.

Before you get overwhelmed by the term, let's break it down. Tf stands for Term Frequency, and Idf stands for Inverse Document Frequency. The method scores a word by how often it appears in a document, and lowers that score when the word also appears in many other documents.

(Remember, we are not converting words to numbers; we are trying to represent the importance of words through numbers. A quick glance at the formulae given below will make it clearer.)
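In the textbook version, the weight of a term t in a document d is tf-idf(t, d) = tf(t, d) × idf(t), where tf(t, d) is the number of times t occurs in d and idf(t) = log(n / df(t)), with n the number of documents and df(t) the number of documents containing t. With its default settings, scikit-learn's TfidfVectorizer uses a smoothed variant, idf(t) = ln((1 + n) / (1 + df(t))) + 1, and then scales each document vector to unit length. The sketch below computes that smoothed variant by hand on a tiny made-up corpus; it is meant to illustrate the arithmetic, not to replace the vectorizer.

```python
import math
from collections import Counter

# Three tiny, already tokenized "documents" (made up for the example).
docs = [
    ["free", "prize", "now"],
    ["meeting", "now"],
    ["free", "free", "offer"],
]
n_docs = len(docs)

# Document frequency: in how many documents does each term occur?
doc_freq = Counter(term for doc in docs for term in set(doc))

def idf(term):
    """Smoothed inverse document frequency."""
    return math.log((1 + n_docs) / (1 + doc_freq[term])) + 1

def tfidf(doc):
    """Return L2-normalised tf-idf weights for one document."""
    term_counts = Counter(doc)
    weights = {term: count * idf(term) for term, count in term_counts.items()}
    norm = math.sqrt(sum(w * w for w in weights.values()))
    return {term: w / norm for term, w in weights.items()}

first_doc_weights = tfidf(docs[0])
for term, weight in sorted(first_doc_weights.items()):
    print(f"{term}: {weight:.3f}")
```

In the first document every word occurs once, yet "prize" gets the largest weight because it appears in no other document, while "free" and "now" are shared with other documents and are scored lower.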

If you would like a more detailed explanation, I recommend watching this 5-minute YouTube video. Done? Let’s proceed then.

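from sklearn.compose import ColumnTransformer
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.pipeline import Pipeline

# ColumnTransformer clones the vectorizer for each column when fitting,
# so every column learns its own vocabulary (up to 1000 terms each).
tfidf_vectorizer = TfidfVectorizer(max_features=1000, stop_words='english')
column_transformer = ColumnTransformer(
    [('content_tfidf', tfidf_vectorizer, 'content'),
     ('subject_tfidf', tfidf_vectorizer, 'subject'),
     ('from_tfidf', tfidf_vectorizer, 'from'),
     ('to_tfidf', tfidf_vectorizer, 'to')],
    remainder='passthrough')

# Vectorizer that applies the StemmingTokenizer defined above. As written,
# it is not part of column_transformer; to apply stemming, pass it in place
# of tfidf_vectorizer for the 'content' column.
text_pipeline = Pipeline([
    ("tfidf", TfidfVectorizer(max_features=1000, tokenizer=StemmingTokenizer())),
])

preprocessing_pipeline = Pipeline([
    ('tfidf', column_transformer)
])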

Fitting in the pipeline:

X_train_preprocessed = preprocessing_pipeline.fit_transform(X_train.astype(str))
X_test_preprocessed = preprocessing_pipeline.transform(X_test.astype(str))

Modelling

Technically, you should run several different models on the same data and pick the best-performing ones, but I have already done that for you (including the hyperparameter tuning), so we will skip that part.

We will also use the concept of model stacking. The idea is simple: we train two or more base models and then train a meta model on their predictions, so that it can combine their strengths and mitigate their weaknesses. For this we will use scikit-learn’s StackingClassifier. Ideally, the base models should be as diverse as possible.

from sklearn.ensemble import RandomForestClassifier, StackingClassifier
from xgboost import XGBClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler, MaxAbsScaler
from sklearn.naive_bayes import MultinomialNB

rf_clf = RandomForestClassifier(class_weight='balanced', bootstrap=False, n_estimators=100, max_depth=None, min_samples_leaf=1, min_samples_split=2, random_state=42)
xgb_clf = XGBClassifier(n_estimators=100, max_depth=6, learning_rate=0.1, subsample=0.7, random_state=42)
mnb_clf = MultinomialNB(alpha = 0.1)

rf_pipeline = Pipeline([
    ('scaler', StandardScaler(with_mean=False)),
    ('classifier', rf_clf) ])

xgb_pipeline = Pipeline([
    ('scaler', StandardScaler(with_mean=False)),
    ('classifier', xgb_clf) ])

mnb_pipeline = Pipeline([
    ('scaler', MaxAbsScaler()),
    ('classifier', mnb_clf)  ])

meta_model = LogisticRegression()

stacking_clf = StackingClassifier(
    estimators=[
        ('rf', rf_pipeline),
        ('xgb', xgb_pipeline),
        ('mnb', mnb_pipeline) ],  final_estimator=meta_model )

(As an exercise, you can investigate why MaxAbsScaler is a better option than StandardScaler for Multinomial Naive Bayes.)

Fitting and testing

We could keep fine-tuning our models indefinitely, but since the dataset is not that big, this should be more than enough. Let's fit the stacking classifier on the training data and evaluate it on the test set.

stacking_clf.fit(X_train_preprocessed, y_train)

accuracy = stacking_clf.score(X_test_preprocessed, y_test)
print(f'Stacking Classifier Accuracy: {accuracy:.4f}')
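Accuracy alone can look flattering when one class dominates, as ham does once easy and hard ham are combined. As a rough illustration, the sketch below scores a useless classifier that always answers "ham" on a made-up set of labels; the labels and the `precision_recall` helper are hypothetical and not part of the project code.

```python
def precision_recall(y_true, y_pred, positive=1):
    """Compute precision and recall for the positive (spam) class."""
    pairs = list(zip(y_true, y_pred))
    true_pos = sum(1 for t, p in pairs if t == positive and p == positive)
    false_pos = sum(1 for t, p in pairs if t != positive and p == positive)
    false_neg = sum(1 for t, p in pairs if t == positive and p != positive)
    precision = true_pos / (true_pos + false_pos) if true_pos + false_pos else 0.0
    recall = true_pos / (true_pos + false_neg) if true_pos + false_neg else 0.0
    return precision, recall

# Hypothetical labels: 8 ham (0) and 2 spam (1) emails.
y_true_example = [0] * 8 + [1] * 2
# A classifier that always predicts ham.
y_pred_example = [0] * 10

accuracy_example = sum(
    t == p for t, p in zip(y_true_example, y_pred_example)
) / len(y_true_example)
precision_example, recall_example = precision_recall(y_true_example, y_pred_example)

print(f"accuracy={accuracy_example:.2f}, "
      f"precision={precision_example:.2f}, recall={recall_example:.2f}")
```

The always-ham classifier reaches 80% accuracy on these labels while catching no spam at all, so it is worth looking at precision and recall for the spam class alongside the accuracy printed above.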

Completion

Congrats! You just completed your first spam classifier project. If everything goes right, you will get an accuracy close to 98-99%. We can now save the model with joblib so that it can later be used to build an API and for deployment.

import joblib

joblib.dump(stacking_clf, 'stacking_clf.joblib')
joblib.dump(preprocessing_pipeline, 'preprocessing_pipeline.joblib')
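Loading works the same way in reverse with joblib.load(). The sketch below shows the full round trip on a deliberately tiny stand-in model, so that it runs on its own; the texts, labels, and file name are made up, and the toy pipeline is not the stacking classifier from this post.

```python
import os
import tempfile

import joblib
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline

# A toy dataset: 1 = spam, 0 = ham.
texts = [
    "win a free prize now",
    "free money offer",
    "meeting at noon",
    "lunch tomorrow with the team",
]
labels = [1, 1, 0, 0]

toy_model = Pipeline([
    ("tfidf", TfidfVectorizer()),
    ("clf", LogisticRegression()),
])
toy_model.fit(texts, labels)

# Save the fitted model to a temporary file, then load it back.
with tempfile.TemporaryDirectory() as tmp:
    model_path = os.path.join(tmp, "toy_model.joblib")
    joblib.dump(toy_model, model_path)
    restored_model = joblib.load(model_path)

# The restored model should behave exactly like the original one.
same_predictions = (
    list(restored_model.predict(texts)) == list(toy_model.predict(texts))
)
print(same_predictions)
```

For the real project, the preprocessing pipeline has to be loaded as well and applied to new emails before calling the classifier, which is why both objects are saved.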

Thank you for reading this far. If you want to know more about stemming algorithms, the documentation for the Porter Stemmer and the Snowball Stemmer is worth skimming to get an idea of what is happening under the hood.