プログラマーの徒然ブログ

プログラミングに関することをはじめ、興味がでたものを雑多に!

【concurrent】サクッと並列処理でPython高速化

f:id:t49m1:20200319200644j:plain

Pythonは使いやすいが、処理が遅いということが往々にしてあります。

前回は、numbaを使った高速化方法を紹介しました。

t49m1.hatenablog.com

しかし、numbaも万能ではありません。 適用が難しい場面もあります。

そんな時のために、並列処理という別のアプローチを紹介します。

Python3.2で追加されたconcurrent.futuresモジュールを使って サクッと並列処理で高速化する方法です。

どんなときに使える?

ある処理を複数の入力データに対して適用する場合などに有効です。

  • 具体例:100枚の画像に対してぼかし処理を実行する

上記例のように、複数の入力データ(画像)の処理に依存関係がないと 並列処理向きで高速化しやすいです。

逆に、1つ目の入力の処理結果をもとに2つ目の入力を処理するなど、 入力データの処理に依存関係があると並列化は難しいので、 別の高速化を行うべきです。

結論:何をすればいいの?

  1. Pythonのバージョン確認

    • 以下のコマンドを実行することで確認することができます。

      bash python --version

    • Python 3.2.x以上が表示されたら問題ありません。

    • Python 2.x.xと表示された場合、python3 --versionを実行してください。

      • Python 3.2.x以上が表示されたら、python3でプログラムを実行することで利用できます。(以降、pythonpython3と置き換えて読み進めてください。)
      • 上記以外、Python 3.2.x以上のインストールが必要になります。(他の高速化を検討するほうがお手軽かも、、)
  2. concurrent.futuresモジュールの確認

    • 以下のコマンドで今利用可能なモジュールは下記コマンドで確認できます。

      • linuxコマンドが利用できる場合、下記コマンドの後に| grep conccurentとつけると探しやすいです。

      python -c "help('modules')"

    • モジュールがなかった場合、下記コマンドでインストールしてください。

      • python3の方は、pip3になります。

      pip install futures

  3. Pythonコードの修正

    • 追加コード

      • モジュールのロード:import concurrent.futures

      • executorの作成:executor = concurrent.futures.ProcessPoolExecutor(max_workers='同時実行可能タスク数')

      • (高速化したい)関数の実行:futures = [executor.submit(reduction, task_list[i]) for i in range(num_tasks)]

        • 上記は、下の例の抜粋
      • 処理の完了の待機:(done, notdone) = confu.wait(futures)

      • 結果の取得:for i, future in enumerate(futures): result[i] = future.result()

      • Executorのシャットダウン:shutdown(wait=True)
    • コード例:修正前

import numpy as np
     
def reduction(arr):
    sum = 0.0
    for val in arr:
        sum += val
    return sum
     
# 計算配列の準備
num_tasks = 1000
N=100000
task_list = [ np.ones((N)) for i in range(num_tasks)]
results = np.zeros(( num_tasks ))
     
# 計算部分
for i in range(num_tasks):
    result[i] = reduction(task_list[i])
print(result[0])
  • 修正後
import numpy as np
import concurrent.futures as confu
     
def reduction(arr):
    sum = 0.0
    for val in arr:
        sum += val
    return sum
     
# 計算配列の準備
num_tasks = 1000
N=100000
task_list = [ np.ones((N)) for i in range(num_tasks)]
results = np.zeros(( num_tasks ))
  
num_workers = 4 # 同時実行可能タスク数を設定(おすすめ:コア数を設定)
     
# 計算部分
with confu.ProcessPoolExecutor(max_workers=num_workers) as executor:
    # 並列処理実行(executor,.submitの引数で実行する関数を指定)
    futures = [executor.submit(reduction, task_list[i]) for i in range(num_tasks)]
    (done, notdone) = confu.wait(futures) # 処理がすべて終わるのを待つ
    for i, future in enumerate(futures):
        results[i] = future.result()       # 結果の取得
                             
