【コード解説】MediaPipeの部位別追跡機能

著者: Natsuki Takayama
作成日: 2023年09月20日(水) 00:00
最終更新日: 2023年11月02日(木) 11:12
カテゴリ: コンピュータビジョン

こんにちは.高山です.
以前の記事MediaPipeの部位別追跡機能を,指文字動画に対して適用した例を紹介しました. 今回はその際に使用したプログラムについて解説したいと思います.
基本的な構成は全身追跡機能 (Holistic) と同様ですので,こちらも合わせてご覧いただければ幸いです.
今回解説するスクリプトはGitHub上に公開しています


1. モジュールのインストールとロード

第1節ではモジュールのインストールとロードを行っています.

1.1 MediaPipeのインストール

初期状態のColab環境にはMediaPipeがインストールされていません.
そこでまず最初に,Colab環境にMediaPipeをインストールします.

Colab環境では,先頭に"!"付けるとその行はShellコマンドとみなされます.
下のコードでは,Pythonのパッケージ管理ツール pip を呼び出してMediaPipeのバージョン"0.10.0"をインストールしています.

!pip3 install mediapipe==0.10.0

1.2 利用モジュールのインポート

次のコードでは使用するモジュールをインポートしています.
この操作によって,各モジュールに実装されている機能を利用できるようになります.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
# Standard modules.
import copy
import os

# CV/ML.
import cv2

import numpy as np

from mediapipe.python.solutions import holistic

# For drawing.
from mediapipe.python.solutions.face_mesh_connections import (
    FACEMESH_CONTOURS)
from mediapipe.python.solutions.hands_connections import (
    HAND_CONNECTIONS)
from mediapipe.python.solutions.pose_connections import (
    POSE_CONNECTIONS)

from IPython.display import HTML
from base64 import b64encode
import io
【コード解説】
- 標準モジュール
  - copy: 変数,オブジェクトのコピー機能
  - os: ファイルの作成,削除などOSが提供する機能
- 画像処理・機械学習向けモジュール
  - cv: 画像処理ライブラリOpenCVのPython版
  - numpy: 行列演算ライブラリ
  - mediapipe: MediaPipeモジュールのルート
    Imageなどの基本データクラスはこちらからアクセスします.
  - BaseOptions: MediaPipeの各種クラスに与える共通設定クラス
  - FaseLandmarker: MediaPipeの顔追跡クラス
  - FaseLandmarkerOptions: 顔追跡クラスの設定クラス
  - HandLandmarker: MediaPipeの手追跡クラス
  - HandLandmarkerOptions: 手追跡クラスの設定クラス
  - PoseLandmarker: MediaPipeの身体追跡クラス
  - PoseLandmarkerOptions: 身体追跡クラスの設定クラス
- 描画処理向けモジュール
  - FACEMESH_CONTOURS: MediaPipe向け顔追跡点の接続関係を定義したデータ
  - HAND_CONNECTIONS: MediaPipe向け手追跡点の接続関係を定義したデータ
  - POSE_CONNECTIONS: MediaPipe向け身体追跡点の接続関係を定義したデータ
  - HTML: 生データをロードしてHTML形式に変換する機能.今回は動画を描画するために使用
  - b64encode: データをbase64形式に変換する機能.
    base64はデータをアルファベット,数字,記号の64文字で表すデータ形式
  - io: データやファイルに対する入出力機能.標準モジュールだが今回は動画を読み込むために使用

2. データのロードと確認

第2節では,テスト用の動画ファイルと追跡モデルの重みをダウンロードしています.

2.1 テスト用ファイルのダウンロード

今回使用した動画ファイルは,GitHub上に公開しています.

!wget https://github.com/takayama-rado/trado_samples/raw/main/test_data/finger_near0.mp4

現状では全身追跡機能モデルの重みはライブラリに同梱されている一方,部位別機能モデルの重みは外部から与える必要があります.
下記のコマンドは各部位モデルの重みをダウンロードしています.
なお,今後は全身追跡機能モデルの重みも外部から与えるように変更される可能性が高いです.

