SatoshiWatanabeの日記

続・わかりやすいパターン認識 ひとり読書会 4

今回は具体的な実装の内容を見ていきます。実装はpython3.6で行いました。


続パタ読書会 1:ディリクレ過程混合モデル
続パタ読書会 2:ディリクレ過程混合モデルのアルゴリズム
続パタ読書会 3:実験
続パタ読書会 4:実装
続パタ読書会 5:事前確率P(s)の妥当性 1
続パタ読書会 6:事前確率P(s)の妥当性 2
続パタ読書会 7:ベル数 1
続パタ読書会 8:ベル数 2
続パタ読書会 9:P(s_k|x_k, s_-k, θ)の導出
続パタ読書会 10:演習問題12.5の計算
続パタ読書会 11:演習問題12.6の計算 1
続パタ読書会 12:演習問題12.6の計算 2


標準外ライブラリとしては

  • numpy
  • scipy
  • matplotlib


あたりを使います。コードのファイルは全部で三つ使用しそれぞれ、

  • メインの実行ファイル
  • 「dpm.py」:クラスタリングを実行するクラスが記述されたファイル
  • 「tools.py」:その他の描画用関数を記述したファイル


としています。

実装

クラスタリングを実行するクラス

まずはディリクレ過程混合モデルによるクラスタリングを実際に行うクラスのコードです。言語の仕様上クラスタのインデックスは 1ではなく 0から始まるなど微妙な書き方の違いがありますが、やっていることは以前の読書会 2で書いた通りです。「dpm.py」のコード全文は下記のようになります。

from numpy.random import choice
from scipy import\
    int32, pi, inf, array, zeros, append, outer, where, delete,\
    log, sum, mean, cov
from scipy.linalg import det, inv
from scipy.stats import multivariate_normal as normal, wishart
from scipy.special import gamma
from copy import deepcopy