print(results[0])

以上、以前紹介したnumbaよりコード変更は多いですね。

実行方法としては、mapを使用する方法もあります。 コード例は、記事の後半の処理時間の比較にあります。

concurrent.futuresモジュールの紹介

近年のCPUはマルチコアプロセッサとも呼ばれ、内部に複数の処理装置(コア)が搭載されています。

しかし、Pythonは基本的に1コアで処理を行っていく逐次処理となっています。

もし、2コア、4コアとCPU内の複数のコア使って処理することができたら処理速度は2倍、4倍にできるかもしれません。 この考えが並列処理です。(とここではしておきます。)

concurrent.futuresモジュールは、マルチスレッド、マルチプロセス両方のインターフェイスを提供してくれます。

このインターフェイスを使うことで、これまでのコードをほとんど変更することなくマルチスレッド/プロセスで複数コアを使った実行を実現できます。

concurrent.futuresの主役:Executor

concurrent.futuresモジュールには抽象クラスのExecutorクラスがあります。

実装クラスとして

  • マルチプロセス用のProcessPoolExecutor:プロセスによる並列化
  • マルチスレッド用のThreadPoolExecutor:スレッドによる並列化

の2つのクラスがあります。

並列化を行うときには、上記2つのクラスから1つを選んでexecutorを作成します。

個人的には、処理が遅いと感じるときはCPUやメモリに負荷のかかる処理であることが多いと思います。

なので、迷った方は、マルチプロセス用のProcessPoolExecutorを使ってみてはいかかでしょうか?

同時実行可能タスク数の決定:max_workers

executorを作成するときには、同時に実行可能なタスク数を引数max_workersで指定する必要があります。

この値は、CPUのコア数から決定することをお勧めします。 同時実行できるタスクを多くしても、本当に同時に処理できるのはCPUのコア数分だけだからです。

コア数を知りたい場合は、psutilモジュールがおすすめです。(Python3.4以上である必要があったかも。。)

以下の手順で、CPU物理コア数を知ることができます。

import psutil
psutil.cpu_count(logical=False)

osモジュール等のcpu_count()でもコア数を知ることができますが、Hyper-threadingがオンの場合は物理コア数の2倍の数が出力されるため注意が必要です。

max_workersの値は

  • 他の作業をしないのならば物理コア数分
  • 他の作業と並行して行うなら物理コア数の半分程度

がいいのではないでしょうか?

max_workersの値が決まったら、下記コードでexecutorを作成します。

  • マルチプロセス
executor = concurrent.futures.ProcessPoolExecutor(max_workers=4)
  • マルチスレッド
executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)

executorでできること(実行方法)

作成したexecutorには下記の3つのメソッドがあります。

  1. submit(fn, *args, **kwargs)
    • 関数fnexecutorのリソースプールで実行するようにスケジュール
    • オブジェクトFutureを返す(←このオブジェクトに実行結果も含まれる)
  2. map(fn, *iterables, timeout=None, chunksize=1)
    • 関数fnをiterableオブジェクトの各要素に対して実行
  3. shutdown(wait=True)
    • executorをシャットダウンして、リソースを開放(終了処理)

submitmapは、好きなほうで実装すればいいと思います。

この後、コード例を載せています。

個人的には、mapのほうがコード追加が少なくてお手軽かなと思います。

コード例と処理時間比較

今回使用するCPUは、物理コアは2コア、論理コアが4コアのノートPCで行います。 そのため、並列処理による高速化率としては2倍程度になれば御の字かと思います。

それでは、以下のコードで並列処理の威力を見ていきたいと思います。

import time
import numpy as np

def reduction(arr):
    sum = 0.0
    for val in arr:
        sum += val
    return sum

# 計算する配列の準備
num_tasks = 1000
N=100000
task_list = [ np.ones((N)) for i in range(num_tasks)]
results = np.zeros(( num_tasks ))

