手話認識入門6 - RNNを用いた孤立手話単語認識モデル2 - Padding信号のマスキング

This image is generated with ChatGPT-4, and edited by the author.
作成日:2024年02月16日(金) 00:00
最終更新日:2024年10月24日(木) 18:21
カテゴリ:手話言語処理
タグ:  手話認識入門 孤立手話単語認識 RNN Python

Recurrent Neural Network (RNN) を用いた孤立手話単語認識モデルを紹介します.今回は特に,Padding信号のマスキングにポイントを絞って解説をします.

こんにちは.高山です. 先日の記事で告知しました手話入門記事の第六回になります.
前回に引き続いて,今回もRecurrent Neural Network (RNN) を用いた孤立手話単語認識モデルを実装する方法を紹介します.

前回の記事の冒頭で,RNNの設計上のポイントとして次の3点を挙げました.

  1. レイヤ構成: Stacked RNNとBidirectional RNN
  2. Padding信号のマスキング
  3. レイヤ種別: LSTMとGRU

前回はレイヤ構成について説明したので,今回はPadding信号のマスキングに焦点を絞って紹介をしたいと思います.

今回解説するスクリプトはGitHub上に公開しています
色々な実験を行っている都合で,CPUで動かした場合は結構時間がかるのでご注意ください.

更新履歴 (大きな変更のみ記載しています)

  • 2024/09/18: カテゴリを変更しました
  • 2024/09/17: タグを更新しました
  • 2024/07/29: Gitスクリプトのダウンロード元を master から v0.1タグに変更
  • 2024/07/23
    • 第1節の構成を見直し
    • 記事最終部の実験結果を削除して第3節に統合

1. モデルの改良内容

前回の記事では,図1(c)に示すように特徴抽出ブロックにRNN層を加えてモデルの改善を行いました.
今回はさらに,RNN層と出力層にPadding信号のマスキング機能を加えます.

RNNを用いた孤立手話単語認識モデルの,処理構成と処理の流れを描いたブロック図です.特徴抽出ブロックの末尾に新しくRNN層を追加した構造になっています.
RNNを用いた孤立手話単語認識モデル

2. Paddingについておさらい

第一回の記事 (第3節をご参照ください) で説明したように,深層学習では複数のサンプルをバッチという塊でまとめ,並列に計算を行う場合が多いです.
バッチデータを用いることで,学習の高速化や統計値を用いた処理が可能になります.

手話認識のようにサンプルの系列長が異なる場合は,図2に示すように系列長を整えるためにPaddingという処理を行います.
Paddingにはダミー信号 (例えばゼロベクトル) が用いられます.
ダミー信号は認識に必要な情報を持っていないため,そのまま学習・推論に用いると認識性能が落ちてしまう場合があります.

パディングを用いてデータの長さを揃えている様子を描いた図です.画像前の文章,および参照しているリンク先に説明があります.
パディングによるバッチデータの整形

3. Padding信号のマスキング

ダミー信号は時系列方向に付加されているため,時系列処理に対して影響を与えます.
今回の認識モデルでは,RNN層と出力層 (のGlobal Average Pooling処理) が時系列処理を行うため,それぞれの層に対してマスキング処理を追加します.
なお,線形変換層は各フレームを独立に処理するため (前回の記事の第2節 図2(a) をご参照ください),マスキングは必要ありません.

3.1 RNN層のマスキング

PyTorchのRNN層は,入力の型を判別して内部でマスキングを行います.

PyTorchのRNN層でマスキング処理を実装する方法を説明した図です.画像の後に詳細説明があります.
RNNの設計ポイント: Padding信号のマスキング

図3(a) は生のTensor型を入力した場合の処理を示しています.
この場合,RNN層はPadding信号のマスキングを行わず,全ての信号をそのまま処理します.

RNN層でマスキングを行う場合は図3(b) 上部の図に示すように,Packing, Unpacking, および (再) Paddingの処理を追加する必要があります.

PackingはTensor型の入力をPackedSequence型に変換する処理で,変換の際に各サンプルの系列長 (ダミー信号を除いた長さ) などの補助情報を持たせることができます.
RNN層はこの補助情報を基にマスキングを行います.

図3(b) 下部の図に示すように,RNN層ではダミー信号をスキップすることでマスキング処理を行います.
例えば,FRNNでは \(\boldsymbol{x}_2\) で計算処理をストップし,BRNNでは \(\boldsymbol{x}_3\) を飛ばして \(\boldsymbol{x}_2\) から計算処理を行います.

計算処理後はUnpackingという処理でデータ形式をPackedSequence型からTensor型に戻します.
RNN層のマスキングは計算処理のスキップで実装されているため,Unpacking後の出力データは系列長の異なるサンプルのリストになっています.
そのため,再度Paddingを行いバッチデータに戻す必要があります.

3.2 出力層のマスキング

出力層のGlobal Average Poolingでは,特徴量の時間平均値を求めます.
そのため,マスキングを行わない場合は出力にダミー信号が混ざり,認識性能に影響が出てしまいます.

Global Average Poolingに対するマスキング処理を図4に示します.

マスキング機能付きのGlobal averaging pooling層の処理を説明した図です.画像の後に説明があります.
Masked Global Average Pooling