# 顔追跡モデル
!wget https://storage.googleapis.com/mediapipe-models/face_landmarker/face_landmarker/float16/latest/face_landmarker.task
# 手追跡モデル
!wget https://storage.googleapis.com/mediapipe-models/hand_landmarker/hand_landmarker/float16/latest/hand_landmarker.task
# 身体追跡モデル
!wget https://storage.googleapis.com/mediapipe-models/pose_landmarker/pose_landmarker_full/float16/latest/pose_landmarker_full.task

ls コマンドでデータがダウンロードされているか確認します.

!ls
face_landmarker.task  hand_landmarker.task  sample_data
finger_near0.mp4      pose_landmarker_full.task

2.2 動画を描画して確認

次のスクリプトはColab上で動画を描画する機能を実装しています.

1
2
3
4
5
6
7
8
def show_video(video_path, video_width=500, video_height=500):
    video = io.open(video_path, 'r+b').read()
    encoded = b64encode(video)
    decoded = encoded.decode("ascii")
    data = f"<video width={video_width} height={video_height} controls>" \
        + f'<source src="data:video/mp4;base64,{decoded}"' \
        + 'type="video/mp4" /></video>'
    return(HTML(data=data))
【コード解説】
- 引数
  - video_path: 入力動画のパス
  - video_width: 動画表示領域の幅
  - video_height: 動画表示領域の高さ
- 2行目: 動画ファイルをオープン
- 3-4行目: 生データを一旦base64形式に変換し,さらにASCII文字列に変換
- 5-7行目: HTMLのvideoタグを表す文字列を定義
- 8行目: dataをHTML形式に変換して返す

次のコードでダウンロードした動画をColab上に表示しています.
show_video() の返り値がHTMLオブジェクトのため(videoタグ),埋め込み動画が表示されます.

show_video("./finger_near0.mp4")

3. 追跡処理

第3節では,追跡処理を行っています.
処理構成は全身追跡機能の場合と同様ですが,今回は処理をなるべく関数化して実装しています.

追跡処理では,動画を入力して追跡点配列を出力します.
処理の構成図を図1に示します.

図1: 追跡処理フロー
追跡処理フロー

最初に各部位の追跡クラスを生成し (インスタンス化),動画ファイルを開きます.

次に,開いた動画ファイルから1フレーム分の画像データを読み込みます.

読み込みが成功した場合は,画像データに対して追跡処理を行い,座標値のスケーリング後にデータを成形します.
ここでは,追跡点が \([P, T, J, C]\) 形状になるように成形しています.
\(P, T, J, C\) は,それぞれ部位のインスタンス番号,フレーム番号,追跡点番号,特徴量インデクスを示します.

上の処理を全フレームに対して行い,読み込むフレームがなくなったら追跡点データを保存して処理を終了します.

では,コードの解説をしていきます.
追跡のメイン処理に先立って,メイン処理から呼び出される関数群を実装します.

3.1 顔追跡点の抽出

次のコードは,MediaPipeで抽出した顔追跡点群をNumpy配列に変換しています.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
def ext_face_tracking_points(result, num_detect_faces):
    # `[P, T, J, C]`
    body_kpts = np.zeros([num_detect_faces, 1, 33, 4])
    lhand_kpts = np.zeros([num_detect_faces, 1, 21, 4])
    rhand_kpts = np.zeros([num_detect_faces, 1, 21, 4])
    face_kpts = np.zeros([num_detect_faces, 1, 478, 4])  # Includes iri's keypoints.

    for i, landmarks in enumerate(result.face_landmarks):
        temp = np.array([
            [lmark.x, lmark.y, lmark.z, _binary_conf(lmark)] for lmark
            in landmarks])
        face_kpts[i, 0, :temp.shape[0]] = temp
    return body_kpts, lhand_kpts, rhand_kpts, face_kpts
【コード解説】
- 引数
  - result: 追跡結果データ
