アフィン変換による骨格追跡点の変形方法・PyTorch編

This image is generated with ChatGPT-4, and edited by the author.
作成日:2023年10月30日(月) 00:00
最終更新日:2024年10月07日(月) 11:51
カテゴリ:コンピュータビジョン
タグ:  アフィン変換 PyTorch 骨格追跡 Python

アフィン変換による骨格追跡点変形処理を,PyTorchで実装する方法を紹介します.

こんにちは.高山です.
以前の記事で,Numpy を用いてアフィン変換を骨格追跡点に適用して変形する方法を紹介しました.
今回は,同様の処理を PyTorch を用いて実装する方法を紹介します.
今回解説するスクリプトは GitHub 上に公開しています

更新履歴 (大きな変更のみ記載しています)

  • 2024/09/17: タイトル,タグ,サマリを更新しました

1. モジュールのインストールとロード

まず最初に,下記のコードでモジュールをロードします.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
# Standard modules.
import gc
import sys
import time
from functools import partial

# CV/ML.
import numpy as np

import torch
import torch.nn as nn
【コード解説】
- 標準モジュール
  - gc: ガベージコレクション用ライブラリ
    処理時間計測クラスの内部処理で用います.
  - sys: Pythonシステムを扱うライブラリ
    今回はバージョン情報を取得するために使用しています.
  - time: 時刻データを取り扱うためのライブラリ
    今回は処理時間を計測するために使用しています.
  - functools: 関数オブジェクトを操作するためのライブラリ
    処理時間計測クラスに渡す関数オブジェクト作成に使用しています.
- 画像処理・機械学習向けモジュール
  - numpy: 行列演算ライブラリ
    今回はデータのロードに用います.
  - torch: ニューラルネットワークライブラリ
    今回は行列演算機能を用います.

記事執筆時点のPythonと主要モジュールのバージョンは下記のとおりです.

1
2
3
print(f"Python:{sys.version}")
print(f"Numpy:{np.__version__}")
print(f"Torch:{torch.__version__}")
Python:3.10.12 (main, Jun 11 2023, 05:26:28) [GCC 11.4.0]
Numpy:1.23.5
Torch:2.0.1+cu118

2. データのロードと確認

次に,下記の処理で変換前と変換後の追跡点データをダウンロードします.
変換後の追跡点データはNumpy編で紹介した処理で生成したデータです.
このデータは今回の処理で生成したデータとの比較に用います.

!wget https://github.com/takayama-rado/trado_samples/raw/main/test_data/finger_far0_non_static.npy
!wget https://github.com/takayama-rado/trado_samples/raw/main/test_data/finger_far0_non_static_affine.npy

ls コマンドでデータがダウンロードされているか確認します.

!ls
finger_far0_non_static_affine.npy  finger_far0_non_static.npy  sample_data

3. 実験用処理の実装

実験に先立って,処理時間計測用の関数と実験用定数を定義します.
次のコードは処理時間の値から,適切なSI接頭辞を設定し,文字列にして返します.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
def get_perf_str(val):
    token_si = ["", "m", "µ", "n", "p"]
    exp_si = [1, 1e3, 1e6, 1e9, 1e12]
    perf_str = f"{val:3g}s"
    si = ""
    sval = val
    for token, exp in zip(token_si, exp_si):
        if val * exp > 1.0:
            si = token
            sval = val * exp
            break
    perf_str = f"{sval:3g}{si}s"
    return perf_str
- 引数
  - val: 処理時間を示す値 (秒)
- 2-6行目: 変数の初期化
- 7-11行目: 接頭辞を選択して,表示値を調整
- 12行目: 表示文字列を作成

次のコードは,処理時間を格納した配列から統計量を求めて表示します.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
def print_perf_time(intervals, top_k=None):
    if top_k is not None:
        intervals = np.sort(intervals)[:top_k]
    min = intervals.min()
    max = intervals.max()
    mean = intervals.mean()
    std = intervals.std()

    smin = get_perf_str(min)
    smax = get_perf_str(max)
    mean = get_perf_str(mean)
    std = get_perf_str(std)
    if top_k:
        print(f"Top {top_k} summary: Max {smax}, Min {smin}, Mean +/- Std {mean} +/- {std}")
    else:
        print(f"Overall summary: Max {smax}, Min {smin}, Mean +/- Std {mean} +/- {std}")
- 引数:
  - intervals: 処理時間のNumpy配列
  - top_k: intervalsから,処理時間が短いサンプルをk個取り出して統計量を算出
- 2-3行目: Top K個のサンプル抽出
- 4-7行目: 統計量算出
- 9-16行目: 表示文字列を作成して表示

次のクラスは,入力した関数を複数回呼び出して,処理時間の統計量を表示します.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
class PerfMeasure():
    def __init__(self,
                 trials=100,
                 top_k=10):
        self.trials = trials
        self.top_k = top_k

    def __call__(self, func):
        gc.collect()
        gc.disable()
        intervals = []
        for _ in range(self.trials):
            start = time.perf_counter()
            func()
            end = time.perf_counter()
            intervals.append(end - start)
        intervals = np.array(intervals)
        print_perf_time(intervals)
        if self.top_k:
            print_perf_time(intervals, self.top_k)
        gc.enable()
        gc.collect()
