日本国内外の先端事例や生活者トレンドをSEEDATA独自の視点で分析し、ブログ形式で配信しています。News

Written by
SEEDATA
公開日:2019.11.28/ 更新日:2021.06.11

テクノロジー(Technologies)

ナイーブベイズ:Pythonによるスパムフィルタの実装

[mathjax]

ナイーブベイズは文書データの分類に用いられる手法で,単純な理論ながら高いパフォーマンスを誇ることで知られています.このページでは,ナイーブベイズモデルのPythonによる実装方法を解説していきます.なお,今回の記事における変数表記・命名は【ナイーブベイズ:カテゴリ数が一般の場合の式展開】に倣っていますので,この記事の前に必ずご覧ください.

ナイーブベイズ:カテゴリ数が一般の場合の式展開

1. 全体の構造設計


$ tree
.
├── datasets
└── src
    ├── main.py
    ├── naive_bayes.py
    └── nlp_process.py

このようなディレクトリ構成にします.main.pyがスクリプトとして実行するファイル,そしてmain.pyから,ナイーブベイズのモデルを定義したnaive_bayes.py及び文書データの前処理を行う nlp_process.pyを呼び出して処理する形にします.文書データはdatasets/に格納します.

2. 今回使用するデータ


こちらのKaggleデータを使用し,spamかhamかをナイーブベイズにより分類します.英語のSMSです.spam.csvという名前でdatasets/ ディレクトリ直下に配置してください.

https://www.kaggle.com/uciml/sms-spam-collection-dataset

3. NaiveBayesモデルの実装 – naive_bayes.py


機械学習モデルの実装設計については,Pythonの機械学習系のライブラリ、特にscikit-learnが参考になります.ナイーブベイズはscikit-learnでも実装されており,基本的な使用方法は以下の通りです。

> clf = NBmodel()
> clf.fit(x_train, y_train)
> y_pred = clf.predict(x_test)

clfclassifier(分類器)の略であり、クラスオブジェクトとして実装されています。今回はこれに添う形で実装してみましょう.

3.0 Baseとなるクラスの実装

今回は,ベルヌーイ,多項分布の場合で扱う変数が同じですので,コードの可読性を上げるために,共有する構造の部分はベースとなるクラスで定義し,ベースのクラスをベルヌーイ及び多項分布のクラスで継承する,という形でコードを組みます.scikit-learnに倣い,_BaseNBという名前で定義してみます.scikit-learnはこれよりも抽象度の高いクラスを定義していますが,今回は変数の初期化までを_BaseNBで行います.

class _BaseNB:
    def __init__(self, alpha=1.0, beta=1.0, theta_c=None, phi_cv=None):
        """
        :param alpha: smoothing param for calculating each category's prior distribution (default: Laplace smoothing)
        :param beta: smoothing param for calculating the probability of category c choosing the word v
        (default: Laplase smoothing)
        :param theta_c: category c's prior distribution
        :param phi_cv: the probability of category c selecting the word v
        """
        self.alpha = alpha
        self.beta = beta
        self.theta_c = theta_c
        self.phi_cv = phi_cv

3.1 Bernoulli event modelの実装

ベルヌーイイベントモデルを実装します.

import numpy as np


class BernoulliNB(_BaseNB):

    def fit(self, x, y):
        n_documents, vocabulary_size = x.shape
        n_categories = len(set(y))
        categories = sorted(list(set(y)))

        if self.theta_c is None:
            self.theta_c = np.zeros(shape=(n_categories, 1), dtype=np.float32)

        if self.phi_cv is None:
            self.phi_cv = np.zeros(shape=(n_categories, vocabulary_size), dtype=np.float32)

        for i, category in enumerate(categories):
            lc = np.sum(y == category)
            self.theta_c[i] = (lc + self.alpha) / (n_documents + n_categories * self.alpha)

            c_docs = x[y == category]
            mc = np.sum(c_docs, axis=0)
            self.phi_cv[i] = (mc + self.beta) / (lc + 2. * self.beta)

    def predict(self, x):
        log_likelihood = np.dot(np.log(self.phi_cv), x.T) + np.dot(np.log(1. - self.phi_cv), (1. - x).T)
        y = np.argmax(np.log(self.theta_c) + log_likelihood, axis=0)
        return y

ポイントは,実行速度の問題から,numpyでなるべくfor文を回さずに書くことで,特に訓練データから最尤推定する変数(theta_c)(phi_{c, v})に関してはnumpyのboolean indexを有効活用します.predict部分で,全てのサンプルに対してnumpyで並列化するところにやや頭を捻りますが,ベイズ系の機械学習で尤度を求める際には,以下のような演算は頻出します.(theta)は(x)ベクトルの次元と同じ次元を持ったベクトル,そして(X)には各観測サンプルが行ごとに格納されており,転置して行列演算させることで重みをかけます.

