非同期処理を行いたくなったため、標準ライブラリ「asyncio」を深掘り。
このライブラリは並行処理を行うもので、サーバーからのレスポンス待ちなど時間がかかる処理を待つ間、別のことをさせたいときに有効です。
まだ完全に理解したとは言えませんが、実際に非同期処理が使えるぐらいにはまとめていきたいと思います。
※このページのコードを動かすにはPython3.7以上が必要です。
使用時の環境
WSL2 Ubuntu - 20.04
Python 3.10.0
参考ページ
asyncio --- 非同期 I/O — Python 3.11.2 ドキュメント
並行処理と非同期
たとえば自分が雑誌の編集者だと仮定して、モデルの写真撮影、記事の執筆、といった2つの独立したタスクを抱えているとします。
どちらのタスクも一日では終わらず、時間がかかりそうです。
次のような仕事の進め方があるでしょう。
逐次処理
モデルの写真を全員撮り終わってから、記事を書く。
またその逆。
普段のPythonの処理方法です。
コードは上から順番に実行され、時間がかかる処理も終わるまで待たなければいけません。
並行処理
モデルの写真を撮りつつ、記事を書き進める。
交互に取り掛かる。
自分ひとりでやらなければいけないなら一般的な処理方法でしょう。
時間がかかるものに着手しつつ、合間に別の作業を行います。
パソコン内部で行われている処理のように、タスクを高速で切り替えながら進めると、人の目には同時に進んでいるように映ります。
このページで行いたい処理です。
並列処理
写真撮影を同僚に頼み、記事を自分で書く。
別のリソースが処理を担当。
Pythonでの並列処理のライブラリは「threading」や「multiprocessing」、「concurrent.futures」
そして、タスクに順序があり、完了するまで次のタスクに手をつけられないのが同期処理、
タスクに順序がなく、他に縛られずに着手できるのが非同期処理、というのが私の理解です。
「モデルの写真を撮りつつ、記事を書き進める」のは並行処理で非同期ですが、それはお互いのタスクが関係していなかったからで、もし写真を元にして記事を書く必要があったなら、並行でも同期的な処理をする必要があります。
動作の確認
標準ライブラリ「asyncio」をインポートし、動かしてみます。
並行処理(非同期処理)を行うためにルールとしてまとめると、
- キーワード「async」を付けて関数を定義
- 処理の実行や一時停止、再開させたいところで「await」
- await式はasync関数の中だけで使える。またawait式の右に置けるのは、コールチン、タスク、フューチャーの各オブジェクト
- 実行をスケジューリング
- 最上位のエントリーポイントをイベントループ内で実行
asyncを付けて定義した関数の戻り値は、コールチンオブジェクト(そもそもコールチンとは、実行、一時停止、または再開できる機能を持ったもの)です。
フューチャーは非同期処理の最終的な結果を表すオブジェクト。
import asyncio
async def main():
print('Hello')
await asyncio.sleep(2)
await asyncio.sleep(5)
print('World!')
asyncio.run(main())
最上位のエントリーポイントとしてmain関数を定義し、asyncio.runに渡しています。
asyncio.sleepはコールチン。
コールチンはawaitを付けないと動きません。
ただ上記コードは期待どおりにはなりません。
$ python sample.py Hello (7秒待つ) World! ※()は注釈です。
非同期処理であれば「Hello」の後、5秒後に「World!」と表示されるはずですが、結果は7秒後に表示されていました。
これは、上のルール4、実行をスケジューリングするようになっていないからです。
コールチンを非同期処理するにはただawaitするだけではいけなくて、タスクオブジェクトでラップしてからawaitします。
タスクオブジェクト化すると、実行がスケジューリングされます。
並行処理させる
並行処理を行う関数はいくつか用意されていますが、ここでは3つ紹介します。
- create_task
- コールチンをタスクオブジェクトにラップ。
- gather
- 渡されたものを並行処理。渡されたものがコールチンなら自動でタスク化。
- wait
- 渡されたイテラブルを並行処理。コールチンを自動でタスク化しない。
create_task
create_task関数の引数にコールチンオブジェクトを渡すと、戻り値としてタスクオブジェクトにラップされて返ってきます。
asyncio.create_task(coroutine)
このタスクオブジェクトをawaitすると、そこで処理が一時停止し別のタスクに処理が移ります。
ただタスクがひとつだけでは意味がありません。
処理を一時停止したとしても別の処理するタスクがないからです。
import asyncio
async def fast_func():
print("fast_funcスタート")
await asyncio.sleep(2)
print("fast_func終了")
async def slow_func():
print("slow_funcスタート")
await asyncio.sleep(5)
print("slow_func終了")
async def main():
print('mainスタート')
fast_task = asyncio.create_task(fast_func())
slow_task = asyncio.create_task(slow_func())
await fast_task
await slow_task
print("main終了")
asyncio.run(main())
上記のコードは、処理が速い関数と遅い関数を模したもの。
注意点として、create_taskを使うときには、結果を変数に入れるなどして参照として保持させます。
そしてそれをawait。そうしないと意図通りになりません。
実行します。
$ python sample.py mainスタート fast_funcスタート slow_funcスタート fast_func終了 slow_func終了 main終了
トータルでかかったのは5秒ほど。
望んでいた非同期処理が実現しました。
ちょっと実験してみます。
main内部のみ変更。
async def main():
print('mainスタート')
fast_task = asyncio.create_task(fast_func())
slow_task = asyncio.create_task(slow_func())
print(asyncio.all_tasks())
await fast_task
await slow_task
print(asyncio.all_tasks())
print("main終了")
2つのタスクを作成した直後とmain終了間際に「print(asyncio.all_tasks())」を追加しています。
文字通りタスクのリスト表示です。
mainスタート {<Task pending name='Task-2' coro=<fast_func()>>, <Task pending name='Task-1' coro=<main()> cb=[_run_until_complete_cb()>, <Task pending name='Task-3' coro=<slow_func()>>} fast_funcスタート slow_funcスタート fast_func終了 slow_func終了 {<Task pending name='Task-1' coro=<main()> cb=[_run_until_complete_cb()>} main終了
※リストは一部編集済み。
タスク作成直後のリストを見ると、Task-1・main()、Task-2・fast_func()、Task-3・slow_func()が登録されています。
main()をTaskオブジェクトにした覚えはありませんが、追加されています。
asyncio.runで動かしたときでしょうか。
もうひとつ。
async def main():
print('mainスタート')
fast_task = asyncio.create_task(fast_func())
slow_task = asyncio.create_task(slow_func())
await fast_task
print("MIDDLE")
await slow_task
print("main終了")
awaitとawaitのあいだに「print("MIDDLE")」を記述。
出力は次のようにはならず、
mainスタート fast_funcスタート slow_funcスタート MIDDLE fast_func終了 slow_func終了 main終了
実際はこう。
mainスタート fast_funcスタート slow_funcスタート fast_func終了 slow_func終了 MIDDLE main終了
create_taskで追加したタスクが終わるまでmainに戻っていません。
最後です。
async def main():
print('mainスタート')
fast_task = asyncio.create_task(fast_func())
slow_task = asyncio.create_task(slow_func())
print("main終了")
awaitの2行を削除したもの。
$ python sample.py mainスタート main終了 fast_funcスタート slow_funcスタート
awaitは付けていませんが、create_taskを実行した段階で処理はスタートしたようです。
gather
gather関数を使って引数にコールチンを複数渡すと、タスク化して並行処理してくれます。
create_taskはひとつひとつタスクを追加する感じでしたが、これはグループとしてタスクを処理します。
asyncio.gather(*awaitables)
タスクが正常に終了すると、結果のリストを戻り値として受け取ることもできます。
import asyncio
async def slow_func(delay):
msg = f"slow_func({delay})"
print(msg + "スタート")
await asyncio.sleep(delay)
print(msg + "終了")
return msg
async def calc(num1, num2):
result = num1 * num2
print(f"{num1} * {num2} = {result}")
return f"{num1} * {num2} = {result}"
async def print_odd():
for i in range(1, 6, 2):
print(i)
await asyncio.sleep(0)
return "奇数表示終了"
async def print_even():
for j in range(2, 8, 2):
print(j)
await asyncio.sleep(0)
return "偶数表示終了"
async def main():
print('mainスタート')
result_list1 = await asyncio.gather(slow_func(5), calc(23, 24), print_odd(), print_even())
result_list2 = await asyncio.gather(slow_func(10), calc(32, 16), print_odd(), print_even())
print(result_list1)
print(result_list2)
print("main終了")
asyncio.run(main())
gather関数に、時間のかかる処理、計算、奇数表示、偶数表示をするコールチンを渡しました。(awaitを忘れずに)
for文の中では、一時停止し処理を切り替えるために「asyncio.sleep(0)」を使用。
mainスタート slow_func(5)スタート 23 * 24 = 552 1 2 3 4 5 6 slow_func(5)終了 slow_func(10)スタート 32 * 16 = 512 1 2 3 4 5 6 slow_func(10)終了 ['slow_func(5)', '23 * 24 = 552', '奇数表示終了', '偶数表示終了'] ['slow_func(10)', '32 * 16 = 512', '奇数表示終了', '偶数表示終了'] main終了
gather関数は2つありますが、最初の関数内で並行処理し、終わるまでは次の関数は実行されません。
結果の出力は奇数と偶数が上手いこと表示されているものの、実行順が保証されているかどうかはわかりませんでした。
結果のリストに関しては引数に入れた順になっています。
wait
gather関数と同じように引数として渡されたリストやタプルなどをグループとして並行処理し、処理が終わるまでは待機しますが、引数を自動でタスク化はしません。
asyncio.wait(awaitables_iterable)
import asyncio
async def sleep_and_print(delay):
msg = f"{delay}秒スリープ"
print(msg + "スタート")
await asyncio.sleep(delay)
print(msg + "終了")
return msg + "は正常終了しています"
async def main():
print('mainスタート')
done, pending = await asyncio.wait([asyncio.create_task(sleep_and_print(i)) for i in range(1, 4)])
print("-" * 20)
await asyncio.wait([asyncio.create_task(sleep_and_print(i)) for i in range(4, 7)])
print("-" * 20)
for task in done:
print(task.result())
print("main終了")
asyncio.run(main())
wait関数の戻り値は2つ――タスクオブジェクトとフューチャーオブジェクトの集合です。
mainスタート 1秒スリープスタート 2秒スリープスタート 3秒スリープスタート 1秒スリープ終了 2秒スリープ終了 3秒スリープ終了 -------------------- 4秒スリープスタート 5秒スリープスタート 6秒スリープスタート 4秒スリープ終了 5秒スリープ終了 6秒スリープ終了 -------------------- 2秒スリープは正常終了しています 1秒スリープは正常終了しています 3秒スリープは正常終了しています main終了
タスクオブジェクトの集合「done」変数は、ひとつひとつ取り出してresultメソッドで結果が取り出せます。
ただ集合のため順序はバラバラですが。
その他のAPI
高水準の API インデックス — Python 3.11.2 ドキュメント
Python3.11からはTask Groupsが追加。
Python 3.11の新機能:asyncio.TaskGroupを使った予測可能でより安全な非同期処理 | gihyo.jp
ユースケース
自分が使いそうな状況のコードを書きました。
片方が動いているあいだ動かしたい
import asyncio
async def slow_func():
print("slow_funcスタート")
await asyncio.sleep(10)
print("slow_func終了")
async def print_message(task):
while not task.done():
print("終了を待っています")
await asyncio.sleep(2)
async def main():
print('mainスタート')
task1 = asyncio.create_task(slow_func())
task2 = asyncio.create_task(print_message(task1))
await task1
await task2
print("main終了")
asyncio.run(main())
mainスタート slow_funcスタート 終了を待っています 終了を待っています 終了を待っています 終了を待っています 終了を待っています slow_func終了 main終了
タイムアウトでキャンセル
タイムアウトを設定し、キャンセルできる「wait_for」を使います。
import asyncio
async def slow_func(delay):
print("slow_funcスタート")
await asyncio.sleep(delay)
print("slow_func終了")
async def main():
print('mainスタート')
try:
await asyncio.wait_for(slow_func(10), timeout=3)
except asyncio.TimeoutError:
print("キャンセルしました")
print("main終了")
asyncio.run(main())
例外としてasyncio.TimeoutErrorが出るので捉えます。
mainスタート slow_funcスタート キャンセルしました main終了
コールバック
処理が終わった後のコールバックを追加するには「add_done_callback」
import asyncio
async def slow_func():
print("slow_funcスタート")
await asyncio.sleep(5)
print("slow_func終了")
return "成功!"
def my_callback(context):
print(f"contextのタイプは{type(context)}")
print(context.result())
async def main():
print('mainスタート')
task = asyncio.create_task(slow_func())
task.add_done_callback(my_callback)
await task
print("main終了")
asyncio.run(main())
コールバック関数は()を付けずに渡しますが、コールバック関数自体にひとつ引数が渡されるため、中身は使わなくても変数を設定します(上記の場合のcontext)。
簡単な処理ならlambda式でも十分ですが。
mainスタート slow_funcスタート slow_func終了 contextのタイプは<class '_asyncio.Task'> 成功! main終了