- 引数:
  - trials: 入力関数の実行回数
  - top_k: 値が定義されている場合,全体の処理時間配列から処理時間が短いサンプルをk個取り出して統計量を算出
- 9-10行目: 計測中はガベージコレクションをしないように設定
- 11-20行目: 処理時間計測処理
- 21-22行目: ガベージコレクションの設定を元に戻す

最後に,次のコードで実験用の定数を定義して処理時間計測クラスをインスタンス化しています.

1
2
3
4
5
6
TRIALS = 100
TOPK = 10
pmeasure = PerfMeasure(TRIALS, TOPK)
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Target device is {DEVICE}.")
JIT_OPT = False

1-3行目は処理時間計測用の処理です.
実際の所,Colabの様な複雑な処理環境で他のプロセスの影響を排除して純粋な処理時間を計測するのはかなり難しいです.
そこでここでは,処理の繰り返し数を \(100\) とし全体の統計量に加えて,処理時間の短い代表値 \(10\) 試行からも統計量を表示するようにしています.

4-5行目ではランタイプを種類に応じて,CPU/GPUを切り替えています.

6行目はJITコンパイル時の最適化を抑制する設定です.
最適化を抑制しない場合に処理の再コンパイルが発生する現象が報告されています.
公式ドキュメントに記載が無いので何とも言えないのですが,本記事では最適化を抑制して実験を行います.

4. アフィン変換の実装

ここから先は変換処理の実装・実行を行います.
Tensorflowと同じくPyTorchも,プログラムの実行時に処理をコンパイル (計算グラフの作成) して実行します.
現在のPyTorchには,下記に示すとおり2種類の計算グラフ作成方法があります.

  • Define-by-Run: データを入力した際に処理のコンパイルと実行を同時に行う.
    Pythonの実行環境と親和性が高くインタラクティブな実行環境で利用しやすくなっています.
  • Define-and-Run (TorchScript): (C言語のように) 処理のコンパイルと実行が明確に別れて行われる.
    処理の最適化性能が高いため高速に動作します.

今回は,変換処理をそれぞれの計算グラフ作成方法で実行して処理時間を比較してみたいと思います.

4.1. Define-by-run に基づく実装

本項ではまず,Define-by-run を用いた場合の実装を紹介します.
以前の記事の後半で紹介した,対象物内の特定位置を変換軸とする変換を実装していきます.

変換行列の算出処理

次のコードは,入力パラメータに応じた変換行列を算出する関数を実装しています.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
def get_affine_matrix_2d_torch(center,
                               trans,
                               scale,
                               rot,
                               skew,
                               dtype=torch.float32):
    device = center.device
    center_m = torch.tensor([[1.0, 0.0, float(-center[0])],
                             [0.0, 1.0, float(-center[1])],
                             [0.0, 0.0, 1.0]], dtype=dtype, device=device)
    scale_m = torch.tensor([[float(scale[0]), 0.0, 0.0],
                            [0.0, float(scale[1]), 0.0],
                            [0.0, 0.0, 1.0]], dtype=dtype, device=device)
    _cos = torch.cos(rot)
    _sin = torch.sin(rot)
    rot_m = torch.tensor([[float(_cos), float(-_sin), 0.0],
                          [float(_sin), float(_cos), 0.0],
                          [0.0, 0.0, 1.0]], dtype=dtype, device=device)
    _tan = torch.tan(skew)
    skew_m = torch.tensor([[1.0, float(_tan[0]), 0.0],
                           [float(_tan[1]), 1.0, 0.0],
                           [0.0, 0.0, 1.0]], dtype=dtype, device=device)
    move = center + trans
    trans_m = torch.tensor([[1.0, 0.0, float(move[0])],
                            [0.0, 1.0, float(move[1])],
                            [0.0, 0.0, 1.0]], dtype=dtype, device=device)
    # Make affine matrix.
    mat = torch.eye(3, 3, dtype=dtype, device=device)
    mat = torch.matmul(center_m, mat)
    mat = torch.matmul(scale_m, mat)
    mat = torch.matmul(rot_m, mat)
    mat = torch.matmul(skew_m, mat)
    mat = torch.matmul(trans_m, mat)
    return mat.to(dtype)
- 引数
  - center: 変換軸座標 `(center_x, center_y)`
    通常は物体中心位置や特定の追跡点位置を指定します.
  - trans: 平行移動量 `(trans_x, trans_y)`
  - scale: 拡大縮小量 `(scale_x, scale_y)`
  - rot: 回転量 (ラジアン)
    この値のみスカラーです.
  - skew: せん断量 (ラジアン) `(skew_x, skew_y)`
  - dtype: 出力データ型
- 7-26行目: 各変換行列を算出
  `center_m` の算出では,指定座標を原点に移動するためにマイナスをかけた値を移動量として設定しています.
  回転とせん断はラジアン値を入力として,それぞれ対応する三角関数を適用した値を設定しています.
  平行移動では,最初に行う指定座標の原点への移動をオフセットとして加えた値を移動量として設定しています.