> np.dot(theta, X.T)  # theta.shape=(1, x_dim), X.shape=(n_samples, x_dim)

3.2 Multinomial event modelの実装

多項イベントモデルを実装します.

class MultinomialNB(_BaseNB):

    def fit(self, x, y):
        n_documents, vocabulary_size = x.shape
        n_categories = len(set(y))
        categories = sorted(list(set(y)))

        if self.theta_c is None:
            self.theta_c = np.zeros(shape=(n_categories, 1), dtype=np.float32)

        if self.phi_cv is None:
            self.phi_cv = np.zeros(shape=(n_categories, vocabulary_size), dtype=np.float32)

        for i, category in enumerate(categories):
            lc = np.sum(y == category)
            self.theta_c[i] = (lc + self.alpha) / (n_documents + n_categories * self.alpha)

            ncv = np.sum(x[y == category], axis=0)
            nc_total = np.sum(ncv)
            self.phi_cv[i] = (ncv + self.beta) / (nc_total + vocabulary_size * self.beta)

    def predict(self, x):
        log_likelihood = np.dot(np.log(self.phi_cv), x.T)
        y = np.argmax(np.log(self.theta_c) + log_likelihood, axis=0)
        return y

こちらもベルヌーイと同様に実装できます.

4. 前処理関数の実装 – nlp_process.py


さて,文書データを適切な形に前処理し,ベクトル化する処理をnlp_process.pyに書きたいと思います.
今回扱う文書データである,KaggleからDLしたspam.csvは次のように文書とクラスを格納しています.
v1,v2,,,
ham,"Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...",,,
ham,Ok lar... Joking wif u oni...,,,
spam,Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075over18's,,,
前処理としては,v1, v2カラムのみを用いてpandasで読み込み,文書を単語ごとに分けるにはstr.split()メソッドを使用すれば良いでしょう.さらに,あまりに低頻度な単語は分類の邪魔になるため,今回は5種類以上の文書に登場した単語のみを辞書に加えることにします.
import collections
import numpy as np


def _get_words(document):
    words = document.lower().split()  # alphabetは小文字に統一
    return words


def create_dictionary(documents):
    word_count = collections.defaultdict(int)
    words_list = [set(_get_words(document)) for document in documents]
    for words in words_list:
        for word in words:
            word_count[word] += 1
    # 5文書以上に出現した単語のみを辞書に加える
    vocab = [word for word, count in word_count.items() if count >= 5]
    return dict(zip(vocab, range(len(vocab))))


# 多項イベントモデルに対するベクトル化
def _multinomial_vectorize(documents, word_dictionary):
    doc_array = np.zeros((len(documents), len(word_dictionary)), dtype=np.int)
    for i, document in enumerate(documents):
        for word in _get_words(document):
            if word in word_dictionary.keys():
                doc_array[i][word_dictionary[word]] += 1
    return doc_array


# ベルヌーイイベントモデルに対するベクトル化
def _bernoulli_vectorize(documents, word_dictionary):
    doc_array = np.zeros((len(documents), len(word_dictionary)))
    for i, document in enumerate(documents):
        for word in set(_get_words(document)):
            if word in word_dictionary.keys():
                doc_array[i][word_dictionary[word]] += 1
    return doc_array


# 二つのイベントモデルに対する処理を一つの関数にまとめます
def document_vectorizer(documents, dictionary, method='multinomial'):

    if method == 'multinomial':
        x = _multinomial_vectorize(documents, dictionary)

    elif method == 'bernoulli':
        x = _bernoulli_vectorize(documents, dictionary)

    else:
        raise Exception('invalid method name: select either multinomial or bernoulli')
    return x

ここで最も注意すべきは,各イベントモデルに基づいて文書ベクトルを作成する部分です.文書ベクトルは,shape=(n_documents, vocabulary_size)で初期化します.しかし,一般にボキャブラリーのサイズは大きくなるため, 辞書の全ての単語についてそのままイテレートすると計算時間が大きくなります.そこで,1回目のfor文で各文書にアクセスしたら,辞書についてfor文を回すのではなく,文書内の単語一つ一つにアクセスする方が結果としてループ処理が少なくなります.

5. scriptファイルの実装 – main.py


さて,これまで定義してきた関数を処理系として組み合わせてみましょう.spam.csv のうちランダムに抽出した8割を訓練データとし,残りの2割をテストデータとしてモデルを評価します.精度を算出する際,今回はクラス分布に偏りがある(spamの方がhamに比べて圧倒的に少ない)ため,単に精度だけではなく,F1スコアなどを含めた分類レポートを算出してみます.