t1 = time.time()
for i in range(num_tasks):
    results[i] = reduction(task_list[i])
t2 = time.time() - t1

print('Result : ', results[0])
print('time (single) : ', t2, ' sec')
  • 逐次処理の結果

結果は以下のように、私のノートPCでは約38秒かかりました。

Result :  100000.0
time (single) :  51.93065285682678  sec

submitを用いた例

それでは、concurrent.futuresを使って処理reductionを並列に行っていきます。 submitを利用すると以下のようなコードになります。

import time
import numpy as np
import concurrent.futures as confu

def reduction(arr):
    sum = 0.0
    for val in arr:
        sum += val
    return sum

# 計算する配列の準備
num_tasks = 1000
N=100000
task_list = [ np.ones((N)) for i in range(num_tasks)]
results = np.zeros(( num_tasks ))

num_workers = 4 # 最大のworker数

t1 = time.time()
with confu.ProcessPoolExecutor(max_workers=num_workers) as executor:
    futures = [executor.submit(reduction, task_list[i]) for i in range(num_tasks)]
    (done, notdone) = confu.wait(futures) # 処理の終了待ち
    for i, future in enumerate(futures):
        results[i] = future.result()
t2 = time.time() - t1

print('Result : ', results[0])
print('time (multi-process) : ', t2, ' sec')
  • submitでの並列処理結果

結果は以下の通りで、論理コア数4をmax_workersに設定すると1.8倍強の高速化を実現しました。

Result :  100000.0
time (multi-process) :  28.629583835601807  sec

mapを用いた例

次に、同じコードをmapを用いて並列化します。 すると、以下のようなコードになります。

import time
import numpy as np
import concurrent.futures as confu

def reduction(arr):
    sum = 0.0
    for val in arr:
        sum += val
    return sum

# 計算する配列の準備
num_tasks = 1000
N=100000
task_list = [ np.ones((N)) for i in range(num_tasks)]
results = np.zeros(( num_tasks ))

num_workers = 4 # 最大のworker数

t1 = time.time()
with confu.ProcessPoolExecutor(max_workers=num_workers) as executor:
    for i, result in enumerate(executor.map(reduction, task_list)):
        results[i] = result
t2 = time.time() - t1

print('Result : ', results[0])
print('time (multi-process-map) : ', t2, ' sec')
  • mapでの並列処理結果

処理時間は以下のとおりで、高速化率はsubmitと同等の1.8倍です。 なので、コード修正時に書きやすいメソッドを利用すればいいと思います。

Result :  100000.0
time (multi-process-map) :  27.78085160255432  sec

まとめ

今回は、Pythonの処理を並列化することで高速化できるconcurrent.futuresモジュールを紹介しました。

今回の記事では、マルチプロセス側の例を示しています。

理由は、Pythonが実行の一貫性保証としてGlobal Interpreter Lock (GIL)を採用しているためです。 (厳密には、C言語で実装されているCPythonの話ですが、使われているPythonの多くがCPytonだと思います。)

マルチスレッドとプロセスの違いも機会があれば、書こうと思います。

ではでは。。

【numba】サクッとPythonを高速化

www.python.jp

Deep Learningも流行ってきて、Pythonでプログラムを作成する人も多いんじゃないでしょうか?

Pythonは簡単に書けるので、私もちょっとしたデータ整形など色んな所で使っています。

そんなPythonですが、時々気になってしまうのが処理の遅さです。 C/C++などのコンパイル言語と比べると、処理時間が長くなりがちです。

そんなPythonの処理を簡単に高速化できるかもしれないnumbaというモジュールを紹介します。

最初に結論:何をすればいいの?

1: numbaのインストール

pip install numba

2: from numba import jit@jitの追加

from numba import jit

@jit
def func(x, y):

以上、これで高速化できない場合は、numbaが扱えるようにコードを修正していく必要があるかも。。

numbaとは?

Python仮想マシンコードを取得し、LLVMコンパイラが扱えるようにLLVM-IRを生成、LLVMコンパイラで動作マシン用のネイティブコードにするようです。