- 28-33行目: 初めに`mat` を単位行列で初期化し,各変換行列を順次適用
- 34行目: `dtype` で指定した型に変換して値を返す

Define-and-run 時と実装を揃えるために,center などのパラメータは torch.Tensor 型で渡す想定になっています.
一方,torch.tensor()torch.Tensor をインスタンス化する際は,引数のリスト要素は全て同じ型である必要があります.
そのため,torch.tensor() に値を渡す際に float() でキャストするようにしています.

アフィン変換の適用処理

次のコードは,追跡点配列に変換行列を適用する関数を実装しています.

1
2
3
4
5
6
7
def apply_affine_torch(inputs, mat):
    xy = inputs[:, :, :2]
    ones =  torch.ones([xy.shape[0], xy.shape[1], 1], device=inputs.device)
    xy = torch.cat([xy, ones], dim=-1)
    xy = torch.einsum("...j,ij", xy, mat)
    inputs[:, :, :2] = xy[:, :, :-1]
    return inputs
- 引数:
  - inputs: 追跡点配列 `[T, J, C]`
    - T: 動画フレームインデクス
    - J: 追跡点インデクス
    - C: 特徴量インデクス.今回は $(x, y, z, c)$ の4次元特徴量を用いています.
  - mat: アフィン変換行列 `[3, 3]`
- 2-5行目: 追跡点配列から $(x, y)$ 座標列を取り出して,特徴量次元の末尾に $1$ を加えて同次座標形式に変換
- 5行目: `xy` の特徴量次元に対してアフィン変換行列を適用
- 6行目: 変換後の$(x, y)$ 座標列を `inputs` に代入して返す

5行目の変換行列の適用では,Numpy版と同様にアインシュタインの縮約表記を用いた演算を行っています.
Einsum についてはこちらに簡単な解説記事を用意しています.
ご一読いただければうれしいです.

アフィン変換の実行

次のコードでは,追跡点を trackdata にロードして,関数の仕様に合わせて形状を変更しています.
今回用いるデータは [P, T, J, C] 形状 ( P は人物インデクス) の配列ですが,動画に映っている人物は1名だけですので P 軸は除去しています.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
# Load data.
trackfile = "./finger_far0_non_static.npy"
reffile = "./finger_far0_non_static_affine.npy"
trackdata = np.load(trackfile).astype(np.float32)
refdata = np.load(reffile).astype(np.float32)
print(trackdata.shape)

# Remove person axis.
trackdata = trackdata[0]
refdata = refdata[0]

次のコードでは,アフィン変換行列の設定パラメータを次のように指定しています.

  • 変換軸座標: \((C_x, C_y) = (638.0, 389.0)\),おおよそ両肩の中心になる座標を指定しています.
  • 平行移動量: \((T_x, T_y) = (100.0, 0.0)\)
  • 拡大縮小量: \((S_x, S_y) = (2.0, 0.5)\)
  • 回転量: \(R = \pi * 15.0 / 180.0\)
  • せん断量: \((K_x, K_y) = \pi * (15.0, 15.0) / 180.0\)

回転量とせん断量はラジアンになってます.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
 # Get affine matrix.
center = torch.tensor([638.0, 389.0]).to(DEVICE)
trans = torch.tensor([100.0, 0.0]).to(DEVICE)
scale = torch.tensor([2.0, 0.5]).to(DEVICE)
rot = torch.tensor(np.radians(15.0)).to(DEVICE)
skew = torch.tensor(np.radians([15.0, 15.0])).to(DEVICE)
dtype = torch.float32
print("Parameters")
print("Center:", center)
print("Trans:", trans)
print("Scale:", scale)
print("Rot:", rot)
print("Skew:", skew)

print処理の結果を示します.

Parameters
Center: tensor([638., 389.])
Trans: tensor([100.,   0.])
Scale: tensor([2.0000, 0.5000])
Rot: tensor(0.2618, dtype=torch.float64)
Skew: tensor([0.2618, 0.2618], dtype=torch.float64)

では,アフィン変換処理を実行していきます.
まず次のコードで処理時間計測クラスに入力するためのラッパー関数を定義します.

1
2
3
def perf_wrap_func(trackdata, center, trans, scale, rot, skew, dtype):
    mat = get_affine_matrix_2d_torch(center, trans, scale, rot, skew, dtype=dtype)
    newtrack = apply_affine_torch(trackdata, mat)

次のコードは先程実装した変換処理にtrackdataを入力して,変換後の追跡点newtrackを得ています.
1回目の処理呼び出しでは,処理のコンパイルが行われるため個別に時間を計測し,参照用の変換後追跡点との誤差を計測しています.
その後,処理時間計測クラスを用いて平均処理時間を表示しています.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
testtrack = torch.tensor(trackdata.copy().astype(np.float32)).to(DEVICE)

# The 1st call may be slow because of the computation graph construction.
print(f"Time of first call.")
start = time.perf_counter()
mat = get_affine_matrix_2d_torch(center, trans, scale, rot, skew, dtype=dtype)
newtrack = apply_affine_torch(testtrack, mat)
interval = time.perf_counter() - start
print_perf_time(np.array([interval]))