class algorithm1():
    def __init__(self, alpha, beta, nu, S):
        self.alpha = alpha # 集中度パラメーター
        self.beta = beta   # Norm-Wishart分布のハイパーパラメータ
        self.nu = nu       # 自由度(Wishart分布のハイパーパラメータ)
        self.S = array(S)  # d×d正定値対象行列(Wishart分布のハイパーパラメータ)

    def set_pattern(self, x):
        """ パターンのセットと各種パラメータの初期化 """
        self.x = array(x)                   # パラメータ集合
        self.n, self.d = self.x.shape       # パラメータ数と特徴空間の次元
        self.s = zeros(self.n, dtype=int32) # 潜在変数の集合
        self.c = 1                          # クラスタ数
        self.n_i = array([self.n])          # 各クラスタの所属パターン数
        self.mu_0 = mean(self.x, axis=0)    # ハイパーパラメータ
        self.mu = array([self.mu_0])        # 各クラスタの平均
        sigma = cov(self.x, rowvar=False, bias=True)
        self.icov = array([inv(sigma)])     # 各クラスタの精度行列

        # 計算用定数
        self.logalpha = log(self.alpha)
        self.logaf = sum(log(self.alpha + k) for k in range(self.n))

        self.logv = self.__posterior() # 対数事後確率log{p(s, θ|x)}
        self.maxv = self.logv          # 事後確率の最大値
        self.maxi = 0                  # 事後確率が最大の試行回

        # ログリスト
        self.ls = deepcopy([self.s])
        self.lc = deepcopy([self.c])
        self.ln_i = deepcopy([self.n_i])
        self.lmu = deepcopy([self.mu])
        self.licov = deepcopy([self.icov])
        self.llogv = deepcopy([self.logv])

    def run(self, iter, limit):
        """ 学習 """
        iteration = 0
        stagnation = 0

        while stagnation < limit and iteration < iter:
            print(\
                'iteration:{0:>3d} stag:{1:>3d} logv:{2:>7.1f} cluster:{3:>2d}'\
                .format(iteration, stagnation, self.logv, self.c)\
            )

            self.__update_s()
            self.__update_theta()

            newv = self.__posterior()
            self.__logging(newv)

            if newv > self.maxv:
                self.maxv = newv
                self.maxi = iteration + 1
                stagnation = 0
            else:
                stagnation += 1

            self.logv = newv
            iteration += 1

        print(\
            'iteration:{0:>3d} stag:{1:>3d} logv:{2:>7.1f} cluster:{3:>2d}'\
            .format(iteration, stagnation, self.logv, self.c)\
        )
        print('Finish')

    def __logging(self, logv):
        """ ログ取り """
        self.ls.append(deepcopy(self.s))
        self.lc.append(deepcopy(self.c))
        self.ln_i.append(deepcopy(self.n_i))
        self.lmu.append(deepcopy(self.mu))
        self.licov.append(deepcopy(self.icov))
        self.llogv.append(deepcopy(logv))

    def __prior(self, **kwargs):
        """ 事前確率 """
        return normal.pdf(\
            kwargs['mu'],\
            self.mu_0,\
            inv(self.beta * kwargs['icov'])\
        )\
        * wishart.pdf(\
            kwargs['icov'],\
            self.nu,\
            self.S\
        )\

    def __prob(self, **kwargs):
        """ 確率モデル """
        return normal.pdf(\
            kwargs['x'],\
            kwargs['mu'],\
            inv(kwargs['icov'])\
        )

    def __posterior(self):
        """ 事後確率 """
        # Ewensの抽出公式の対数を計算
        logv = self.c * self.logalpha
        logv += sum(\
            sum(\
                log(1 + l)\
                for l in range(self.n_i[i] - 1)\
            )\
            for i in range(self.c)\
        )
        logv -= self.logaf
        # 事前分布の対数を計算
        logv += sum(\
            log(\
                self.__prior(\
                    mu=self.mu[i],\
                    icov=self.icov[i]\
                )\
            )\
            for i in range(self.c)\
        )
        # 対数尤度を計算
        for i in range(self.c):
            x = self.x[self.s == i]
            logv += sum(\
                log(\
                    self.__prob(\
                        x=x[k],\
                        mu=self.mu[i],\
                        icov=self.icov[i]
                    )\
                )\
                for k in range(self.n_i[i])\
            )

        return logv

    def __update_s(self):
        """ sの更新 """
        for k in range(self.n):
            s_old = self.s[k]
            self.n_i[s_old] -= 1

            if self.n_i[s_old] == 0:
                self.c -= 1
                self.s[where(self.s > s_old)] -= 1
                self.n_i = delete(self.n_i, s_old)
                self.mu = delete(self.mu, s_old, axis=0)
                self.icov = delete(self.icov, s_old, axis=0)

            self.s[k] = self.__new_cluster(self.x[k])
            s_new = self.s[k]

            if s_new == self.c:
                self.c += 1
                self.n_i = append(self.n_i, 1)

                mu, icov = self.__generate_theta(\
                    self.x[self.s == self.c - 1],\
                    self.c - 1\
                )

                self.icov = append(self.icov, icov)
                self.icov = self.icov.reshape(self.c, self.d, self.d)
                self.mu = append(self.mu, mu)
                self.mu = self.mu.reshape(self.c, self.d)
            else:
                self.n_i[s_new] += 1

    def __new_cluster(self, x):
        """ 新規クラスタを確率的に割り当て """
        return choice(self.c + 1, p=self.__spost(x))

    def __spost(self, x):
        """ sの事後分布 """
        experience = self.__experience(x)
        base = self.__base(x)

        experience.append(base)
        p = array(experience)

        return p / sum(p)

    def __experience(self, x):
        """ 既存クラスタの比率 """
        return [\
            self.n_i[i]\
                * self.__prob(\
                    x=x,\
                    mu=self.mu[i],\
                    icov=self.icov[i]\
            )\
            for i in range(self.c)\
        ]

    def __base(self, x):
        """ 新規クラスタの比率 """
        Sb = inv(\
            inv(self.S)\
            + (self.beta/(1 + self.beta))\
            * outer(x - self.mu_0, x - self.mu_0)\
        )
        num = det(Sb)**((self.nu + 1)/2) * gamma((self.nu + 1)/2)
        den = det(self.S)**(self.nu/2) * gamma((self.nu + 1 - self.d)/2)

        return self.alpha\
            * (self.beta/((1 + self.beta) * pi))**(self.d/2)\
            * (num/den)

    def __update_theta(self):
        """ θの更新 """
        for i in range(self.c):
            mu, icov = self.__generate_theta(self.x[self.s == i], i)

            self.mu[i] = mu
            self.icov[i] = icov

    def __generate_theta(self, x, i):
        """ θの生成 """
        x_bar = mean(x, axis=0)

        # 分散共分散行列生成
        nu_c = self.nu + self.n_i[i]
        gap = x - array([x_bar])
        Sq = inv(inv(self.S)\
            + sum(\
                [\
                    outer(gap[k], gap[k])\
                    for k in range(self.n_i[i])\
                ],\
                axis=0\
            )\
            + ((self.n_i[i] * self.beta)/(self.n_i[i] + self.beta))\
            * outer(x_bar - self.mu_0, x_bar - self.mu_0))
        icov = wishart.rvs(nu_c, Sq)

        # 平均生成
        mu_c = (self.n_i[i]*x_bar + self.beta*self.mu_0)\
            / (self.n_i[i] + self.beta)
        icov_c = (self.n_i[i] + self.beta) * icov
        mu = normal.rvs(mu_c, inv(icov_c))

        return mu, icov
