プログラマーの徒然ブログ

プログラミングに関することをはじめ、興味がでたものを雑多に!

【concurrent】サクッと並列処理でPython高速化

f:id:t49m1:20200319200644j:plain

Pythonは使いやすいが、処理が遅いということが往々にしてあります。

前回は、numbaを使った高速化方法を紹介しました。

t49m1.hatenablog.com

しかし、numbaも万能ではありません。 適用が難しい場面もあります。

そんな時のために、並列処理という別のアプローチを紹介します。

Python3.2で追加されたconcurrent.futuresモジュールを使って サクッと並列処理で高速化する方法です。

どんなときに使える?

ある処理を複数の入力データに対して適用する場合などに有効です。

  • 具体例:100枚の画像に対してぼかし処理を実行する

上記例のように、複数の入力データ(画像)の処理に依存関係がないと 並列処理向きで高速化しやすいです。

逆に、1つ目の入力の処理結果をもとに2つ目の入力を処理するなど、 入力データの処理に依存関係があると並列化は難しいので、 別の高速化を行うべきです。

結論:何をすればいいの?

  1. Pythonのバージョン確認

    • 以下のコマンドを実行することで確認することができます。

      bash python --version

    • Python 3.2.x以上が表示されたら問題ありません。

    • Python 2.x.xと表示された場合、python3 --versionを実行してください。

      • Python 3.2.x以上が表示されたら、python3でプログラムを実行することで利用できます。(以降、pythonpython3と置き換えて読み進めてください。)
      • 上記以外、Python 3.2.x以上のインストールが必要になります。(他の高速化を検討するほうがお手軽かも、、)
  2. concurrent.futuresモジュールの確認

    • 以下のコマンドで今利用可能なモジュールは下記コマンドで確認できます。

      • linuxコマンドが利用できる場合、下記コマンドの後に| grep conccurentとつけると探しやすいです。

      python -c "help('modules')"

    • モジュールがなかった場合、下記コマンドでインストールしてください。

      • python3の方は、pip3になります。

      pip install futures

  3. Pythonコードの修正

    • 追加コード

      • モジュールのロード:import concurrent.futures

      • executorの作成:executor = concurrent.futures.ProcessPoolExecutor(max_workers='同時実行可能タスク数')

      • (高速化したい)関数の実行:futures = [executor.submit(reduction, task_list[i]) for i in range(num_tasks)]

        • 上記は、下の例の抜粋
      • 処理の完了の待機:(done, notdone) = confu.wait(futures)

      • 結果の取得:for i, future in enumerate(futures): result[i] = future.result()

      • Executorのシャットダウン:shutdown(wait=True)
    • コード例:修正前

import numpy as np
     
def reduction(arr):
    sum = 0.0
    for val in arr:
        sum += val
    return sum
     
# 計算配列の準備
num_tasks = 1000
N=100000
task_list = [ np.ones((N)) for i in range(num_tasks)]
results = np.zeros(( num_tasks ))
     
# 計算部分
for i in range(num_tasks):
    result[i] = reduction(task_list[i])
print(result[0])
  • 修正後
import numpy as np
import concurrent.futures as confu
     
def reduction(arr):
    sum = 0.0
    for val in arr:
        sum += val
    return sum
     
# 計算配列の準備
num_tasks = 1000
N=100000
task_list = [ np.ones((N)) for i in range(num_tasks)]
results = np.zeros(( num_tasks ))
  
num_workers = 4 # 同時実行可能タスク数を設定(おすすめ:コア数を設定)
     
# 計算部分
with confu.ProcessPoolExecutor(max_workers=num_workers) as executor:
    # 並列処理実行(executor,.submitの引数で実行する関数を指定)
    futures = [executor.submit(reduction, task_list[i]) for i in range(num_tasks)]
    (done, notdone) = confu.wait(futures) # 処理がすべて終わるのを待つ
    for i, future in enumerate(futures):
        results[i] = future.result()       # 結果の取得
                             
print(results[0])

以上、以前紹介したnumbaよりコード変更は多いですね。

実行方法としては、mapを使用する方法もあります。 コード例は、記事の後半の処理時間の比較にあります。

concurrent.futuresモジュールの紹介

近年のCPUはマルチコアプロセッサとも呼ばれ、内部に複数の処理装置(コア)が搭載されています。

しかし、Pythonは基本的に1コアで処理を行っていく逐次処理となっています。

もし、2コア、4コアとCPU内の複数のコア使って処理することができたら処理速度は2倍、4倍にできるかもしれません。 この考えが並列処理です。(とここではしておきます。)

concurrent.futuresモジュールは、マルチスレッド、マルチプロセス両方のインターフェイスを提供してくれます。

このインターフェイスを使うことで、これまでのコードをほとんど変更することなくマルチスレッド/プロセスで複数コアを使った実行を実現できます。

concurrent.futuresの主役:Executor

concurrent.futuresモジュールには抽象クラスのExecutorクラスがあります。

実装クラスとして

  • マルチプロセス用のProcessPoolExecutor:プロセスによる並列化
  • マルチスレッド用のThreadPoolExecutor:スレッドによる並列化

の2つのクラスがあります。

並列化を行うときには、上記2つのクラスから1つを選んでexecutorを作成します。

個人的には、処理が遅いと感じるときはCPUやメモリに負荷のかかる処理であることが多いと思います。

なので、迷った方は、マルチプロセス用のProcessPoolExecutorを使ってみてはいかかでしょうか?