Masked Global Average Pooling では (なお,この名前は高山が勝手に名付けているだけです(^^;)),入力特徴量と同じ長さのMask信号 \(\boldsymbol{M}\) を用意します.
Mask信号の各要素は\(1\)\(0\)のスカラーで,\(1\)は正規の信号を示し,\(0\)はダミー信号を示します.
計算処理は非常に単純で,特徴量とマスク信号を要素毎にかけ合わせ総和を取り,各サンプルの系列長 (つまり,マスク信号の\(1\)の要素数) で割るだけです.

4. 実験結果

次節以降では,いつも通り実装の紹介をしながら実験結果をお見せします.
今回はマスキングの効果を見るために複数の実験条件を実装しており,少し冗長な展開が続きますので結果を先にお見せしたいと思います.

図5はマスキングを適用するレイヤ毎のValidation Lossと認識率の推移を示しています.

認識性能比較結果を図示したグラフです.画像前後の文章に詳細説明があります.
認識性能比較結果: Padding信号のマスキング

横軸は学習・評価ループの繰り返し数 (Epoch) を示します.
縦軸はそれぞれの評価指標を示します.

各線の色と実験条件の関係は次のとおりです.

  • 青線 (Default): Stacked - Bidirectional RNN
  • 橙線 (+ Masking(R)): RNN層だけにマスキング処理を適用
  • 緑線 (+ Masking(H)): 出力層だけにマスキング処理を適用
  • 赤線 (+ Masking(B)): RNN層と出力層にマスキング処理を適用

デフォルト設定に比べて,マスキング処理を加えた場合は認識性能が向上していることが分かります.

DefaultとMasking(R)のLossの減り方を比べると,RNN層のマスキングは一応効果がありそうです.
ただし,Making(H)の結果と比べるとMaking(R)の効果は限定的です.

この理由としては,次のような要因が考えられます.

  • RNN層ではダミー信号の伝播が近接フレームだけで収まっている
  • Default設定ではダミー信号も一種の正則化として過学習を防ぐ効果がある

一方,出力層では平均処理によってダミー信号が直接影響を及ぼすため,マスキングの効果が高かったと予想できます.

なお,今回の実験では話を簡単にするために,実験条件以外のパラメータは固定にし,乱数の制御もしていません.
複数回試して認識性能の傾向は確認していますが,必ずしも同様の結果になるわけではないので,ご了承ください.

5. 前準備

5.1 データセットのダウンロード

ここからは実装方法の説明をしていきます.
まずは,前準備としてGoogle Colabにデータセットをアップロードします.

最初に,データセットの格納先からデータをダウンロードし,ご自分のGoogle driveへアップロードしてください.

次のコードでGoogle driveをColabへマウントします.
Google Driveのマウント方法については,補足記事にも記載してあります.

1
2
3
from google.colab import drive

drive.mount("/content/drive")

ドライブ内のファイルをColabへコピーします.
パスはアップロード先を設定する必要があります.

# Copy to local.
!cp [path_to_dataset]/gislr_dataset_top10.zip gislr_top10.zip

データセットはZIP形式になっているので unzip コマンドで解凍します.

!unzip gislr_top10.zip
Archive:  gislr_top10.zip
   creating: dataset_top10/
  inflating: dataset_top10/16069.hdf5
  ...
  inflating: dataset_top10/sign_to_prediction_index_map.json

成功すると dataset_top10 以下にデータが解凍されます.
HDF5ファイルはデータ本体で,手話者毎にファイルが別れています.
JSONファイルは辞書ファイルで,TXTファイルは本データセットのライセンスです.

!ls dataset_top10
16069.hdf5  25571.hdf5  29302.hdf5  36257.hdf5  49445.hdf5  62590.hdf5
18796.hdf5  26734.hdf5  30680.hdf5  37055.hdf5  53618.hdf5  LICENSE.txt
2044.hdf5   27610.hdf5  32319.hdf5  37779.hdf5  55372.hdf5  sign_to_prediction_index_map.json
22343.hdf5  28656.hdf5  34503.hdf5  4718.hdf5   61333.hdf5

単語辞書には単語名と数値の関係が10単語分定義されています.

!cat dataset_top10/sign_to_prediction_index_map.json
{
    "listen": 0,
    "look": 1,
    "shhh": 2,
    "donkey": 3,
    "mouse": 4,
    "duck": 5,
    "uncle": 6,
    "hear": 7,
    "pretend": 8,
    "cow": 9
}

ライセンスはオリジナルと同様に,CC-BY 4.0 としています.

!cat dataset_top10/LICENSE.txt
The dataset provided by Natsuki Takayama (Takayama Research and Development Office) is licensed under CC-BY 4.0.
Author: Copyright 2024 Natsuki Takayama
Title: GISLR Top 10 dataset
Original licenser: Deaf Professional Arts Network and the Georgia Institute of Technology
Modification
- Extract 10 most frequent words.
- Packaged into HDF5 format.

次のコードでサンプルを確認します.
サンプルは辞書型のようにキーバリュー形式で保存されており,下記のように階層化されています.