# Evaluate difference.
diff = (np.round(newtrack.detach().cpu().numpy()) - np.round(refdata)).sum()
print(f"Sum of error:{diff}")

testtrack = torch.tensor(trackdata.copy().astype(np.float32)).to(DEVICE)

print("Time after second call.")
target_fn = partial(perf_wrap_func,
                    trackdata=testtrack,
                    center=center, trans=trans, scale=scale, rot=rot, skew=skew,
                    dtype=dtype)
pmeasure(target_fn)

print 処理の結果を示します.
1回目の処理時間は97.2ミリ秒程度で,誤差はほぼありませんでした.
2回目以降の処理では,(他のプロセスの影響が少ない場合は) 2.8ミリ秒程度で処理が完了しています.

Time of first call.
Overall summary: Max 97.1556ms, Min 97.1556ms, Mean +/- Std 97.1556ms +/-   0s
Sum of error:0.0
Time after second call.
Overall summary: Max 7.80179ms, Min 2.62753ms, Mean +/- Std 3.22169ms +/- 703.049µs
Top 10 summary: Max 2.8531ms, Min 2.62753ms, Mean +/- Std 2.77958ms +/- 78.8812µs

入力データ形状が変わった場合の挙動

Tensorflowの記事と同じく,今回も入力データの形状が変わった場合の挙動を調査してみたいと思います.
次のコードは先程のアフィン変換の実行処理とほぼ同じですが,入力データの時間長が1フレーム分短くなっています.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
testtrack = torch.tensor(trackdata.copy().astype(np.float32)).to(DEVICE)

# The 1st call may be slow because of the computation graph construction.
print(f"Time of first call.")
start = time.perf_counter()
mat = get_affine_matrix_2d_torch(center, trans, scale, rot, skew, dtype=dtype)
newtrack = apply_affine_torch(testtrack[:-1], mat)
interval = time.perf_counter() - start
print_perf_time(np.array([interval]))

testtrack = torch.tensor(trackdata.copy().astype(np.float32)).to(DEVICE)

print("Time after second call.")
target_fn = partial(perf_wrap_func,
                    trackdata=testtrack[:-1],
                    center=center, trans=trans, scale=scale, rot=rot, skew=skew,
                    dtype=dtype)
pmeasure(target_fn)

print 処理の結果を示します.
1回目の処理では4.9ミリ秒程度,2回目以降の処理では2.5ミリ秒程度で処理が完了しています.
Tensorflowと異なり,PyTorchの公式ドキュメントには再トレーシングに関する情報が記載されていません.
そのため確定的ではないですが,1回目と2回目以降の処理時間を見る限りでは処理の再トレーシングは起きていないようです.

Time of first call.
Overall summary: Max 4.85323ms, Min 4.85323ms, Mean +/- Std 4.85323ms +/-   0s
Time after second call.
Overall summary: Max 6.98628ms, Min 1.75266ms, Mean +/- Std 3.15871ms +/- 792.97µs
Top 10 summary: Max 2.80251ms, Min 1.75266ms, Mean +/- Std 2.51224ms +/- 394.804µs

4.2. Define-and-run に基づく実装

ここから先は,Define-and-run を用いた場合の挙動について見ていきます.

PyTorchでは @torch.jit.script デコレータで修飾した関数は TorchScript と呼ぶ中間表現に事前変換されます.
今回は Define-and-Run で動作するように変換処理を実装しているので,コードの変更はほとんどありません.

変換行列の算出処理

次のコードでは,先程紹介した変換行列処理関数を Define-and-run で動作するようにしています.
内部の処理は先程のコードと完全に同じです.
ただし,JITコンパイルした場合は,np.ndarray を入力として受け付けることができなくなるため,ここでは center: torch.Tensor のように型アノテーション付きで宣言しています.

1
2
3
4
5
6
7
8
@torch.jit.script
def get_affine_matrix_2d_torch_jit(center: torch.Tensor,
                                   trans: torch.Tensor,
                                   scale: torch.Tensor,
                                   rot: torch.Tensor,
                                   skew: torch.Tensor,
                                   dtype: torch.dtype = torch.float32):
    ...

アフィン変換の適用処理

次のコードは,追跡点配列に変換行列を適用する関数を実装しています.
中身のコードは完全に同じなのでここでは説明を省かさせていただきます.

1
2
3
@torch.jit.script
def apply_affine_torch_jit(inputs, mat):
    ...

アフィン変換の実行

アフィン変換を実行します.
まず次のコードで処理時間計測クラスに入力するためのラッパー関数を定義します.

1
2
3
def perf_wrap_func(trackdata, center, trans, scale, rot, skew, dtype):
    mat = get_affine_matrix_2d_torch_jit(center, trans, scale, rot, skew, dtype=dtype)
    newtrack = apply_affine_torch_jit(trackdata, mat)