そのため、numbaのデコレータ@jitを付与したPythonコードを実行すると、初回にコンパイルが行われます。 コンパイル済みの関数が実行されるようになるので、重い処理や何回も呼ばれる処理なのでは、高速化の恩恵を受けやすいです。

メリット

  • デコレータの追加だけで手軽に高速化できる
  • 事前コンパイル不要で、これまで通り実行可能

デメリット

サンプルコードは、配列の総和を計算するコードです。

import time
import numpy as np

def sum_reduction(arr):
    sum = 0.
    for i in range(len(arr)):
        sum += arr[i]
    return sum

N = 10000000
arr = np.ones(N)

t1 = time.time()
sum = sum_reduction(arr)
t2 = time.time() - t1

print('Result: ', sum)
print('Time: ', t2, ' sec')

結果

Result:  10000000.0
Time:  1.4054384231567383  sec

numbaを適用する。

import time
import numpy as np
from numba import jit

@jit
def sum_reduction(arr):
    sum = 0.
    for i in range(len(arr)):
        sum += arr[i]
    return sum

N = 10000000
arr = np.ones(N)

t1 = time.time()
sum = sum_reduction(arr)
t2 = time.time() - t1

print('Result: ', sum)
print('Time: ', t2, ' sec')

結果

Result:  10000000.0
Time:  0.08794617652893066  sec

numbaのデコレータを付与するだけで、サクッと16倍も高速になりました。

速くならなかったら?

Objectモードが適用されているかもしれない。

今のnumbaでは、No pythonモードでコンパイルし、コンパイルに失敗するとObjectモードでコンパイルされます。

Objectモードは、No pythonモードと異なり型推定に失敗した部分等はPythonで処理されるため、かえって遅くなる場合もある。

将来の仕様では、Objectモードはオプションとなるようである。

現状、No pythonモードを強制するためには@jit(nopython=True)とする必要がある。 このように設定すると、型推定が失敗するとコンパイルエラーが出力されるようになる。

エラーが出てきた場合は、関数の中で重い処理のみを別の関数として切り出し、numbaを適用するなどの工夫をする必要がある。

まとめ

numbaを利用すると、比較的簡単にPythonのプログラムを高速化できる。

ただし、高速化できない場合も往々にある。 そのときは、コンパイルエラーに従ってコードを粛々と修正する。

たまに、遅いなと感じたら、numbaを使ってみてはいかがでしょうか?

より詳細な情報は以下の公式ページをご参照ください。

numba.pydata.org

【d払い】3月17日からの+10%還元キャンペーンまとめ!

f:id:t49m1:20200317230412j:plain

本日(2020年3月17日)から、ドコモのd払いで街のお店 d払い+10%還元キャンペーンが始まりました。

本記事は、間違いを含む可能性もありあます。 ご利用前には、下記公式ページをご確認ください。 service.smt.docomo.ne.jp

今回のキャンペーンは、一定の条件を満たせば誰でも利用金額の10%が追加で還元されます。

追加還元なので、スーパー還元プログラムと併用すると最大17%の還元になります。

今回のキャンペーンまとめ

エントリー期間 2020年3月9日(月)~ 2020年3月26日(木)
キャンペーン実施期間 2020年3月17日(火)~ 2020年3月26日(木)
キャンペーン内容 対象店舗でd払いで決済すると利用金額の10%還元*1

キャンペーンの適用条件

今回のキャンペーンに参加するためには、d払いからキャンペーンにエントリーが必要です。

エントリーすると、対象店舗でのd払い決済が還元の対象となります。