- 3-6行目: 部位毎に追跡点を格納する配列を $[P, T, J, C]$ 形状で初期化.
  後の処理における取り扱いを考えて,追跡に失敗した場合はゼロ埋めされるように初期化しています.
- 8-12行目: 顔追跡点を格納
  顔追跡点は追跡結果データに `face_landmarks` として格納されています.

追跡結果の仕様については,公式ガイドAPI仕様書をご参照ください.

手および顔の追跡点には身体追跡点の visibility に相当する変数がありません.
そこで互換性をもたせるために下記に示すような,座標値をもとに2値形式の visibility を返す関数を実装しています.

1
2
def _binary_conf(lmark):
    return float((lmark.x + lmark.y + lmark.z) > 0)

3.2 手追跡点の抽出

次のコードは,MediaPipeで抽出した手追跡点群をNumpy配列に変換しています.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
def ext_hand_tracking_points(result, num_detect_hands):
    # `[P, T, J, C]`
    body_kpts = np.zeros([num_detect_hands, 1, 33, 4])
    lhand_kpts = np.zeros([num_detect_hands, 1, 21, 4])
    rhand_kpts = np.zeros([num_detect_hands, 1, 21, 4])
    face_kpts = np.zeros([num_detect_hands, 1, 478, 4])  # Includes iri's keypoints.

    if len(result.handedness) > 0:
        for i, (handedness, landmarks) in enumerate(zip(result.handedness, result.hand_landmarks)):
            temp = np.array([
                    [lmark.x, lmark.y, lmark.z, _binary_conf(lmark)] for lmark
                    in landmarks])
            if handedness[0].display_name == "Left":
                lhand_kpts[i, 0, :temp.shape[0]] = temp
            else:
                rhand_kpts[i, 0, :temp.shape[0]] = temp
    return body_kpts, lhand_kpts, rhand_kpts, face_kpts
【コード解説】
- 引数
  - result: 追跡結果データ
- 3-6行目: 部位毎に追跡点を格納する配列を $[P, T, J, C]$ 形状で初期化.
  後の処理における取り扱いを考えて,追跡に失敗した場合はゼロ埋めされるように初期化しています.
- 8-16行目: 手追跡点を格納
  `handedness` はCategoryクラスのインスタンスで,左右どちらの手かの情報が格納されます.
  追跡点座標は `hand_landmarks` に格納されています.
  左手の場合は `index=0, display_name="Left", category_name="Left"` の属性値となり,
  右手の場合は `index=1, display_name="Right", category_name="Right"` の属性値となります.
  今回は `display_name` の値を基に配列の格納先を切り替えています.

追跡結果の仕様については,公式ガイドAPI仕様書をご参照ください.
また,Categoryクラスについては公式のAPI仕様をご参照ください.

3.3 身体追跡点の抽出

次のコードは,MediaPipeで抽出した身体追跡点群をNumpy配列に変換しています.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
def ext_pose_tracking_points(result, num_detect_poses):
    # `[P, T, J, C]`
    body_kpts = np.zeros([num_detect_poses, 1, 33, 4])
    lhand_kpts = np.zeros([num_detect_poses, 1, 21, 4])
    rhand_kpts = np.zeros([num_detect_poses, 1, 21, 4])
    face_kpts = np.zeros([num_detect_poses, 1, 478, 4])  # Includes iri's keypoints.

    for i, landmarks in enumerate(result.pose_landmarks):
        temp = np.array([
            [lmark.x, lmark.y, lmark.z, lmark.visibility] for lmark
            in landmarks])
        body_kpts[i, 0, :temp.shape[0]] = temp
    return body_kpts, lhand_kpts, rhand_kpts, face_kpts
【コード解説】
- 引数
  - result: 追跡結果データ
- 3-6行目: 部位毎に追跡点を格納する配列を $[P, T, J, C]$ 形状で初期化.
  後の処理における取り扱いを考えて,追跡に失敗した場合はゼロ埋めされるように初期化しています.
- 8-12行目: 身体追跡点を格納
  身体追跡点は追跡結果データに `pose_landmarks` として格納されています.