次のコードで変換処理を実行しています.
先程と同じく,1回目の処理呼び出しでは,処理のコンパイルが行われるため個別に時間を計測し,2回目以降に処理時間計測クラスを用いて平均時間を表示しています.
なお,次のコードではwith 文でJITの最適化を抑制する設定をしています.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
testtrack = torch.tensor(trackdata.copy().astype(np.float32)).to(DEVICE)

with torch.jit.optimized_execution(JIT_OPT):
    # The 1st call may be slow because of the computation graph construction.
    print(f"Time of first call.")
    start = time.perf_counter()
    mat = get_affine_matrix_2d_torch_jit(center, trans, scale, rot, skew, dtype=dtype)
    newtrack = apply_affine_torch_jit(testtrack, mat)
    interval = time.perf_counter() - start
    print_perf_time(np.array([interval]))

    # Evaluate difference.
    diff = (np.round(newtrack.detach().cpu().numpy()) - np.round(refdata)).sum()
    print(f"Sum of error:{diff}")

    testtrack = torch.tensor(trackdata.copy().astype(np.float32)).to(DEVICE)

    print("Time after second call.")
    target_fn = partial(perf_wrap_func,
                        trackdata=testtrack,
                        center=center, trans=trans, scale=scale, rot=rot, skew=skew,
                        dtype=dtype)
    pmeasure(target_fn)

print 処理の結果を示します.
ここの結果は少し興味深いです.
1回目の処理では処理のコンパイルが走るはずなので処理時間が長くなると予想されるのですが,実際には4.5ミリ秒程度で完了しています.
これはJITの最適化を抑制した効果と思われます.
JITの最適化を行う場合は106.9ミリ秒程度時間がかかりました.
2回目以降の処理では1.4ミリ秒程度で処理が完了しています.

Time of first call.
Overall summary: Max 4.54288ms, Min 4.54288ms, Mean +/- Std 4.54288ms +/-   0s
Sum of error:0.0
Time after second call.
Overall summary: Max 3.07281ms, Min 1.41188ms, Mean +/- Std 1.64159ms +/- 309.479µs
Top 10 summary: Max 1.4548ms, Min 1.41188ms, Mean +/- Std 1.43873ms +/- 17.1055µs

入力データ形状が変わった場合の挙動

次に,先程と同じく入力データの形状が変わった場合の挙動を示します.
次のコードは先程のアフィン変換の実行処理とほぼ同じですが,入力データの時間長が1フレーム分短くなっています.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
testtrack = torch.tensor(trackdata.copy().astype(np.float32)).to(DEVICE)

with torch.jit.optimized_execution(JIT_OPT):
    # The 1st call may be slow because of the computation graph construction.
    print(f"Time of first call.")
    start = time.perf_counter()
    mat = get_affine_matrix_2d_torch_jit(center, trans, scale, rot, skew, dtype=dtype)
    newtrack = apply_affine_torch_jit(testtrack[:-1], mat)
    interval = time.perf_counter() - start
    print_perf_time(np.array([interval]))

    # Evaluate difference.
    diff = (np.round(newtrack.detach().cpu().numpy()) - np.round(refdata[:-1])).sum()
    print(f"Sum of error:{diff}")

    testtrack = torch.tensor(trackdata.copy().astype(np.float32)).to(DEVICE)

    print("Time after second call.")
    target_fn = partial(perf_wrap_func,
                        trackdata=testtrack[:-1],
                        center=center, trans=trans, scale=scale, rot=rot, skew=skew,
                        dtype=dtype)
    pmeasure(target_fn)

print 処理の結果を示します.
1回目の処理では5.0ミリ秒程度,2回目以降の処理では2.3ミリ秒程度で処理が完了しています.
1回目および2回目以降の処理時間は大きくは変わらず,PyTorchでは処理の再トレーシングは起きていないと予想されます.

Time of first call
Summary: Max 5.03474ms, Min 5.03474ms, Mean +/- Std 5.03474ms +/-   0s
Time after second call
Summary: Max 2.45161ms, Min 2.04306ms, Mean +/- Std 2.30862ms +/- 145.906µs

5. データ拡張用途として,ランダムな変換を行う方法

第5節では,アフィン変換をデータ拡張に応用する方法を紹介します.
処理の流れを図1に示します.

追跡データに対して,ランダムなパラメータでアフィン変換を適用する処理のフローチャートです.画像に続いて説明があります.
アフィン変換処理フロー

変換処理自体は,次の3個の処理から構成されます.

  1. 変換軸算出と変換パラメータの生成
  2. 変換行列の算出
  3. アフィン変換の適用

また,データ拡張に応用する場合は,変換を適用するかどうかを確率的に決める処理が入る場合が多いです.

今回はPyTorchを用いているので,より高速な動作が期待できる Define-and-run を用いた実装形態を紹介したいと思います.
具体的には,次の図2に示す2種類の実装形態を紹介します.

PyTorchを用いたアフィン変換の処理フローチャートです.左側にはNumpy版と同様の標準的な実装形態を示しています.右側にはDefine-and-runに基づく高速な実装形態を示しています.各実装形態の説明は次項以降の文章で行っています.
Define-and-runに基づくランダム変換