- サンプルID (トップ階層のKey)
  |- feature: 入力特徴量で `[C(=3), T, J(=543)]` 形状.C,T,Jは,それぞれ特徴次元,フレーム数,追跡点数です.
  |- token: 単語ラベル値で `[1]` 形状.0から9の数値です.
1
2
3
4
5
6
7
8
9
with h5py.File("dataset_top10/16069.hdf5", "r") as fread:
    keys = list(fread.keys())
    print(keys)
    group = fread[keys[0]]
    print(group.keys())
    feature = group["feature"][:]
    token = group["token"][:]
    print(feature.shape)
    print(token)
['1109479272', '11121526', ..., '976754415']
<KeysViewHDF5 ['feature', 'token']>
(3, 23, 543)
[1]

5.2 モジュールのダウンロード

次に,過去の記事で実装したコードをダウンロードします.
本項は前回までに紹介した内容と同じですので,飛ばしていただいても構いません. コードはGithubのsrc/modules_gislrにアップしてあります (今後の記事で使用するコードも含まれています).

まず,下記のコマンドでレポジトリをダウンロードします.
(目的のディレクトリだけダウンロードする方法はまだ調査中です(^^;))

!wget https://github.com/takayama-rado/trado_samples/archive/refs/tags/v0.1.zip -O master.zip
--2024-01-21 11:01:47--  https://github.com/takayama-rado/trado_samples/archive/master.zip
Resolving github.com (github.com)... 140.82.112.3
...
2024-01-21 11:01:51 (19.4 MB/s) - ‘master.zip’ saved [75710869]

ダウンロードしたリポジトリを解凍します.

!unzip -o master.zip -d master
Archive:  master.zip
641b06a0ca7f5430a945a53b4825e22b5f3b8eb6
   creating: master/trado_samples-main/
  inflating: master/trado_samples-main/.gitignore
  ...

モジュールのディレクトリをカレントディレクトリに移動します.

!mv master/trado_samples-main/src/modules_gislr .

他のファイルは不要なので削除します.

!rm -rf master master.zip gislr_top10.zip
!ls
dataset_top10 drive modules_gislr  sample_data

5.3 モジュールのロード

主要な処理の実装に先立って,下記のコードでモジュールをロードします.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import json
import math
import sys
import time
from functools import partial
from inspect import signature
from pathlib import Path
from typing import (
    Any,
    Dict
)

# Third party's modules
import numpy as np

import torch
from torch import nn
from torch.nn import functional as F
from torch.utils.data import (
    DataLoader)

from torchvision.transforms import Compose

# Local modules
sys.path.append("modules_gislr")
from modules_gislr.dataset import (
    HDF5Dataset,
    merge_padded_batch)
from modules_gislr.defines import (
    get_fullbody_landmarks
)
from modules_gislr.transforms import (
    PartsBasedNormalization,
    ReplaceNan,
    SelectLandmarksAndFeature,
    ToTensor
)
【コード解説】
- 標準モジュール
  - json: JSONファイル制御ライブラリ.辞書ファイルのロードに使用します.
  - math: 数学計算処理ライブラリ
  - sys: Pythonインタプリタの制御ライブラリ.
    今回はローカルモジュールに対してパスを通すために使用します.
  - time: 時間計測ライブラリ
  - functools: 関数オブジェクトを操作するためのライブラリ.
    今回はDataLoaderクラスに渡すパディング関数に対して設定値をセットするために使用します.
  - inspect.signature: オブジェクトの情報取得ライブラリ.
    今回は,認識モデルがマスキング処理に対応しているかを調べるために使用します.
  - pathlib.Path: オブジェクト指向のファイルシステム機能.
    主にファイルアクセスに使います.osモジュールを使っても同様の処理は可能です.
    高山の好みでこちらのモジュールを使っています(^^;).
  - typing: 関数などに型アノテーションを行う機能.
    ここでは型を忘れやすい関数に付けていますが,本来は全てアノテーションをした方が良いでしょう(^^;).
- 3rdパーティモジュール
  - numpy: 行列演算ライブラリ
  - torch: ニューラルネットワークライブラリ
  - torchvision: PyTorchと親和性が高い画像処理ライブラリ.
    今回はDatasetクラスに与える前処理をパッケージするために用います.
- ローカルモジュール: sys.pathにパスを追加することでロード可能
  - dataset: データセット操作用モジュール
  - defines: 各部位の追跡点,追跡点間の接続関係,およびそれらへのアクセス処理を
    定義したモジュール
  - transforms: 入出力変換処理モジュール

6. 認識モデルの実装

6.1 RNN Encoder層

ここから先は,認識モデルを実装していきます.
まずは,前回の記事で実装した (第6.1項をご参照ください) RNN層をベースとして,マスキング処理を加えます.