追跡結果の詳細については,公式ガイドAPI仕様書をご参照ください.

3.4 追跡点のスケーリング (投影)

次のコードは,追跡点の座標値を画像サイズにスケーリングしています.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def project(kpts,
            width,
            height,
            inplace=True,
            limit_to_window=False):
    if inplace is True:
        temp = kpts
    else:
        temp = copy.deepcopy(kpts)
    depth = max(width, height)
    xs = temp[:, :, :, 0]
    ys = temp[:, :, :, 1]
    zs = temp[:, :, :, 2]
    xs = np.floor(xs * width)
    ys = np.floor(ys * height)
    zs = np.floor(zs * depth)
    if limit_to_window:
      xs[xs > (width - 1)] = width - 1
      ys[ys > (height - 1)] = height - 1
      zs[zs > (depth - 1)] = depth - 1
      xs[xs < 0] = 0
      ys[ys < 0] = 0
      zx[zx < 0] = 0
    temp[:, :, :, 0] = xs
    temp[:, :, :, 1] = ys
    temp[:, :, :, 2] = zs
    return temp
【コード解説】
- 引数
  - kpts: `[P, T, J, C]` 形状の追跡点配列
  - width: スケーリングするウィンドウ幅
  - height: スケーリングするウィンドウ高さ
  - inplace: `True` の場合,入力 `kpts` を上書きする
  - limit_to_window: `True` の場合,画像外にはみ出した値を画像境界線上に留める
- 6-9行目: `inplace=True` の場合は,入力 `kpts` を上書きするように `temp` を初期化
- 10行目: `width` と `height` の大きい方の値で,`z` 軸のウィンドウサイズを定義
- 11-16行目: ウィンドウサイズで座標値をスケーリング.
  MediaPipeの座標値は画像サイズを $[0, 1]$ の範囲で正規化した値です.
  そのため,ウィンドウサイズをかけるだけでスケーリングができます.
- 17-23行目: `limit_to_window=True` の場合は,画像外にはみ出した値を画像境界線上に留める
- 24-26行目: 返り値の作成

3.5 追跡メイン処理 (共通部分)

次のコードは,追跡処理の共通部分を実装しています.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
def tracking(tracker, videofile, static_image_mode, num_detects):
    # Load video and start tracking.
    capture = cv2.VideoCapture(videofile)
    capture.set(cv2.CAP_PROP_POS_FRAMES, 0)
    fps = capture.get(cv2.CAP_PROP_FPS)
    frame_count = 0
    frames = None
    while True:
        status, image = capture.read()
        if status is False:
            break
        try:
            # Track.
            mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=image)
            if static_image_mode:
                result = tracker.detect(mp_image)
            else:
                timestamp_ms = int(frame_count / fps * 1000)
                result = tracker.detect_for_video(mp_image, timestamp_ms)
                frame_count += 1
            if isinstance(tracker, FaceLandmarker):
                body, lhand, rhand, face = ext_face_tracking_points(result, num_detects)
            elif isinstance(tracker, HandLandmarker):
                body, lhand, rhand, face = ext_hand_tracking_points(result, num_detects)
            elif isinstance(tracker, PoseLandmarker):
                body, lhand, rhand, face = ext_pose_tracking_points(result, num_detects)
            else:
                raise ValueError(f"Unknown tracker type: {type(tracker)}.")
            # Project to image plane.
            height, width = image.shape[:2]
            body = project(body, width, height)
            lhand = project(lhand, width, height)
            rhand = project(rhand, width, height)
            face = project(face, width, height)

            keypoints = np.concatenate([body, lhand, rhand, face], axis=2)
            if frames is None:
                frames = keypoints
            else:
                frames = np.concatenate([frames, keypoints], axis=1)
        except Exception as inst:
            raise inst
    return frames
【コード解説】
- 引数
  - tracker: 追跡クラスのインスタンス
  - videofile: 入力動画のパス
  - static_image_mode: `True` の場合,静止画モードとして処理を行う.
    この値は後述するOptionsクラスに与える値とそろえる必要があります.
  - num_detects: 検出する人体または部位数.
    この値は後述するOptionsクラスに与える値とそろえる必要があります.