図2(a) に示す一つ目の実装形態では,全体の処理は通常のPythonプロセスを用いて組み,PythonプロセスからJITコンパイルをした関数を読み出します.
こちらの実装は比較的単純で,Numpy版の実装と同様の処理構成が維持できます.

図2(b) に示す2つ目の実装形態では,分岐やランダムパラメータの生成を含んだ処理全体をJITコンパイルします.
こちらの実装では,コンパイル可能な形に処理構成を変更する必要があります.

では,次項から具体的な処理を説明していきます.

5.1 実装形態1: 標準クラスからJITコンパイルをした関数を呼び出す

データ拡張クラス

図2(a) に示した処理を実装したクラスが次のコードになります.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
class RandomAffineTransform2D_Torch():
    def __init__(self,
                 center_joints,
                 apply_ratio,
                 trans_range,
                 scale_range,
                 rot_range,
                 skew_range,
                 random_seed=None,
                 device="cpu",
                 dtype=torch.float32):

        self.center_joints = center_joints
        if isinstance(self.center_joints, int):
            self.center_joints = [self.center_joints]

        self.apply_ratio = apply_ratio
        self.trans_range = trans_range
        self.scale_range = scale_range
        self.rot_range = np.radians(rot_range).tolist()
        self.skew_range = np.radians(skew_range).tolist()
        self.dtype = dtype
        self.device = device
        self.rng = torch.Generator(device=device)
        if random_seed is not None:
            self.rng.manual_seed(random_seed)

    def __call__(self, inputs):
        if torch.rand(1, generator=self.rng, device=self.device) >= self.apply_ratio:
            return inputs

        temp = inputs[:, self.center_joints, :]
        temp = temp.reshape([inputs.shape[0], -1, inputs.shape[-1]])
        mask = temp.sum(dim=(1, 2)) != 0
        # Use x and y only.
        center = temp[mask].mean(dim=0).mean(dim=0)[:2]

        # Random value in [0, 1].
        trans = torch.rand(2, generator=self.rng, device=self.device)
        scale = torch.rand(2, generator=self.rng, device=self.device)
        rot = torch.rand(1, generator=self.rng, device=self.device)
        skew = torch.rand(2, generator=self.rng, device=self.device)
        # Scale to target range.
        trans = (self.trans_range[1] - self.trans_range[0]) * trans + self.trans_range[0]
        scale = (self.scale_range[1] - self.scale_range[0]) * scale + self.scale_range[0]
        rot = (self.rot_range[1] - self.rot_range[0]) * rot + self.rot_range[0]
        skew = (self.skew_range[1] - self.skew_range[0]) * skew + self.skew_range[0]

        # Calculate matrix.
        mat = get_affine_matrix_2d_torch_jit(center, trans, scale, rot, skew,
            dtype=self.dtype)

        # Apply transform.
        inputs = apply_affine_torch_jit(inputs, mat)
        return inputs
- 引数
  - center_joints: 変換軸座標算出に使用する追跡点インデクス.
    ここで指定した追跡点の重心 (の全フレーム平均) が変換軸になります.
  - apply_ratio: 変換を適用する確率,[0.0, 1.0]で指定
  - trans_range: 平行移動量の範囲 `(minimum, maximum)`
  - scale_range: 拡大縮小量の範囲 `(minimum, maximum)`
  - rot_range: 回転量の範囲,度数で指定 `(minimum, maximum)`
  - skew_range: せん断量の範囲,度数で指定 `(minimum, maximum)`
  - random_seed: 疑似乱数生成器のシード,Noneの場合はPyTorchのグローバル設定が用いられる
  - device: torch.Tensorを扱うデバイスを指定 `{cpu, cuda}`
  - dtype: 出力データ型
- 13-26行目: クラスのインスタンス化処理
  24-26行目では疑似乱数生成器を生成しています.`random_seed` が指定されている場合は
  その値を用いて生成し,`None` の場合はPyTorchのグローバル設定を用いるようにしています.
- 28-55行目: ランダム変換処理
  - 29-30行目: 乱数を生成し,`apply_ratio` 以上だった場合は何もせずに値を返す
  - 32-36行目: 変換軸を算出
    まず初めに,`center_joints` で指定した追跡点配列を抽出します.
    次に,欠損フレームを除去するための `mask` を生成します.
    最後に,`mask` を適用した上で平均座標 (x, y) を算出し `center` としています.
  - 39-47行目: 指定した範囲で各変換パラメータをランダムに生成
  - 50-51行目: アフィン変換行列を算出
    ここでは先程実装した `get_affine_matrix_2d_torch_jit()` をクラスから呼び出すようにしています.
  - 54行目: アフィン変換適用
    ここでは先程実装した `apply_affine_torch_jit` をクラスから呼び出すようにしています.

データ拡張処理の実行

必要な処理が実装できましたので,処理を適用してみます.
まず,次のコードで変換クラスをインスタンス化します.

