jtwp470’s blog

日記とかプヨグヤミングとか

Pythonで並列実行をする

最近沢山の画像から特定の部分のみを抜き出すというような処理をしているのだが特に各画像との間に関連はないので1プロセスずつ動かすより数プロセス同時に動かして並列実行をすることを調べた.

今回は例として適当に100000枚の画像がありそれらは000000.jpg - 100000.jpgという感じでファイルが存在しているとした.またこれらの画像を適当な関数の処理にかけて別のディレクトリに保存する.例えばPythonでそれらの100000枚の画像に以下の様な関数処理を施すとしよう.

# source, destともにファイルのパス
# sourceは元ファイルのパス, destは処理したファイルを保存するパス
def example(source, dest):
    print(source + "を処理して" + save + "に保存しました")

こんなときLinuxやMacであれば次のような感じで実行するといい感じに並行実行してくれる.

$ find $SOURCE_PATH -type f | gawk -F/ '{print $NF}' | xargs -I{} -P $n python3 logo_cutter.py $SOURCE_PATH/{} $SAVE_PATH/{}

xargsのPオプションはなんとすごいことにこのプロセスを並行実行してくれる.

なのでこれで困っていなかったのだが実家のマシンはWindowsでLinuxは入っていない.また何度もPythonを呼び出すのはコストが掛かりそうな気がする. そこでPythonで並列実行できないか調べてみた.

するとどうやら標準ライブラリにmultiprocessingなる強そうなライブラリを発見した. きちんとWindowsでもサポートしてくれるらしい.

調べて次のようなコードを書いた.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import multiprocessing as mp
import os
import sys
import time


def hoge(source, dest):
    time.sleep(0.01)
    print("PID:" + str(os.getpid()) + ": " + source + "を" + dest + "に保存しました")


def wrapper_hoge(tuple_data):
    return tuple_data[0](tuple_data[1], tuple_data[2])


if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("python3 example.py <SOURCE_PATH> <DEST_PATH>")
        sys.exit(1)
    source_path = sys.argv[1]
    dest_path = sys.argv[2]
    # 入力値がパスであるかの確認
    assert os.path.isdir(dest_path) and os.path.isdir(save_path), True
    source = [os.path.abspath(source_path + x) for x in os.listdir(source_path)]
    dests = [os.path.abspath(dest_path + x) for x in os.listdir(dest_path)]

    p = mp.Pool(4)  # プロセス数
    data = [(hoge, x, y) for x, y in zip(dests, saves)]
    p.map(wrapper_hoge, data)

簡単に解説すると

source = [os.path.abspath(source_path + x) for x in os.listdir(source_path)]

の部分は

find $SOURCE_PATH -type f

と同じ処理をしている. あとPythonは変数に関数オブジェクトを格納することができるので

data = [(hoge, x, y) for x, y in zip(dests, saves)]

をすることができる.

今回の場合デッドロックとかが起こることもないので非常に簡単な実装で実現することができた.これでWindowsでもxargs等を使わずとも利用できる. あと最近勉強しだしたGoでは関数に並列実行しろ的な命令をつけるだけでやってくれるらしいので暇があればそれも調べたい.

参考文献