しかし、注意が必要です。下記の決済は対象となりません。

  • d払い(コード決済)のお支払い方法をdカード以外のクレジットカードに設定されたご購入分(電話料金合算払い、dカードdカードプリペイド除く)でのお支払い、口座払いが対象)
  • d払い(iD)でのご購入分
  • dポイントでのご利用分
  • クーポンでのご利用分
  • キャンペーン期間内に、正常に購入完了とならなかった商品のご購入分
  • キャンセル、返品をした商品のご購入分
  • お支払い方法に設定しているクレジットカードや通信によるエラー等があり、d払い決済が完了しなかった場合

適用条件を満たすためには、支払い方法はdカード、電話料金合算支払い、口座支払いのいずれかでなければならないみたいです。

詳しくは公式キャンペーンページを参照ください。

キャンペーン内容

ジャンル 主な対象店舗
コンビニ LAWSON、FamilyMartセブンイレブン
スーパー オークワ、サミット
ドラックストア マツモトキヨシ、ウエルシアグループ
量販店 エディオンビックカメラ東急ハンズ

主な対象店舗として、上記の店舗でのd払い決済の10%が還元されます。

より詳細な店舗情報は公式キャンペーンページをチェック!

今回のキャンペーンのアカウントの合計上限は1,000ポイントで、1決済上限値は500ポイントです。

1回の決済を5,000円までに抑え、キャンペーン期間中に合計10,000円分の買い物を対象店舗でするだけでキャンペーンの恩恵を100%受けることができます!

キャンペーン期間中に突然終了するかも

今回のキャンペーンは、合計10億ポイントに到達すると期間中でもキャンペーンは終了します。

終了のアナウンスは公式キャンペーンページで行われるようです。

ほしいものがある場合は、早めに購入したほうがいいみたいですね。

*1:還元ポイント上限あり

【日経平均】コロナウイルスをSARS時と比べてみた

f:id:t49m1:20200317231135j:plain

新型コロナウイルスの終息が見えない中、世界中の株価も下落の一途をたどっています。

楽観的にみられていた1月ごろ、記事でよく目にした"3月が買いのチャンス"の 3月中旬になったので、日経平均を2002年のSARS時と比較してみました。

f:id:t49m1:20200316230829j:plain

1月頃は、SARSのときと比較して似たような日経平均の変動でした。

SARSのときは、その後、10%弱の低下が発生した程度でした。 しかし、今回のコロナウイルスでは、2月中旬(60日経過)から日経平均が大きく低下し始めめています。

SARSと比べても、今回のコロナウイルスは世界的に拡大しており、また、亜熱帯のタイ等でも感染が確認されており夏になったら終息する保証もありません。

経済の低迷もしばらく続きそうですね。。。

【Git】エイリアス設定(gitコマンドの省略)

f:id:t49m1:20200317231646p:plain

Githubはソフトウェア開発プラットフォームで、仕事や研究、趣味で使われている人も多いのではないでしょうか?  

しかし、Githubのコマンドは、git checkoutgit branchなど長い!(っと私は思います。)

そんな長いコマンドを短くできるエイリアスの設定方法を、よく忘れるので、載せておきます。

注意 間違を含んている可能性もあるので、ご利用は各自の責任でお願いします。

設定方法:git configコマンド


エイリアスの適用範囲をシステム全体、ユーザ、特定リポジトリ限定にするかでコマンドが少し変わります。 個人で、エイリアス設定したい場合は、ユーザ反映が適しています。

git branchgit bエイリアスを例にすると、以下の3種類のコマンドになります。

  • システム全体に反映
git config --system alias.b branch
  • ユーザ反映
git config --global alias.b branch
git config --local alias.b branch

こんな感じで設定すると、git bgit branchが実行されます!

2ワード以上のコマンドのエイリアス登録


git checkout -bなどオプションや2ワード以上になるコマンドは、以下のように"で囲い登録ます。

git config --global alias.chb "checkout -b"

適用範囲の違い


最後に、適用範囲の違いは以下のとおりです。

  • システム全体 :同システムの全ユーザが利用可能
  • ユーザ    :設定を行ったユーザのみ利用可能
  • リポジトリ限定:設定を行ったリポジトリディレクトリでのみ利用可能