マスキングを行うかどうかのフラグ apply_mask を追加し,第3節で説明したマスキング処理を forward() に追加しています.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class RNNEncoder(nn.Module):
    def __init__(self,
                 in_channels,
                 out_channels,
                 num_layers,
                 activation,
                 bidir,
                 dropout,
                 apply_mask):
        super().__init__()

        # If num_layers = 1, we remove dropout option of RNN layers to avoid warning.
        rnn_dropout = 0.0 if num_layers == 1 else dropout

        self.rnn = nn.RNN(input_size=in_channels,
                          hidden_size=out_channels,
                          num_layers=num_layers,
                          nonlinearity=activation,
                          batch_first=True,
                          dropout=rnn_dropout,
                          bidirectional=bidir)
        # Dropout is added into intermediate RNN layers expect the last layer.
        # So, we add Dropout layer for the last layer explicitly.
        if dropout > 0:
            self.dropout = nn.Dropout(dropout)
        else:
            self.dropout = nn.Identity()

        self.apply_mask = apply_mask

    def forward(self, feature, feature_pad_mask=None):
        if feature_pad_mask is not None and self.apply_mask:
            tlength = feature_pad_mask.sum(axis=-1).detach().cpu()
            feature = nn.utils.rnn.pack_padded_sequence(
                feature, tlength, batch_first=True, enforce_sorted=False)

        hidden_seqs, last_hstate = self.rnn(feature)
        # Unpack hidden sequence.
        if isinstance(hidden_seqs, nn.utils.rnn.PackedSequence):
            hidden_seqs = nn.utils.rnn.unpack_sequence(hidden_seqs)
            # Back list to padded batch.
            hidden_seqs = nn.utils.rnn.pad_sequence(
                hidden_seqs, batch_first=True, padding_value=0.0)
        hidden_seqs = self.dropout(hidden_seqs)

        return hidden_seqs, last_hstate
【コード解説】
- 引数
  - in_channels: 入力特徴量の次元数
  - out_channels: 出力特徴量の次元数.
    bidir=Trueの場合,出力特徴量次元数は設定値の倍になります.
  - num_layers: RNN層の数
  - activation: RNN層内の活性化関数.
    ["tanh"/"relu"]で指定します.
  - bidir: Trueの場合,Bidirectional RNNを使用
  - dropout: Dropoutレイヤの欠落率
  - apply_mask: Trueの場合,マスキング処理を行う
- 10-29行目: 初期化処理
  - 13行目: RNN内のDropoutは `num_layers > 1` の場合のみ有効という仕様であるため,
    警告を避けるために `num_layers=1` の場合は設定値を上書きしています.
  - 15-21行目: SRNN層の作成
  - 24-27行目: RNN内のDropoutは最後のRNN層には適用されないという仕様であるため,
    明示的にDropout層を作成しています.
  - 28行目: `apply_mask` をセット
- 31-46行目: 推論処理
  - 32-35行目: `apply_mask == True` で,かつ,`feature_pad_mask is not None` の
    場合は,`feature` をPackedSequence型に変換
  - 37行目: RNNを適用
  - 39-43行目: `feature` がPackedSequence型の場合は,Tensor型に戻し,さらに,
    再度Paddingを行う.
  - 44行目: Dropoutを適用

6.2 出力層の更新

次に,出力層を更新して,マスキング処理を追加します.
forward() に対して,第3節で説明したマスキング処理を追加しています.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class GPoolRecognitionHead(nn.Module):
    def __init__(self,
                 in_channels,
                 out_channels):
        super().__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels

        self.head = nn.Linear(in_channels, out_channels)
        self._init_weight()

    def _init_weight(self):
        nn.init.normal_(self.head.weight,
                        mean=0.,
                        std=math.sqrt(1. / self.out_channels))

    def forward(self, feature, feature_pad_mask=None):
        # Averaging over temporal axis.
        # `[N, C, T] -> [N, C, 1] -> [N, C]`
        if feature_pad_mask is not None:
            tlength = feature_pad_mask.sum(dim=-1)
            feature = feature * feature_pad_mask.unsqueeze(1)
            feature = feature.sum(dim=-1) / tlength.unsqueeze(-1)
        else:
            feature = F.avg_pool1d(feature, kernel_size=feature.shape[-1])
        feature = feature.reshape(feature.shape[0], -1)

        # Predict.
        feature = self.head(feature)
        return feature
【コード解説】
- 引数
  - in_channels: 入力特徴量の次元数
  - out_channels: 出力特徴量の次元数.単語応答値を出力したいので,全単語数と同じにします.
- 5-10行目: 初期化処理.
- 12-15行目: `self.head` のパラメータ初期化処理
- 17-25行目: 推論処理
  - 20-26行目: Global average pooling処理.
    - 20-23行目: `feature_pad_mask is not None` の場合は,マスキングを適用しながら平均特徴量を求めます.
    - 24-25行目: `feature_pad_mask is None` の場合は,`avg_pool1d()` のカーネルサイズを時系列長にすることで時系列全体の平均値を求めています.
  - 29行目: 単語毎の応答値に変換

6.3 認識モデル

