【concurrent】サクッと並列処理でPython高速化
Pythonは使いやすいが、処理が遅いということが往々にしてあります。
前回は、numbaを使った高速化方法を紹介しました。
しかし、numbaも万能ではありません。 適用が難しい場面もあります。
そんな時のために、並列処理という別のアプローチを紹介します。
Python3.2で追加されたconcurrent.futuresモジュールを使って サクッと並列処理で高速化する方法です。
どんなときに使える?
ある処理を複数の入力データに対して適用する場合などに有効です。
- 具体例:100枚の画像に対してぼかし処理を実行する
上記例のように、複数の入力データ(画像)の処理に依存関係がないと 並列処理向きで高速化しやすいです。
逆に、1つ目の入力の処理結果をもとに2つ目の入力を処理するなど、 入力データの処理に依存関係があると並列化は難しいので、 別の高速化を行うべきです。
結論:何をすればいいの?
Pythonのバージョン確認
以下のコマンドを実行することで確認することができます。
bash python --version
Python 3.2.x
以上が表示されたら問題ありません。Python 2.x.x
と表示された場合、python3 --version
を実行してください。Python 3.2.x
以上が表示されたら、python3
でプログラムを実行することで利用できます。(以降、python
はpython3
と置き換えて読み進めてください。)- 上記以外、
Python 3.2.x
以上のインストールが必要になります。(他の高速化を検討するほうがお手軽かも、、)
concurrent.futures
モジュールの確認以下のコマンドで今利用可能なモジュールは下記コマンドで確認できます。
- linuxコマンドが利用できる場合、下記コマンドの後に
| grep conccurent
とつけると探しやすいです。
python -c "help('modules')"
- linuxコマンドが利用できる場合、下記コマンドの後に
モジュールがなかった場合、下記コマンドでインストールしてください。
python3
の方は、pip3
になります。
pip install futures
Pythonコードの修正
追加コード
モジュールのロード:
import concurrent.futures
executorの作成:
executor = concurrent.futures.ProcessPoolExecutor(max_workers='同時実行可能タスク数')
(高速化したい)関数の実行:
futures = [executor.submit(reduction, task_list[i]) for i in range(num_tasks)]
- 上記は、下の例の抜粋
処理の完了の待機:
(done, notdone) = confu.wait(futures)
結果の取得:
for i, future in enumerate(futures): result[i] = future.result()
- Executorのシャットダウン:
shutdown(wait=True)
コード例:修正前
import numpy as np def reduction(arr): sum = 0.0 for val in arr: sum += val return sum # 計算配列の準備 num_tasks = 1000 N=100000 task_list = [ np.ones((N)) for i in range(num_tasks)] results = np.zeros(( num_tasks )) # 計算部分 for i in range(num_tasks): result[i] = reduction(task_list[i]) print(result[0])
- 修正後
import numpy as np import concurrent.futures as confu def reduction(arr): sum = 0.0 for val in arr: sum += val return sum # 計算配列の準備 num_tasks = 1000 N=100000 task_list = [ np.ones((N)) for i in range(num_tasks)] results = np.zeros(( num_tasks )) num_workers = 4 # 同時実行可能タスク数を設定(おすすめ:コア数を設定) # 計算部分 with confu.ProcessPoolExecutor(max_workers=num_workers) as executor: # 並列処理実行(executor,.submitの引数で実行する関数を指定) futures = [executor.submit(reduction, task_list[i]) for i in range(num_tasks)] (done, notdone) = confu.wait(futures) # 処理がすべて終わるのを待つ for i, future in enumerate(futures): results[i] = future.result() # 結果の取得 print(results[0])
以上、以前紹介したnumbaよりコード変更は多いですね。
実行方法としては、map
を使用する方法もあります。
コード例は、記事の後半の処理時間の比較にあります。
concurrent.futuresモジュールの紹介
近年のCPUはマルチコアプロセッサとも呼ばれ、内部に複数の処理装置(コア)が搭載されています。
しかし、Pythonは基本的に1コアで処理を行っていく逐次処理となっています。
もし、2コア、4コアとCPU内の複数のコア使って処理することができたら処理速度は2倍、4倍にできるかもしれません。 この考えが並列処理です。(とここではしておきます。)
concurrent.futures
モジュールは、マルチスレッド、マルチプロセス両方のインターフェイスを提供してくれます。
このインターフェイスを使うことで、これまでのコードをほとんど変更することなくマルチスレッド/プロセスで複数コアを使った実行を実現できます。
concurrent.futuresの主役:Executor
concurrent.futures
モジュールには抽象クラスのExecutorクラスがあります。
実装クラスとして
- マルチプロセス用の
ProcessPoolExecutor
:プロセスによる並列化 - マルチスレッド用の
ThreadPoolExecutor
:スレッドによる並列化
の2つのクラスがあります。
並列化を行うときには、上記2つのクラスから1つを選んでexecutor
を作成します。
個人的には、処理が遅いと感じるときはCPUやメモリに負荷のかかる処理であることが多いと思います。
なので、迷った方は、マルチプロセス用のProcessPoolExecutor
を使ってみてはいかかでしょうか?
同時実行可能タスク数の決定:max_workers
executor
を作成するときには、同時に実行可能なタスク数を引数max_workers
で指定する必要があります。
この値は、CPUのコア数から決定することをお勧めします。 同時実行できるタスクを多くしても、本当に同時に処理できるのはCPUのコア数分だけだからです。
コア数を知りたい場合は、psutil
モジュールがおすすめです。(Python3.4以上である必要があったかも。。)
以下の手順で、CPU物理コア数を知ることができます。
import psutil psutil.cpu_count(logical=False)
os
モジュール等のcpu_count()
でもコア数を知ることができますが、Hyper-threadingがオンの場合は物理コア数の2倍の数が出力されるため注意が必要です。
max_workers
の値は
- 他の作業をしないのならば物理コア数分
- 他の作業と並行して行うなら物理コア数の半分程度
がいいのではないでしょうか?
max_workers
の値が決まったら、下記コードでexecutor
を作成します。
- マルチプロセス
executor = concurrent.futures.ProcessPoolExecutor(max_workers=4)
- マルチスレッド
executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
executorでできること(実行方法)
作成したexecutor
には下記の3つのメソッドがあります。
submit(fn, *args, **kwargs)
- 関数
fn
をexecutor
のリソースプールで実行するようにスケジュール - オブジェクト
Future
を返す(←このオブジェクトに実行結果も含まれる)
- 関数
map(fn, *iterables, timeout=None, chunksize=1)
- 関数
fn
をiterableオブジェクトの各要素に対して実行
- 関数
shutdown(wait=True)
executor
をシャットダウンして、リソースを開放(終了処理)
submit
とmap
は、好きなほうで実装すればいいと思います。
この後、コード例を載せています。
個人的には、map
のほうがコード追加が少なくてお手軽かなと思います。
コード例と処理時間比較
今回使用するCPUは、物理コアは2コア、論理コアが4コアのノートPCで行います。 そのため、並列処理による高速化率としては2倍程度になれば御の字かと思います。
それでは、以下のコードで並列処理の威力を見ていきたいと思います。
import time import numpy as np def reduction(arr): sum = 0.0 for val in arr: sum += val return sum # 計算する配列の準備 num_tasks = 1000 N=100000 task_list = [ np.ones((N)) for i in range(num_tasks)] results = np.zeros(( num_tasks )) t1 = time.time() for i in range(num_tasks): results[i] = reduction(task_list[i]) t2 = time.time() - t1 print('Result : ', results[0]) print('time (single) : ', t2, ' sec')
- 逐次処理の結果
結果は以下のように、私のノートPCでは約38秒かかりました。
Result : 100000.0 time (single) : 51.93065285682678 sec
submit
を用いた例
それでは、concurrent.futuresを使って処理reduction
を並列に行っていきます。
submit
を利用すると以下のようなコードになります。
import time import numpy as np import concurrent.futures as confu def reduction(arr): sum = 0.0 for val in arr: sum += val return sum # 計算する配列の準備 num_tasks = 1000 N=100000 task_list = [ np.ones((N)) for i in range(num_tasks)] results = np.zeros(( num_tasks )) num_workers = 4 # 最大のworker数 t1 = time.time() with confu.ProcessPoolExecutor(max_workers=num_workers) as executor: futures = [executor.submit(reduction, task_list[i]) for i in range(num_tasks)] (done, notdone) = confu.wait(futures) # 処理の終了待ち for i, future in enumerate(futures): results[i] = future.result() t2 = time.time() - t1 print('Result : ', results[0]) print('time (multi-process) : ', t2, ' sec')
submit
での並列処理結果
結果は以下の通りで、論理コア数4をmax_workers
に設定すると1.8倍強の高速化を実現しました。
Result : 100000.0 time (multi-process) : 28.629583835601807 sec
map
を用いた例
次に、同じコードをmap
を用いて並列化します。
すると、以下のようなコードになります。
import time import numpy as np import concurrent.futures as confu def reduction(arr): sum = 0.0 for val in arr: sum += val return sum # 計算する配列の準備 num_tasks = 1000 N=100000 task_list = [ np.ones((N)) for i in range(num_tasks)] results = np.zeros(( num_tasks )) num_workers = 4 # 最大のworker数 t1 = time.time() with confu.ProcessPoolExecutor(max_workers=num_workers) as executor: for i, result in enumerate(executor.map(reduction, task_list)): results[i] = result t2 = time.time() - t1 print('Result : ', results[0]) print('time (multi-process-map) : ', t2, ' sec')
map
での並列処理結果
処理時間は以下のとおりで、高速化率はsubmit
と同等の1.8倍です。
なので、コード修正時に書きやすいメソッドを利用すればいいと思います。
Result : 100000.0 time (multi-process-map) : 27.78085160255432 sec
まとめ
今回は、Pythonの処理を並列化することで高速化できるconcurrent.futuresモジュールを紹介しました。
今回の記事では、マルチプロセス側の例を示しています。
理由は、Pythonが実行の一貫性保証としてGlobal Interpreter Lock (GIL)を採用しているためです。 (厳密には、C言語で実装されているCPythonの話ですが、使われているPythonの多くがCPytonだと思います。)
マルチスレッドとプロセスの違いも機会があれば、書こうと思います。
ではでは。。