メインの実行ファイル

メインの実行ファイル上では「dpm.py」「tools.py」を読み込んで処理をします。主な処理の流れは

  1. 各種パラメータの設定
  2. 入力パターンの生成と表示
  3. 学習
  4. 結果のアニメーションの表示とgifの書き出し
  5.  \theta_iの予測結果をコマンドラインに表示


となっています。実際にコードを見てもらえばわかりますが学習のためのコードは3行しかなく、ほとんどが設定・パターン生成・結果表示のためのコードです。

if __name__ == "__main__":
    from numpy.random import rand
    from scipy import int32, array, linspace, concatenate
    from scipy.linalg import inv
    from scipy.stats import multivariate_normal as normal, wishart
    import matplotlib.pyplot as plt
    from matplotlib.patches import Ellipse
    from matplotlib.gridspec import GridSpec
    from matplotlib.animation import FuncAnimation
    from operator import itemgetter
    from dpm import algorithm1
    from tools import ellipse

    """ パラメータ設定 """
    # 学習パラメータ
    iter = 100 # 学習回数
    limit = 30 # 未更新回数がこの回数を超えたら収束したと判断

    # ハイパーパラメータ
    alpha = 1
    beta = 1 / 3
    nu = 15
    S = [[.1, .0], [.0, .1]]

    # 生成パターンのパラメータ
    origin = [0, 0]
    nsample = 500
    c = 5
    d = 2
    pi = [.4, .2, .2, .1, .1]
    mu = [[-3, 3], [2, 4], [0, 0], [4, -2], [-2, -4]]
    cov = [[[.4, -.4], [-.4, .6]],\
           [[.7, .6], [.6, .6]],\
           [[.3, .2], [.2, .5]],\
           [[.3, 0], [0, .3]],\
           [[.5, .1], [.1, .5]]]

    """ パターンの生成と表示 """
    x = []
    for i in range(c):
        num = int32(pi[i] * nsample)
        x.append([normal.rvs(mu[i], cov[i]) for j in range(num)])
    
    fig, ax = plt.subplots(figsize=(7, 6))
    for i in range(c):
        path = ax.scatter(\
            array(x[i])[:,0],\
            array(x[i])[:,1],\
            marker='.',\
            s=25,\
            alpha=.5,\
            label='cluster' + str(i + 1) + ' : ' + str(int32(nsample*pi[i]))
        )
        e = ellipse(mu[i], cov[i], 3, fc=path._facecolors[0][:3], alpha=.25)
        ax.add_artist(e)

    ax.set_xlim(-6, 6)
    ax.set_ylim(-6, 6)
    ax.legend(loc='upper right', scatterpoints=1)
    fig.suptitle('input pattern')
    plt.show()
    
    """ 学習 """
    x = concatenate(array(x), axis=0)

    algo = algorithm1(alpha, beta, nu, S)
    algo.set_pattern(x)
    algo.run(iter, limit)

    """ 結果の表示 """
    ls = algo.ls
    lc = algo.lc
    ln_i = algo.ln_i
    lmu = algo.lmu
    licov = algo.licov
    llogv = algo.llogv
    maxi = algo.maxi

    def update(i):
        while len(paths) > 0:
            paths.pop().remove()
        ax1.cla()
        ax2.cla()
        ax3.cla()

        # 散布図描画
        for j in range(lc[i]):
            pat = x[ls[i] == j]
            paths.append(
                ax1.scatter(\
                    pat[:,0],\
                    pat[:,1],\
                    marker='.',\
                    s=25,\
                    alpha=.5,\
                    label='cluster' + str(j + 1) + ' : ' + str(ln_i[i][j])
                )
            )
            e = ellipse(\
                lmu[i][j],\
                inv(licov[i][j]),\
                3,\
                fc=paths[j]._facecolors[0][:3],\
                alpha=.25
            )
            ax1.add_artist(e)

        ax1.set_xlim(-6, 6)
        ax1.set_ylim(-6, 6)
        ax1.legend(loc='upper right', scatterpoints=1)
        plt.setp(ax1, title='figure')

        # 事後確率描画
        h = int32(linspace(0, i, i + 1))
        v1 = itemgetter(*h)(llogv)
        ax2.plot(h, v1)
        ax2.set_xlim(0, len(lc))
        ax2.set_xlabel('number of iteration')
        ax2.set_ylabel('log v')
        plt.setp(ax2, title='posterior probability')
        if i >= maxi:
            ax2.axvline(maxi, ls='--', color='red')

        # クラスタ数描画
        v2 = itemgetter(*h)(lc)
        ax3.plot(h, v2)
        ax3.set_xlim(0, len(lc))
        ax3.set_ylim(1,)
        ax3.set_xlabel('number of iteration')
        ax3.set_ylabel('number of cluster')
        plt.setp(ax3, title='number of cluster')
        if i >= maxi:
            ax3.axvline(maxi, ls='--', color='red')
        
        fig.suptitle('number of iteration = '+ str(i))

    fig = plt.figure(figsize=(7, 10))
    grid = GridSpec(10, 1, hspace=10)
    ax1 = fig.add_subplot(grid[:6, 0])
    ax2 = fig.add_subplot(grid[6:8, 0])
    ax3 = fig.add_subplot(grid[8:10, 0])
    paths = []
    anim = FuncAnimation(fig, update, interval=500, frames=len(lc))
    plt.show()
    anim.save('result.gif', 'pillow')

    """ θの予測結果表示 """
    for c in range(lc[maxi]):
        print('mu_{} = {}'.format(c + 1, lmu[maxi][c]))
    for c in range(lc[maxi]):
        print('cov_{} = {}'.format(c + 1, inv(licov[maxi][c])))