import json
import numpy as np
import pandas as pd

import naive_bayes
import nlp_process


def read_data(path):
    df = pd.read_csv(path, usecols=['v1', 'v2'], encoding='latin-1')  # encodingを指定しないと文字化けします
    df.columns = ['class', 'text']  # column名は書き換えます
    df['class'] = np.array(df['class'].values == 'spam', dtype='int8')
    return df


def train_test_split(data, train_rate=0.8):
    """
    sklearn.model_selectionで定義されている処理
    """
    index = np.array(data.index)
    np.random.shuffle(index)
    train_idx = index[:int(len(index) * train_rate)]
    test_idx = index[:-int(len(index) * train_rate)]
    train_df = data.loc[train_idx]
    test_df = data.loc[test_idx]
    return train_df, test_df


def classification_report(y_true, y_pred):
    """
    sklearn.metricsで定義されているクラス
    """
    tp = np.sum((y_true == 1) * (y_pred == 1))
    fn = np.sum((y_true == 1) * (y_pred == 0))
    fp = np.sum((y_true == 0) * (y_pred == 1))
    tn = np.sum((y_true == 0) * (y_pred == 0))
    accuracy = np.sum(y_true == y_pred) / len(y_true)
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    f1_score = 2 * (precision * recall) / (precision + recall)
    clf_summary = {'True Positive': tp, 'False Negative': fn, 'False Positive': fp, 'True Negative': tn,
                   'Accuracy': accuracy, 'Precision': precision, 'Recall': recall, 'F1 score': f1_score}
    return clf_summary


def main(event_model, save_dictionary=True):
    print('--- event model: {} ---'.format(event_model))
    datasets_dir = '../datasets/'
    df = read_data(path=datasets_dir+'spam.csv')
    train_df, test_df = train_test_split(data=df, train_rate=0.8)

    print("making the dictionary from train data...")
    dictionary = nlp_process.create_dictionary(train_df['text'])
    if save_dictionary:
        with open('../datasets/dict.json', 'w') as f:
            json.dump(dictionary, f, indent=4)  # 辞書をjsonファイルに保存

    x_train = nlp_process.document_vectorizer(documents=train_df['text'], dictionary=dictionary, method=event_model)
    y_train = train_df['class'].values

    x_test = nlp_process.document_vectorizer(documents=test_df['text'], dictionary=dictionary, method=event_model)
    y_test = test_df['class'].values

    if event_model == 'multinomial':
        clf = naive_bayes.MultinomialNB()
    elif event_model == 'bernoulli':
        clf = naive_bayes.BernoulliNB()
    else:
        raise Exception('event model name invalid')

    print('fitting the model to the train data...')
    clf.fit(x_train, y_train)

    print('predicting class labels for the test data')
    y_pred = clf.predict(x_test)
    summary = classification_report(y_true=y_test, y_pred=y_pred)
    print()
    print('*** classification report ***')
    for key, value in summary.items():
        print(key + ': ' + str(round(value, 2)))


if __name__ == '__main__':
    main(event_model='multinomial')
    print()
    main(event_model='bernoulli')

さて,以下のように実行すると,次のような結果が得られると思います.環境にもよりますが,ナイーブベイズは処理がシンプルで高速なので,1秒程度で終了するはずです.

$ python3 main.py
--- event model: multinomial ---
making the dictionary from train data...
fitting the model to the train data...
predicting class labels for the test data

*** classification report ***
True Positive: 153
False Negative: 12
False Positive: 5
True Negative: 945
Accuracy: 0.98
Precision: 0.97
Recall: 0.93
F1 score: 0.95

--- event model: bernoulli ---
making the dictionary from train data...
fitting the model to the train data...
predicting class labels for the test data

*** classification report ***
True Positive: 135
False Negative: 12
False Positive: 11
True Negative: 957
Accuracy: 0.98
Precision: 0.92
Recall: 0.92
F1 score: 0.92

ここで,precisionは,適合率と呼び,「Positive」と予測したもののうち,実際に「Positive」であるものの割合です.今回はPositive = Spam認定です.Spamフィルターでは,「SpamでないメールがSpam認定されてしまう」ことをできるだけ避けるのが良いデザインといえるでしょうから,Precisionが高いモデルほどフィルタに適していると考えられます.その点では,今回のデータでは多項イベントモデルの勝利と言えます.F1スコアも優れています.

6. まとめ


今回は,実際にナイーブベイズを一から実装し,うまくスパムフィルタとして機能することを確かめました.