- 3-4行目: 動画ファイルをオープンし,動画の取り込み位置を初期フレームに設定
- 5-7行目: trackerが要求するタイムスタンプ計測用変数と,追跡点配列格納用変数の初期化
- 8-42行目: メインループ
  - 9-11行目: 画像フレーム読み込み.読み込みができない場合はループを抜けます.
  - 14行目: 画像データをNumpy配列からMediaPipe.Imageクラスに変換
  - 15-28行目: 画像から追跡点を生成.
    14-20行目に示しているように,静止画モードと動画モードで追跡処理の呼び出し方が異なる点に注意してください.
    また,21-28行目に示しているように,trackerの型を調べて型に応じて追跡点の抽出関数を切り替えています.
  - 30-34行目: 追跡点のスケーリング
  - 36-40行目: 過去の追跡点配列と現フレームの追跡点を結合

3.6 追跡の実行

ここまでで,追跡に必要な処理は実装できました.
以降では各部位ごとに追跡処理を実行していきます.

顔追跡

まず最初に顔追跡を実行します.
実行処理は下記のようになります.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
videofile = "./finger_near0.mp4"
output = "out_track/finger_near0_face.npy"
static_image_mode = False
num_detect_faces = 1

options = FaceLandmarkerOptions(
    base_options=BaseOptions(model_asset_path="./face_landmarker.task"),
    num_faces=num_detect_faces,
    running_mode=RunningMode.IMAGE if static_image_mode else RunningMode.VIDEO)

tracker = FaceLandmarker.create_from_options(options)

frames = tracking(tracker, videofile, static_image_mode, num_detect_faces)

print(frames.shape)
parent = os.path.dirname(output)
os.makedirs(parent, exist_ok=True)
np.save(output, frames)
【コード解説】
- 1-4行目: 入出力パスとOptionsクラスに与える引数の設定
- 6-9行目: FaceLandmarkerOptionsクラスの初期化
  Holisticクラスと異なり,部位毎の追跡クラスはOptionsクラスを各追跡クラスに与えることでインスタンス化を行います.
  Optionsクラスの初期化変数には,共通オプションであるBaseOptionsと各クラスに固有の変数があります.
  BaseOptionsにはモデルファイルのパスや,実行するデバイス (CPUやGPU) を指定します.
  顔追跡クラスの固有設定として,ここでは `num_faces` (顔の検出数) と `running_mode` (静止画モードまたは動画モード) を設定しています.
- 11-13行目: 追跡クラスをインスタンス化し,追跡処理を実行
- 15-18行目: 追跡点を保存

BaseOptionsクラスの詳細については,API仕様をご参照してください.
顔追跡クラスの固有設定については,こちらに記載があります.

手追跡

次に,下記のコードで手追跡を実行します.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
videofile = "./finger_near0.mp4"
output = "out_track/finger_near0_hand.npy"
static_image_mode = False
num_detect_hands = 1

options = HandLandmarkerOptions(
    base_options=BaseOptions(model_asset_path="./hand_landmarker.task"),
    num_hands=num_detect_hands,
    running_mode=RunningMode.IMAGE if static_image_mode else RunningMode.VIDEO)

tracker = HandLandmarker.create_from_options(options)

frames = tracking(tracker, videofile, static_image_mode, num_detect_hands)

print(frames.shape)
parent = os.path.dirname(output)
os.makedirs(parent, exist_ok=True)
np.save(output, frames)
【コード解説】
- 1-4行目: 入出力パスとOptionsクラスに与える引数の設定
- 6-9行目: HandLandmarkerOptionsクラスの初期化
  手追跡クラスの固有設定としてここでは,`num_hands` (手の検出数) と `running_mode` (静止画モードまたは動画モード) を設定しています.
- 11-13行目: 追跡クラスをインスタンス化し,追跡処理を実行
- 15-18行目: 追跡点を保存