1
2
3
4
5
6
7
8
9
aug_fn = RandomAffineTransform2D_Torch(
    center_joints=[11, 12],
    apply_ratio=1.0,
    trans_range=[-100.0, 100.0],
    scale_range=[0.5, 2.0],
    rot_range=[-30.0, 30.0],
    skew_range=[-30.0, 30.0],
    device=DEVICE,
    dtype=dtype)

ここでは,center_joints に両肩の追跡点を指定しています.
この場合,変換軸は両肩の中央,つまり首元の座標になります.

次のコードでは,trackdata (のコピー) に対してランダムなパラメータで変換を施しています.
先程と同じく,1回目の処理呼び出しでは処理のコンパイルが行われるため個別に時間を計測し,2回目以降に処理時間計測クラスを用いて平均時間を表示しています.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
testtrack = torch.tensor(trackdata.copy().astype(np.float32)).to(DEVICE)

with torch.jit.optimized_execution(JIT_OPT):
    # The 1st call may be slow because of the computation graph construction.
    print(f"Time of first call.")
    start = time.perf_counter()
    temp = aug_fn(testtrack)
    interval = time.perf_counter() - start
    print_perf_time(np.array([interval]))

    testtrack = torch.tensor(trackdata.copy().astype(np.float32)).to(DEVICE)
    print("Time after second call.")
    target_fn = partial(aug_fn, inputs=testtrack)
    pmeasure(target_fn)

print 処理の結果を示します.
1回目の処理では30.6ミリ秒程度かかっていますが,2回目以降の処理では1.9ミリ秒程度で処理が完了していることが分かります.

Time of first call.
Overall summary: Max 30.5797ms, Min 30.5797ms, Mean +/- Std 30.5797ms +/-   0s
Time after second call.
Overall summary: Max 4.47461ms, Min 1.86795ms, Mean +/- Std 2.14591ms +/- 347.849µs
Top 10 summary: Max 1.93027ms, Min 1.86795ms, Mean +/- Std 1.90984ms +/- 17.3263µs

入力データ形状が変わった場合の挙動

第4節と同じく入力データの形状が変わった場合の挙動を示します.
次のコードは上の変換実行処理とほぼ同じですが,入力データの時間長が1フレーム分短くなっています.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
testtrack = torch.tensor(trackdata.copy().astype(np.float32)).to(DEVICE)

with torch.jit.optimized_execution(JIT_OPT):
    # The 1st call may be slow because of the computation graph construction.
    print(f"Time of first call.")
    start = time.perf_counter()
    temp = aug_fn(testtrack[:-1])
    interval = time.perf_counter() - start
    print_perf_time(np.array([interval]))

    testtrack = torch.tensor(trackdata.copy().astype(np.float32)).to(DEVICE)
    print("Time after second call.")
    target_fn = partial(aug_fn, inputs=testtrack[:-1])
    pmeasure(target_fn)

print 処理の結果を示します.
1回目の処理は6.0ミリ秒程度で,2回目以降の処理では2.0ミリ秒程度でした.

Time of first call.
Overall summary: Max 5.96948ms, Min 5.96948ms, Mean +/- Std 5.96948ms +/-   0s
Time after second call.
Overall summary: Max 4.98512ms, Min 1.86971ms, Mean +/- Std 2.21059ms +/- 502.983µs
Top 10 summary: Max 1.98956ms, Min 1.86971ms, Mean +/- Std 1.95277ms +/- 42.2789µs

5.2 実装形態2: nn.Moduleを継承したクラスをJITコンパイルする

データ拡張クラス

次に,図2(b) に示した処理を実装したクラスを示します.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
class RandomAffineTransform2D_TorchModule(nn.Module):
    def __init__(self,
                 center_joints,
                 apply_ratio,
                 trans_range,
                 scale_range,
                 rot_range,
                 skew_range,
                 random_seed=None,
                 device="cpu",
                 dtype=torch.float32):
        super().__init__()

        self.center_joints = center_joints
        if isinstance(self.center_joints, int):
            self.center_joints = [self.center_joints]

        self.apply_ratio = apply_ratio
        self.trans_range = trans_range
        self.scale_range = scale_range
        self.rot_range = np.radians(rot_range).tolist()
        self.skew_range = np.radians(skew_range).tolist()
        self.dtype = dtype
        self.device = device
        # self.rng = torch.Generator(device=device)
        # if random_seed is not None:
        #     self.rng.manual_seed(random_seed)
        self.rng = None

    def forward(self, inputs):
        if torch.rand(1, generator=self.rng, device=self.device) >= self.apply_ratio:
            return inputs

        temp = inputs[:, self.center_joints, :]
        temp = temp.reshape([inputs.shape[0], -1, inputs.shape[-1]])
        mask = temp.sum(dim=(1, 2)) != 0
        # Use x and y only.
        center = temp[mask].mean(dim=0).mean(dim=0)[:2]

        # Random value in [0, 1].
        trans = torch.rand(2, generator=self.rng, device=self.device)
        scale = torch.rand(2, generator=self.rng, device=self.device)
        rot = torch.rand(1, generator=self.rng, device=self.device)
        skew = torch.rand(2, generator=self.rng, device=self.device)
        # Scale to target range.
        trans = (self.trans_range[1] - self.trans_range[0]) * trans + self.trans_range[0]
        scale = (self.scale_range[1] - self.scale_range[0]) * scale + self.scale_range[0]
        rot = (self.rot_range[1] - self.rot_range[0]) * rot + self.rot_range[0]
        skew = (self.skew_range[1] - self.skew_range[0]) * skew + self.skew_range[0]

        # Calculate matrix.
        mat = get_affine_matrix_2d_torch_jit(center, trans, scale, rot, skew,
            dtype=self.dtype)

        # Apply transform.
        inputs = apply_affine_torch_jit(inputs, mat)
        return inputs