最後に,認識モデルを更新します.
RNN層と出力層でマスキングを行うようにインタフェースを更新しています.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
class RNNISLR(nn.Module):
    def __init__(self,
                 in_channels,
                 hidden_channels,
                 out_channels,
                 rnn_num_layers=1,
                 rnn_activation="tanh",
                 rnn_bidir=False,
                 rnn_dropout=0.1,
                 masking_type="both"):
        super().__init__()
        assert masking_type in ["none", "rnn", "head", "both"]

        self.linear = nn.Linear(in_channels, hidden_channels)
        self.activation = nn.ReLU()

        apply_mask = True if masking_type in ["rnn", "both"] else False
        self.rnn = RNNEncoder(
            in_channels=hidden_channels,
            out_channels=hidden_channels,
            num_layers=rnn_num_layers,
            activation=rnn_activation,
            bidir=rnn_bidir,
            dropout=rnn_dropout,
            apply_mask=apply_mask)

        if rnn_bidir:
            self.head = GPoolRecognitionHead(hidden_channels * 2, out_channels)
        else:
            self.head = GPoolRecognitionHead(hidden_channels, out_channels)

        self.masking_type = masking_type

    def forward(self, feature, feature_pad_mask=None):
        # Feature extraction.
        # `[N, C, T, J] -> [N, T, C, J] -> [N, T, C*J] -> [N, T, C']`
        N, C, T, J = feature.shape
        feature = feature.permute([0, 2, 1, 3])
        feature = feature.reshape(N, T, -1)

        feature = self.linear(feature)
        feature = self.activation(feature)

        hidden_seqs, last_hstate = self.rnn(feature, feature_pad_mask)[:2]

        # `[N, T, C'] -> [N, C', T]`
        feature = hidden_seqs.permute(0, 2, 1)

        if feature_pad_mask is not None and self.masking_type in ["head", "both"]:
            logit = self.head(feature, feature_pad_mask)
        else:
            logit = self.head(feature)
        return logit
【コード解説】
- 引数
  - in_channels: 入力特徴量の次元数
  - hidden_channels: RNN層の次元数.
    rnn_bidir=Trueの場合,内部では設定値の倍次元の特徴量を出力します.
  - out_channels: 出力特徴量の次元数.単語応答値を出力したいので,全単語数と同じにします.
  - rnn_num_layers: RNN層の数
  - rnn_activation: RNN層内の活性化関数.
    ["tanh"/"relu"]で指定します.
  - rnn_bidir: Trueの場合,Bidirectional RNNを使用
  - rnn_dropout: Dropoutレイヤの欠落率
  - masking_type: 指定に応じて,マスキングを適用します.
    - none: マスキングは行わない
    - rnn: RNN層だけマスキングを行う
    - head: 出力層だけマスキングを行う
    - both: RNN層と出力層でマスキングを行う
- 11-32行目: 初期化処理
  - 17-25行目: RNN層でマスキングを行う場合は,`apply_mask = True` として,`RNNEncoder` に渡します.
  - 27-30行目: rnn_bidirの設定に応じて,出力層の入力次元を調整しています.
- 34-53行目: 推論処理
  49-52行目: 出力層でマスキングを行う場合は,`feature_pad_mask` を渡します.

6.4 動作チェック

認識モデルの実装ができましたので,動作確認をしていきます.
次のコードでデータセットからHDF5ファイルとJSONファイルのパスを読み込みます.

1
2
3
4
5
6
7
8
# Access check.
dataset_dir = Path("dataset_top10")
files = list(dataset_dir.iterdir())
dictionary = [fin for fin in files if ".json" in fin.name][0]
hdf5_files = [fin for fin in files if ".hdf5" in fin.name]

print(dictionary)
print(hdf5_files)
dataset_top10/sign_to_prediction_index_map.json
[PosixPath('dataset_top10/55372.hdf5'), ..., PosixPath('dataset_top10/61333.hdf5')]

次のコードで辞書ファイルをロードして,認識対象の単語数を格納します.

1
2
3
4
5
# Load dictionary.
with open(dictionary, "r") as fread:
    key2token = json.load(fread)

VOCAB = len(key2token)

次のコードで前処理を定義します.
前回と同様に,以前に説明した追跡点の選定と,追跡点の正規化を前処理として適用して実験を行います.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
_, use_landmarks = get_fullbody_landmarks()
use_features = ["x", "y"]
trans_select_feature = SelectLandmarksAndFeature(landmarks=use_landmarks, features=use_features)
trans_repnan = ReplaceNan()
trans_norm = PartsBasedNormalization(align_mode="framewise", scale_mode="unique")

pre_transforms = Compose([trans_select_feature,
                          trans_repnan,
                          trans_norm])
transforms = Compose([ToTensor()])

次のコードで,前処理を適用したHDF5DatasetとDataLoaderをインスタンス化し,データを取り出します.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
batch_size = 2
feature_shape = (len(use_features), -1, len(use_landmarks))
token_shape = (1,)
merge_fn = partial(merge_padded_batch,
                   feature_shape=feature_shape,
                   token_shape=token_shape,
                   feature_padding_val=0.0,
                   token_padding_val=0)

dataset = HDF5Dataset(hdf5_files, pre_transforms=pre_transforms, transforms=transforms)

dataloader = DataLoader(dataset, batch_size=batch_size, collate_fn=merge_fn)
try:
    data = next(iter(dataloader))
    feature_origin = data["feature"]

    print(feature_origin.shape)
except Exception as inst:
    print(inst)
torch.Size([2, 2, 25, 130])