同時実行可能タスク数の決定:max_workers

executorを作成するときには、同時に実行可能なタスク数を引数max_workersで指定する必要があります。

この値は、CPUのコア数から決定することをお勧めします。 同時実行できるタスクを多くしても、本当に同時に処理できるのはCPUのコア数分だけだからです。

コア数を知りたい場合は、psutilモジュールがおすすめです。(Python3.4以上である必要があったかも。。)

以下の手順で、CPU物理コア数を知ることができます。

import psutil
psutil.cpu_count(logical=False)

osモジュール等のcpu_count()でもコア数を知ることができますが、Hyper-threadingがオンの場合は物理コア数の2倍の数が出力されるため注意が必要です。

max_workersの値は

  • 他の作業をしないのならば物理コア数分
  • 他の作業と並行して行うなら物理コア数の半分程度

がいいのではないでしょうか?

max_workersの値が決まったら、下記コードでexecutorを作成します。

  • マルチプロセス
executor = concurrent.futures.ProcessPoolExecutor(max_workers=4)
  • マルチスレッド
executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)

executorでできること(実行方法)

作成したexecutorには下記の3つのメソッドがあります。

  1. submit(fn, *args, **kwargs)
    • 関数fnexecutorのリソースプールで実行するようにスケジュール
    • オブジェクトFutureを返す(←このオブジェクトに実行結果も含まれる)
  2. map(fn, *iterables, timeout=None, chunksize=1)
    • 関数fnをiterableオブジェクトの各要素に対して実行
  3. shutdown(wait=True)
    • executorをシャットダウンして、リソースを開放(終了処理)

submitmapは、好きなほうで実装すればいいと思います。

この後、コード例を載せています。

個人的には、mapのほうがコード追加が少なくてお手軽かなと思います。

コード例と処理時間比較

今回使用するCPUは、物理コアは2コア、論理コアが4コアのノートPCで行います。 そのため、並列処理による高速化率としては2倍程度になれば御の字かと思います。

それでは、以下のコードで並列処理の威力を見ていきたいと思います。

import time
import numpy as np

def reduction(arr):
    sum = 0.0
    for val in arr:
        sum += val
    return sum

# 計算する配列の準備
num_tasks = 1000
N=100000
task_list = [ np.ones((N)) for i in range(num_tasks)]
results = np.zeros(( num_tasks ))

t1 = time.time()
for i in range(num_tasks):
    results[i] = reduction(task_list[i])
t2 = time.time() - t1

print('Result : ', results[0])
print('time (single) : ', t2, ' sec')
  • 逐次処理の結果

結果は以下のように、私のノートPCでは約38秒かかりました。

Result :  100000.0
time (single) :  51.93065285682678  sec

submitを用いた例

それでは、concurrent.futuresを使って処理reductionを並列に行っていきます。 submitを利用すると以下のようなコードになります。

import time
import numpy as np
import concurrent.futures as confu

def reduction(arr):
    sum = 0.0
    for val in arr:
        sum += val
    return sum

# 計算する配列の準備
num_tasks = 1000
N=100000
task_list = [ np.ones((N)) for i in range(num_tasks)]
results = np.zeros(( num_tasks ))

num_workers = 4 # 最大のworker数

t1 = time.time()
with confu.ProcessPoolExecutor(max_workers=num_workers) as executor:
    futures = [executor.submit(reduction, task_list[i]) for i in range(num_tasks)]
    (done, notdone) = confu.wait(futures) # 処理の終了待ち
    for i, future in enumerate(futures):
        results[i] = future.result()
t2 = time.time() - t1

print('Result : ', results[0])
print('time (multi-process) : ', t2, ' sec')
  • submitでの並列処理結果

結果は以下の通りで、論理コア数4をmax_workersに設定すると1.8倍強の高速化を実現しました。

Result :  100000.0
time (multi-process) :  28.629583835601807  sec

mapを用いた例

次に、同じコードをmapを用いて並列化します。 すると、以下のようなコードになります。

import time
import numpy as np
import concurrent.futures as confu

def reduction(arr):
    sum = 0.0
    for val in arr:
        sum += val
    return sum

# 計算する配列の準備
num_tasks = 1000
N=100000
task_list = [ np.ones((N)) for i in range(num_tasks)]
results = np.zeros(( num_tasks ))

num_workers = 4 # 最大のworker数

t1 = time.time()
with confu.ProcessPoolExecutor(max_workers=num_workers) as executor:
    for i, result in enumerate(executor.map(reduction, task_list)):
        results[i] = result
t2 = time.time() - t1

print('Result : ', results[0])
print('time (multi-process-map) : ', t2, ' sec')
  • mapでの並列処理結果

処理時間は以下のとおりで、高速化率はsubmitと同等の1.8倍です。 なので、コード修正時に書きやすいメソッドを利用すればいいと思います。

Result :  100000.0
time (multi-process-map) :  27.78085160255432  sec

まとめ

今回は、Pythonの処理を並列化することで高速化できるconcurrent.futuresモジュールを紹介しました。

今回の記事では、マルチプロセス側の例を示しています。

理由は、Pythonが実行の一貫性保証としてGlobal Interpreter Lock (GIL)を採用しているためです。 (厳密には、C言語で実装されているCPythonの話ですが、使われているPythonの多くがCPytonだと思います。)

マルチスレッドとプロセスの違いも機会があれば、書こうと思います。

ではでは。。