その他の描画用関数

といっても関数一つだけです。散布図に分散共分散行列の 3\sigma楕円が描画されますが、その楕円を作成する関数です。引数に平均と分散共分散行列、また何 \sigmaまでの楕円を描画するかを指定し楕円を作成します。「tools.py」のコード全文は下記のようになります。

from scipy import sqrt, arctan2, degrees
from scipy.linalg import eigh
from matplotlib.patches import Ellipse

def ellipse(mu, cov, nstd, **kwargs):
    eigvals, eigvecs = eigh(cov)
    order = eigvals.argsort()[::-1]
    eigvals, eigvecs = eigvals[order], eigvecs[:, order]

    vx, vy = eigvecs[:,0][0], eigvecs[:,0][1]
    theta = arctan2(vy, vx)

    width, height = 2 * nstd * sqrt(eigvals)

    return\
        Ellipse(\
            xy=mu,\
            width=width,\
            height=height,\
            angle=degrees(theta),\
            **kwargs\
        )


コードを見ればわかると思いますが入力パターンは実行時に確率的に作成されますので実行のたびに違ったパターンになります。


では今回はここまでにして次回以降は保留にしていた話題や各計算の具体的な内容について触れていこうと思います。


続パタ読書会 1:ディリクレ過程混合モデル
続パタ読書会 2:ディリクレ過程混合モデルのアルゴリズム
続パタ読書会 3:実験
続パタ読書会 4:実装
続パタ読書会 5:事前確率P(s)の妥当性 1
続パタ読書会 6:事前確率P(s)の妥当性 2
続パタ読書会 7:ベル数 1
続パタ読書会 8:ベル数 2
続パタ読書会 9:P(s_k|x_k, s_-k, θ)の導出
続パタ読書会 10:演習問題12.5の計算
続パタ読書会 11:演習問題12.6の計算 1
続パタ読書会 12:演習問題12.6の計算 2