次のコードでモデルをインスタンス化して,動作チェックをします.
追跡点抽出の結果,入力追跡点数は130で,各追跡点はXY座標値を持っていますので,入力次元数は260になります.
出力次元数は単語数なので10になります.
また,SRNN層の次元数は64に設定しています.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# Define model.
# in_channels: J * C (130*2=260)
#   J: use_landmarks (130)
#   C: use_channels (2)
# out_channels: 10
in_channels = len(use_landmarks) * len(use_features)
hidden_channels = 64
out_channels = VOCAB
rnn_num_layers=2
rnn_bidir=True
rnn_dropout=0.1
masking_type="none"

model = RNNISLR(in_channels=in_channels,
                hidden_channels=hidden_channels,
                out_channels=out_channels,
                rnn_num_layers=rnn_num_layers,
                rnn_bidir=rnn_bidir,
                masking_type=masking_type)
print(model)

# Sanity check.
logit = model(feature_origin)
print(logit.shape)
RNNISLR(
  (linear): Linear(in_features=260, out_features=64, bias=True)
  (activation): ReLU()
  (rnn): RNNEncoder(
    (rnn): RNN(64, 64, num_layers=2, batch_first=True, dropout=0.1, bidirectional=True)
    (dropout): Dropout(p=0.1, inplace=False)
  )
  (head): GPoolRecognitionHead(
    (head): Linear(in_features=128, out_features=10, bias=True)
  )
)
torch.Size([2, 10])

7. 学習と評価

7.1 学習・評価処理の更新

認識モデルに対して,マスク配列を渡すように学習・評価処理を更新します.

学習ループ

まずは,次のコードで学習ループを更新します.
前回までの実装と互換性を持たせるために,ループ前にモデルがマスキングに対応しているかを調べて処理を切り替えるようにしています.
また,モデルがマスキングに対応している場合は マスク配列 feature_pad_mask をバッチからロードしてモデルに渡すようにしています.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
def train_loop(dataloader, model, loss_fn, optimizer, device, use_mask=True):
    num_batches = len(dataloader)
    train_loss = 0
    size = len(dataloader.dataset)

    # Inspect model signature.
    sig = signature(model.forward)
    use_mask = True if "feature_pad_mask" in sig.parameters and use_mask is True else False

    # Switch to training mode.
    model.train()
    # Main loop.
    print("Start training.")
    start = time.perf_counter()
    for batch_idx, batch_sample in enumerate(dataloader):
        feature = batch_sample["feature"]
        token = batch_sample["token"]
        feature = feature.to(device)
        token = token.to(device)

        # Predict.
        if use_mask:
            feature_pad_mask = batch_sample["feature_pad_mask"]
            feature_pad_mask = feature_pad_mask.to(device)
            pred = model(feature, feature_pad_mask=feature_pad_mask)
        else:
            pred = model(feature)
        # Compute loss.
        loss = loss_fn(pred, token.squeeze(-1))

        # Back propagation.
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        train_loss += loss.item()

        # Print current loss per 100 steps.
        if batch_idx % 100 == 0:
            loss = loss.item()
            steps = batch_idx * len(feature)
            print(f"loss:{loss:>7f} [{steps:>5d}/{size:>5d}]")
    print(f"Done. Time:{time.perf_counter()-start}")
    # Average loss.
    train_loss /= num_batches
    print("Training performance: \n",
          f"Avg loss:{train_loss:>8f}\n")
    return train_loss
【コード解説】
- 引数
  - dataloader: DataLoaderクラスのインスタンス
  - model: 認識モデルのインスタンス
  - loss_fn: Loss関数のインスタンス
  - optimizer: モデルのパラメータ制御クラスのインスタンス
  - device: 計算処理を行うデバイスを示す文字列 ("cpu"や"cuda"など)
  - use_mask: Trueの場合,学習時にマスキング処理を行う.
    ただし,モデルがマスキング処理に対応していない場合は,この設定は無視されます.
- 2行目: バッチ数を取得.この値は平均Lossを計算するために使用します.
- 3行目: Loss格納用変数を初期化
- 4行目: データ数を取得.この値は学習の進捗を表示するために使用します.
- 7-8行目: モデルがマスキング処理に対応しているかを解析.
  未対応の場合は,`use_mask` の設定を `False` で上書き
- 11行目: モデルを学習モードに切り替え
- 14-43行目: 学習ループ
  - 16-19行目: バッチデータをロードしてデバイス (CPUやGPU) に転送
  - 22-29行目: 推論後,Loss値を算出
    マスキング処理を行う場合は,`faeture_pad_mask` をロードしてモデルに渡しています.
  - 32-34行目: モデルのパラメータを更新
  - 36行目: 後で平均Lossを算出するために,Lossを加算
  - 39-42行目: 学習の進捗状況を表示
- 45-48行目: 平均Lossを計算して返す

バリデーションループ