手追跡クラスの固有設定についてはこちらに記載があります.

身体追跡

最後に,下記のコードで身体追跡を実行します.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
videofile = "./finger_near0.mp4"
output = "out_track/finger_near0_pose.npy"
static_image_mode = False
num_detect_poses = 1

options = PoseLandmarkerOptions(
    base_options=BaseOptions(model_asset_path="./pose_landmarker_full.task"),
    num_poses=num_detect_poses,
    running_mode=RunningMode.IMAGE if static_image_mode else RunningMode.VIDEO)

tracker = PoseLandmarker.create_from_options(options)

frames = tracking(tracker, videofile, static_image_mode, num_detect_poses)

print(frames.shape)
parent = os.path.dirname(output)
os.makedirs(parent, exist_ok=True)
np.save(output, frames)
【コード解説】
- 1-4行目: 入出力パスとOptionsクラスに与える引数の設定
- 6-9行目: PoseLandmarkerOptionsクラスの初期化
  身体追跡クラスの固有設定としてここでは,`num_poses` (人体の検出数) と `running_mode` (静止画モードまたは動画モード) を設定しています.
- 11-13行目: 追跡クラスをインスタンス化し,追跡処理を実行
- 15-18行目: 追跡点を保存

身体追跡クラスの固有設定についてはこちらに記載があります.


4. 結果の描画

第4節では,追跡点の描画処理を行っています.
処理構成は全身追跡機能の場合と同様ですが,今回はメイン処理を関数化して実装しています. この処理では,動画と追跡点配列を入力します.
入力動画は背景を描画するために使い,フレーム毎に追跡点位置を示す円と,追跡点間の接続関係を示す直線を上書き描画します.
最終的に,描画結果は動画ファイルとして出力されます. 処理の構成図を図2に示します.

図2: 描画処理フロー
描画処理フロー

最初に,前処理としてファイルのロード,動画のオープン,およびウィンドウサイズや描画位置の調整を行います.

次に,開いた動画と追跡点から1フレーム分のデータを取り出します.
画像データを背景としてウィンドウに描画し,その後に追跡点を描画します.

この処理を全フレーム分行い,読み込むデータがなくなったら動画を保存して処理を完了します.

では,コードの解説をしていきます.
描画のメイン処理に先立って,メイン処理から呼び出される関数を実装します.

4.1 追跡点の描画

次のコードは,描画ウィンドウに対して追跡点を描画しています.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
def draw_landmarks(draw, landmarks, connections,
                   pt_color=(0, 0, 0), line_color=(0, 0, 0),
                   pt_size=3, line_size=3,
                   do_remove_error=True):
    for i, line in enumerate(connections):
        p0 = landmarks[line[0], :2]
        p1 = landmarks[line[1], :2]
        if do_remove_error:
            if np.isnan(p0).any() or np.isnan(p1).any():
                continue
            if (p0==0).any() or (p1==0).any():
                continue
        p0 = (int(p0[0]), int(p0[1]))
        p1 = (int(p1[0]), int(p1[1]))
        cv2.line(draw, p0, p1, line_color, line_size)
        cv2.circle(draw, p0, pt_size, pt_color, -1)
        cv2.circle(draw, p1, pt_size, pt_color, -1)
    return draw
【コード解説】
- 引数
  - draw: 描画領域.背景は既に描画済みの想定です.
  - landmarks: 追跡点配列
  - connections: 追跡点間の接続関係.
    接続関係は `[[0, 1], [1, 2], ...]` のように追跡点インデクスのペアの配列になっています.
  - pt_color: 追跡点の描画色 (BGR)形式
  - line_color: 追跡点間の描画色 (BGR)形式
  - pt_size: 追跡点の描画サイズ
  - line_size: 追跡点間の描画サイズ
  - do_remove_error: `True` の場合,追跡に失敗している点は無視する
- 5-7行目: 追跡点間の接続関係と,対応関係になっている2点を取り出します.
  追跡点の描画には $(x, y)$ 座標だけを用います.