処理の中身は実装形態1と同じなのですが,2点ほど気をつける箇所があります.

一つ目は,クラスに実装した処理全体をJITコンパイルする場合は,__call__ メソッドを @torch.jit.script デコレータで変換することはできません.
この場合は,まず nn.Module クラスを継承して実装し,インスタンスを torch.jit.script() で変換するという手順を踏む必要があります.

二つ目は,torch.Generator() のようなクラスを attribute として持つことはできません.
現在のところJITコンパイルするクラスは torch.Tensor 型に変換可能な attribute しか持つことができないようです.

データ拡張処理の実行

必要な処理が実装できましたので,処理を適用してみます.
まず,次のコードで変換クラスをインスタンス化し,さらにJITコンパイルを適用します.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
aug_fn = RandomAffineTransform2D_TorchModule(
    center_joints=[11, 12],
    apply_ratio=1.0,
    trans_range=[-100.0, 100.0],
    scale_range=[0.5, 2.0],
    rot_range=[-30.0, 30.0],
    skew_range=[-30.0, 30.0],
    device=DEVICE,
    dtype=dtype)
aug_fn = torch.jit.script(aug_fn)

次のコードでは,trackdata (のコピー) に対してランダムなパラメータで変換を施しています.
前項と同じく,1回目の処理呼び出しでは処理のコンパイルが行われるため個別に時間を計測し,2回目以降に同じ処理を複数回適用して平均時間を表示しています.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
testtrack = torch.tensor(trackdata.copy().astype(np.float32)).to(DEVICE)

with torch.jit.optimized_execution(JIT_OPT):
    # The 1st call may be slow because of the computation graph construction.
    print(f"Time of first call.")
    start = time.perf_counter()
    temp = aug_fn(testtrack)
    interval = time.perf_counter() - start
    print_perf_time(np.array([interval]))

    testtrack = torch.tensor(trackdata.copy().astype(np.float32)).to(DEVICE)
    print("Time after second call.")
    target_fn = partial(aug_fn, inputs=testtrack)
    pmeasure(target_fn)

print 処理の結果を示します.
1回目の処理では14.3ミリ秒程度で,2回目以降の処理では1.8ミリ秒程度でした.

Time of first call.
Overall summary: Max 14.3173ms, Min 14.3173ms, Mean +/- Std 14.3173ms +/-   0s
Time after second call.
Overall summary: Max 3.85911ms, Min 1.75767ms, Mean +/- Std 2.12727ms +/- 452.078µs
Top 10 summary: Max 1.85505ms, Min 1.75767ms, Mean +/- Std 1.81007ms +/- 36.4946µs

入力データ形状が変わった場合の挙動

前項と同じく入力データの形状が変わった場合の挙動を示します.
次のコードは上の変換実行処理とほぼ同じですが,入力データの時間長が1フレーム分短くなっています.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
testtrack = torch.tensor(trackdata.copy().astype(np.float32)).to(DEVICE)

with torch.jit.optimized_execution(JIT_OPT):
    # The 1st call may be slow because of the computation graph construction.
    print(f"Time of first call.")
    start = time.perf_counter()
    temp = aug_fn(testtrack[:-1])
    interval = time.perf_counter() - start
    print_perf_time(np.array([interval]))

    testtrack = torch.tensor(trackdata.copy().astype(np.float32)).to(DEVICE)
    print("Time after second call.")
    target_fn = partial(aug_fn, inputs=testtrack[:-1])
    pmeasure(target_fn)

print 処理の結果を示します.
1回目の処理は3.1ミリ秒程度で,2回目以降の処理では1.8ミリ秒程度でした.

Time of first call.
Overall summary: Max 3.13964ms, Min 3.13964ms, Mean +/- Std 3.13964ms +/-   0s
Time after second call.
Overall summary: Max 13.1938ms, Min 1.76681ms, Mean +/- Std 2.10644ms +/- 1.16488ms
Top 10 summary: Max 1.79179ms, Min 1.76681ms, Mean +/- Std 1.77768ms +/- 8.68246µs

今回はアフィン変換処理をPyTorchで実装する方法を紹介しましたが,如何でしょうか?

PyTorchは Define-by-run で組んでいる場合は非常に使いやすいのですが,Define-and-run で組もうとすると結構つまづく箇所が多いです.
今後少しづつ整備が進んでいくと思いますが,しばらくはGitHubのissueなどを追いながら組む状況が続きそうですね.

今回紹介した話が,動作認識のデータ拡張などでお悩みの方に何か参考になれば幸いです.