次に,バリデーションループを実装します.
更新箇所は学習ループと同様です.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
def val_loop(dataloader, model, loss_fn, device, use_mask=True):
    num_batches = len(dataloader)
    val_loss = 0

    # Inspect model signature.
    sig = signature(model.forward)
    use_mask = True if "feature_pad_mask" in sig.parameters and use_mask is True else False

    # Switch to evaluation mode.
    model.eval()
    # Main loop.
    print("Start validation.")
    start = time.perf_counter()
    with torch.no_grad():
        for batch_sample in dataloader:
            feature = batch_sample["feature"]
            token = batch_sample["token"]
            feature = feature.to(device)
            token = token.to(device)

            # Predict.
            if use_mask:
                feature_pad_mask = batch_sample["feature_pad_mask"]
                feature_pad_mask = feature_pad_mask.to(device)
                pred = model(feature, feature_pad_mask=feature_pad_mask)
            else:
                pred = model(feature)
            val_loss += loss_fn(pred, token.squeeze(-1)).item()
    print(f"Done. Time:{time.perf_counter()-start}")

    # Average loss.
    val_loss /= num_batches
    print("Validation performance: \n",
          f"Avg loss:{val_loss:>8f}\n")
    return val_loss
【コード解説】
- 引数
  - dataloader: DataLoaderクラスのインスタンス
  - model: 認識モデルのインスタンス
  - loss_fn: Loss関数のインスタンス
  - device: 計算処理を行うデバイスを示す文字列 ("cpu"や"cuda"など)
  - use_mask: Trueの場合,学習時にマスキング処理を行う.
    ただし,モデルがマスキング処理に対応していない場合は,この設定は無視されます.
- 2行目: バッチ数を取得.この値は平均Loss値を算出するために使用します.
- 3行目: Loss格納用変数を初期化
- 6-7行目: モデルがマスキング処理に対応しているかを解析.
  未対応の場合は,`use_mask` の設定を `False` で上書き
- 10行目: モデルを評価モードに切り替え
- 13-29行目: バリデーションループ
  - 14行目: ループを `with torch.no_grad()` で囲むことで,パラメータ更新に使用する
    勾配計算処理をOFFにしています.
    これにより無駄なメモリ消費と計算を抑制することができます.
  - 16-19行目: バッチデータをロードしてデバイス (CPUやGPU) に転送
  - 22-27行目: 推論後,Loss値を算出.
    マスキング処理を行う場合は,`faeture_pad_mask` をロードしてモデルに渡しています.
  - 28行目: 後で平均Lossを算出するために,Lossを加算
  - 32-35行目: 平均Loss値を計算して返す

テストループ

最後に,テストループを実装します.
更新箇所は学習ループと同様です.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def test_loop(dataloader, model, device, use_mask=False):
    size = len(dataloader.dataset)
    correct = 0

    # Inspect model signature.
    sig = signature(model.forward)
    use_mask = True if "feature_pad_mask" in sig.parameters and use_mask is True else False

    # Switch to evaluation mode.
    model.eval()
    # Main loop.
    print("Start evaluation.")
    start = time.perf_counter()
    with torch.no_grad():
        for batch_sample in dataloader:
            feature = batch_sample["feature"]
            token = batch_sample["token"]
            feature = feature.to(device)
            token = token.to(device)

            # Predict.
            if use_mask:
                feature_pad_mask = batch_sample["feature_pad_mask"]
                feature_pad_mask = feature_pad_mask.to(device)
                pred = model(feature, feature_pad_mask=feature_pad_mask)
            else:
                pred = model(feature)

            pred_ids = pred.argmax(dim=1).unsqueeze(-1)
            count = (pred_ids == token).sum().detach().cpu().numpy()
            correct += int(count)
    print(f"Done. Time:{time.perf_counter()-start}")

    acc = correct / size * 100
    print("Test performance: \n",
          f"Accuracy:{acc:>0.1f}%")
    return acc
【コード解説】
- 引数
  - dataloader: DataLoaderクラスのインスタンス
  - model: 認識モデルのインスタンス
  - device: 計算処理を行うデバイスを示す文字列 ("cpu"や"cuda"など)
  - use_mask: Trueの場合,学習時にマスキング処理を行う.
    ただし,モデルがマスキング処理に対応していない場合は,この設定は無視されます.
- 2行目: データ数を取得.この値は認識率を計算するために使用します.
- 3行目: 正解認識数格納用変数を初期化
- 6-7行目: モデルがマスキング処理に対応しているかを解析.
  未対応の場合は,`use_mask` の設定を `False` で上書き
- 10行目: モデルを評価モードに切り替え
- 13-32行目: テストループ
  - 14行目: ループを `with torch.no_grad()` で囲むことで,パラメータ更新に使用する
    勾配計算処理をOFFにしています.
    これにより無駄なメモリ消費と計算を抑制することができます.
  - 16-19行目: バッチデータをロードしてデバイス (CPUやGPU) に転送
  - 22-27行目: 推論処理.
    マスキング処理を行う場合は,`faeture_pad_mask` をロードしてモデルに渡しています.
  - 29-31行目: 正解数算出.
    最初に,最大応答値となる単語インデクスを算出.
    その後,正解ラベルと比較することで正解数を算出.
    後で認識率を計算するための正解数の総和を計算.
- 34-37行目: 認識率を計算して返す

7.2 共通設定

では,実際に学習・評価を行います.
まずは,実験全体で共通して用いる設定値を次のコードで実装します.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
# Set common parameters.
batch_size = 32
load_into_ram = True
test_pid = 16069
num_workers = 1
lr = 3e-4

epochs = 50
eval_every_n_epochs = 1
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using {device} for computation.")