- 8-12行目: 追跡に失敗した点をスキップする処理です.
  `Nan` や座標値が `0` になっている場合は描画をスキップします.
- 13-17行目: 追跡点の接続関係を直線で描画し,追跡点を円で描画しています.

4.2 描画メイン処理 (共通部分)

次のコードは,描画メイン処理の共通部分を実装しています.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
def drawing(videofile, trackfile, outvideo):
    trackdata = np.load(trackfile)
    if os.path.exists(videofile):
        capture = cv2.VideoCapture(videofile)
        capture.set(cv2.CAP_PROP_POS_FRAMES, 0)
        width = int(capture.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
    else:
        capture = None
        width = 500
        height = 500

    xmin_orig = trackdata[:, :, :, 0].min()
    ymin_orig = trackdata[:, :, :, 1].min()
    xmax_orig = trackdata[:, :, :, 0].max()
    ymax_orig = trackdata[:, :, :, 1].max()
    if xmax_orig < 10 and ymax_orig < 10:
        trackdata[:, :, 0] *= width
        trackdata[:, :, 1] *= height
    xmin_proj = int(xmin_orig)
    ymin_proj = int(ymin_orig)
    xmax_proj = int(xmax_orig)
    ymax_proj = int(ymax_orig)

    # Add offset.
    offset_x = 0
    offset_y = 0
    if xmin_proj < 0:
        offset_x = -xmin_proj
        trackdata[:, :, :, 0] += offset_x
    if ymin_proj < 0:
        offset_y = -ymin_proj
        trackdata[:, :, :, 1] += offset_y

    ywin = int(max(ymax_proj + offset_y, height + offset_y))
    xwin = int(max(xmax_proj + offset_x, width + offset_x))

    print("Window size:", xwin, ywin)
    print("Offsets:", offset_x, offset_y)

    writer = cv2.VideoWriter(outvideo, cv2.VideoWriter_fourcc(*"mp4v"),
        30.0, (xwin, ywin))
    if writer.isOpened() is False:
        print("VideoWriter is failed to open.")
        if writer is not None:
            writer.release()
        raise ValueError(f"Can not open {videofile}")

    # `[P, T, J, C] -> [T, P, J, C]`
    trackdata = np.transpose(trackdata, [1, 0, 2, 3])
    for frame in trackdata:
        if capture is not None:
            status, image = capture.read()
            if status is False:
                break
        else:
            image = np.full([height, width], 255, dtype=np.uint8)

        # Draw.
        draw = np.full([int(ywin), int(xwin), 3], 64, dtype=np.uint8)
        draw[offset_y: offset_y+height, offset_x: offset_x+width, :] = image

        for instance in frame:
            body = instance[:33, :]
            lhand = instance[33:33+21, :]
            rhand = instance[33+21:33+21+21, :]
            face = instance[33+21+21:, :]
            draw_landmarks(draw, body, POSE_CONNECTIONS, [0, 255, 0], [0, 255, 0],
                           do_remove_error=True)
            draw_landmarks(draw, lhand, HAND_CONNECTIONS, [255, 0, 0], [255, 0, 0],
                           do_remove_error=True)
            draw_landmarks(draw, rhand, HAND_CONNECTIONS, [0, 0, 255], [0, 0, 255],
                           do_remove_error=True)
            draw_landmarks(draw, face, FACEMESH_CONTOURS, [255, 255, 0], [255, 255, 0],
                           do_remove_error=True)
        writer.write(draw)
    writer.release()
    if capture is not None:
        capture.release()
【コード解説】
- 引数:
  - videofile: 入力動画のパス.動画は背景画像を描画するために用います.
  - trackfile: 追跡データファイルのパス
  - outvideo: 結果動画の出力パス
- 2-11行目: 追跡点ファイルと動画ファイルをロード.
  動画ファイルのオープンに成功した場合は,読み込み位置を先頭フレームに設定し,画像幅と画像高さを読み込みます.
- 13-23行目: 追跡点座標の最大値と最小値を算出します.
  この値はウィンドウサイズと描画位置の調整に用います.
  また,先の追跡処理でスケーリングをしていない場合は,19-21行目で再度スケーリングを行います.
- 25-39行目: ウィンドウサイズと描画位置の調整
  この処理によって追跡点座標が負の値や画面外の値になっている場合でも描画できるようになります.
- 41-47行目: 出力用の動画ファイルをオープン
  オープンに失敗した場合は例外を投げて中断します.
- 50-76行目: メインループ
  - 50行目: 時間次元に沿ってループ処理を行うために,追跡点配列の第1次元と第2次元を入れ替え
  - 52-61行目: 画像フレームを読み込み,背景を描画
    入力動画をオープンしていない場合は白背景を設定します.
  - 63行目: 人物毎に追跡点を描画
    全身追跡モードは複数名の追跡に未対応なためここでのループは不要なのですが,他の追跡処理と互換性をもたせるためにこのような処理になっています.
  - 64-75行目: 部位毎に追跡点を描画
    追跡点の接続関係は部位毎に定義されているため,一旦追跡点を部位毎に分割して,それぞれ異なる色で描画しています.
  - 76行目: 描画フレームを動画ファイルに書き出し
- 77-79行目: 動画の保存および終了処理

4.3 描画の実行

ここまでで,描画に必要な処理は実装できました.
以降では各部位ごとに描画処理を実行していきます.

顔追跡

まず最初に,次のコードで顔追跡結果を描画しています.
この処理は特に難しいことはなく,最初に引数を設定して描画関数に渡しているだけです.

1
2
3
4
5
videofile = "./finger_near0.mp4"
trackfile = "out_track/finger_near0_face.npy"
outvideo = "./finger_near0_face_track.mp4"

drawing(videofile, trackfile, outvideo)

描画結果を表示して確認をします.
OpenCVを使って作成した動画ファイルはColab環境上では上手く表示できなかったため,FFMPEGを使用して変換を行います.

!ffmpeg -i finger_near0_face_track.mp4 -vcodec vp9 -y finger_near0_face_track.webm

その後,次のコードで動画ファイルの描画を行います.

show_video("./finger_near0_face_track.webm")

手追跡

次に,手追跡結果を描画します.
コード構成は顔追跡の場合と同様で,引数に与えるファイルパスが変わっています.

1
2
3
4
5
videofile = "./finger_near0.mp4"
trackfile = "out_track/finger_near0_hand.npy"
outvideo = "./finger_near0_hand_track.mp4"

drawing(videofile, trackfile, outvideo)

顔追跡の場合と同様に動画を変換します.

!ffmpeg -i finger_near0_hand_track.mp4 -vcodec vp9 -y finger_near0_hand_track.webm

次のコードで動画ファイルの描画を行います.

show_video("./finger_near0_hand_track.webm")

身体追跡

最後に,身体追跡結果を描画します.
コード構成は顔追跡の場合と同様で,引数に与えるファイルパスが変わっています.

1
2
3
4
5
videofile = "./finger_near0.mp4"
trackfile = "out_track/finger_near0_pose.npy"
outvideo = "./finger_near0_pose_track.mp4"

drawing(videofile, trackfile, outvideo)

顔追跡の場合と同様に動画を変換します.

!ffmpeg -i finger_near0_pose_track.mp4 -vcodec vp9 -y finger_near0_pose_track.webm

次のコードで動画ファイルの描画を行います.

show_video("./finger_near0_pose_track.webm")

今回はMediaPipeの部位別追跡機能を使ったプログラムの解説を行いましたが,如何でしょうか?
各追跡機能のインタフェースの違いを見ていただきたく,3種類の追跡機能を一変に解説したため長くなってしまいました(^^;).

MediaPipeは活発に開発されており,これまでもインタフェースの変更はしばしば行われています.
もう少し時間はかかると思いますが,インタフェースの使用感は今後統一されていくと期待しています.
今回紹介した話が,これから追跡機能を使おうとお考えの方に何か参考になれば幸いです.