train_hdf5files = [fin for fin in hdf5_files if str(test_pid) not in fin.name]
val_hdf5files = [fin for fin in hdf5_files if str(test_pid) in fin.name]
test_hdf5files = [fin for fin in hdf5_files if str(test_pid) in fin.name]

_, use_landmarks = get_fullbody_landmarks()
use_features = ["x", "y"]
Using cpu for computation.

次のコードで学習・バリデーション・評価処理それぞれのためのDataLoaderクラスを作成します.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
# Build dataloaders.
train_dataset = HDF5Dataset(train_hdf5files, pre_transforms=pre_transforms,
    transforms=transforms, load_into_ram=load_into_ram)
val_dataset = HDF5Dataset(val_hdf5files, pre_transforms=pre_transforms,
    transforms=transforms, load_into_ram=load_into_ram)
test_dataset = HDF5Dataset(test_hdf5files, pre_transforms=pre_transforms,
    transforms=transforms, load_into_ram=load_into_ram)

train_dataloader = DataLoader(train_dataset, batch_size=batch_size, collate_fn=merge_fn, num_workers=num_workers, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=batch_size, collate_fn=merge_fn, num_workers=num_workers, shuffle=False)
test_dataloader = DataLoader(test_dataset, batch_size=1, collate_fn=merge_fn, num_workers=num_workers, shuffle=False)

7.3 学習・評価の実行

次のコードでモデルをインスタンス化します.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
rnn_num_layers = 2
rnn_bidir = True
masking_type = "none"

model = RNNISLR(in_channels=in_channels,
                hidden_channels=hidden_channels,
                out_channels=out_channels,
                rnn_num_layers=rnn_num_layers,
                rnn_bidir=rnn_bidir,
                masking_type=masking_type)
print(model)

loss_fn = nn.CrossEntropyLoss(reduction="mean")
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
RNNISLR(
  (linear): Linear(in_features=260, out_features=64, bias=True)
  (activation): ReLU()
  (rnn): RNNEncoder(
    (rnn): RNN(64, 64, num_layers=2, batch_first=True, dropout=0.1, bidirectional=True)
    (dropout): Dropout(p=0.1, inplace=False)
  )
  (head): GPoolRecognitionHead(
    (head): Linear(in_features=128, out_features=10, bias=True)
  )
)

次のコードで学習・評価処理を行います.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
# Train, validation, and evaluation.
model.to(device)

val_losses = []
test_accs = []
print("Start training.")
for epoch in range(epochs):
    print("-" * 80)
    print(f"Epoch {epoch+1}")

    train_loop(train_dataloader, model, loss_fn, optimizer, device)
    val_loss = val_loop(val_dataloader, model, loss_fn, device)
    val_losses.append(val_loss)

    if (epoch+1) % eval_every_n_epochs == 0:
        acc = test_loop(test_dataloader, model, device)
        test_accs.append(acc)
val_losses_stacked_bidir = np.array(val_losses)
test_accs_stacked_bidir = np.array(test_accs)
print(f"Minimum validation loss:{val_losses_stacked_bidir.min()} at {np.argmin(val_losses_stacked_bidir)+1} epoch.")
print(f"Maximum accuracy:{test_accs_stacked_bidir.max()} at {np.argmax(test_accs_stacked_bidir)*eval_every_n_epochs+1} epoch.")
Start training.
--------------------------------------------------------------------------------
Epoch 1
Start training.
loss:2.651798 [    0/ 3881]
loss:2.004460 [ 3200/ 3881]
Done. Time:25.232042656000033
Training performance: 
 Avg loss:2.237586

Start validation.
Done. Time:0.6116373980000276
Validation performance: 
 Avg loss:2.075622

Start evaluation.
Done. Time:1.7048921189999646
Test performance: 
 Accuracy:27.5%
--------------------------------------------------------------------------------
...
--------------------------------------------------------------------------------
Epoch 50
Start training.
loss:0.578132 [    0/ 3881]
loss:1.199638 [ 3200/ 3881]
Done. Time:27.51056778499992
Training performance: 
 Avg loss:0.771569

Start validation.
Done. Time:0.6108820279998781
Validation performance: 
 Avg loss:1.452735

Start evaluation.
Done. Time:1.4725680939998256
Test performance: 
 Accuracy:68.5%
Minimum validation loss:1.034569697720664 at 43 epoch.
Maximum accuracy:75.5 at 31 epoch.

以後,同様の処理をマスキング設定毎に繰り返します.
コード構成は同じですので,ここでは説明を割愛させていただきます.


今回はRNN層を用いた孤立手話単語認識モデルに対して,マスキング処理を追加して認識性能を向上させる方法を紹介しましたが,如何でしたでしょうか?
RNN層よりも出力層へのマスキングの方が効果が高いことは少し予想外だったかもしれません.

深層学習は複雑なため,最初に検討したアイデア,仮説が素直に当てはまるとは限りません.
複雑な処理を組むよりも,むしろシンプルなアイデアの方が上手くいくこともよくあります.
いずれにせよ,問題を細かく切り分けて実験をしていくことが重要です (と,今回の実験で再認識しました(^^;)).

今回紹介した話が,これから手話認識を勉強してみようとお考えの方に何か参考になれば幸いです.