[Python]非同期処理[asyncio]

Python

非同期処理を行いたくなったため、標準ライブラリ「asyncio」を深掘り。

このライブラリは並行処理を行うもので、サーバーからのレスポンス待ちなど時間がかかる処理を待つ間、別のことをさせたいときに有効です。

まだ完全に理解したとは言えませんが、実際に非同期処理が使えるぐらいにはまとめていきたいと思います。

※このページのコードを動かすにはPython3.7以上が必要です。

使用時の環境
WSL2 Ubuntu - 20.04
Python 3.10.0

参考ページ
asyncio --- 非同期 I/O — Python 3.11.2 ドキュメント

 Androidアプリを作成しました。
 感情用のメモ帳です。

スポンサーリンク
スポンサーリンク

並行処理と非同期

たとえば自分が雑誌の編集者だと仮定して、モデルの写真撮影、記事の執筆、といった2つの独立したタスクを抱えているとします。

どちらのタスクも一日では終わらず、時間がかかりそうです。

次のような仕事の進め方があるでしょう。

逐次処理
モデルの写真を全員撮り終わってから、記事を書く。
またその逆。

普段のPythonの処理方法です。
コードは上から順番に実行され、時間がかかる処理も終わるまで待たなければいけません。

並行処理
モデルの写真を撮りつつ、記事を書き進める。
交互に取り掛かる。

自分ひとりでやらなければいけないなら一般的な処理方法でしょう。
時間がかかるものに着手しつつ、合間に別の作業を行います。

パソコン内部で行われている処理のように、タスクを高速で切り替えながら進めると、人の目には同時に進んでいるように映ります。

このページで行いたい処理です。

並列処理
写真撮影を同僚に頼み、記事を自分で書く。
別のリソースが処理を担当。

Pythonでの並列処理のライブラリは「threading」や「multiprocessing」、「concurrent.futures

そして、タスクに順序があり、完了するまで次のタスクに手をつけられないのが同期処理
タスクに順序がなく、他に縛られずに着手できるのが非同期処理、というのが私の理解です。

「モデルの写真を撮りつつ、記事を書き進める」のは並行処理で非同期ですが、それはお互いのタスクが関係していなかったからで、もし写真を元にして記事を書く必要があったなら、並行でも同期的な処理をする必要があります。

動作の確認


標準ライブラリ「asyncio」をインポートし、動かしてみます。

並行処理(非同期処理)を行うためにルールとしてまとめると、

  1. キーワード「async」を付けて関数を定義
  2. 処理の実行や一時停止、再開させたいところで「await
  3. await式はasync関数の中だけで使える。またawait式の右に置けるのは、コルーチン、タスク、フューチャーの各オブジェクト
  4. 実行をスケジューリング
  5. 最上位のエントリーポイントをイベントループ内で実行

asyncを付けて定義した関数の戻り値は、コルーチンオブジェクト(そもそもコルーチンとは、実行、一時停止、または再開できる機能を持ったもの)です。

フューチャーは非同期処理の最終的な結果を表すオブジェクト。

Future — Python 3.11.2 ドキュメント

import asyncio


async def main():
    print('Hello')
    await asyncio.sleep(2)
    await asyncio.sleep(5)
    print('World!')

asyncio.run(main())

最上位のエントリーポイントとしてmain関数を定義し、asyncio.runに渡しています。

asyncio.sleepはコルーチン。
コルーチンはawaitを付けないと動きません。

ただ上記コードは期待どおりにはなりません。

$ python sample.py 
Hello
(7秒待つ)
World!
※()は注釈です。

非同期処理であれば「Hello」の後、5秒後に「World!」と表示されるはずですが、結果は7秒後に表示されていました。

これは、上のルール4、実行をスケジューリングするようになっていないからです。

コルーチンを非同期処理するにはただawaitするだけではいけなくて、タスクオブジェクトでラップしてからawaitします。

タスクオブジェクト化すると、実行がスケジューリングされます。

並行処理させる

並行処理を行う関数はいくつか用意されていますが、ここでは3つ紹介します。

  • create_task
    • コルーチンをタスクオブジェクトにラップ。
  • gather
    • 渡されたものを並行処理。渡されたものがコルーチンなら自動でタスク化。
  • wait
    • 渡されたイテラブルを並行処理。コルーチンを自動でタスク化しない。

create_task

create_task関数の引数にコルーチンオブジェクトを渡すと、戻り値としてタスクオブジェクトにラップされて返ってきます。

asyncio.create_task(coroutine)

このタスクオブジェクトをawaitすると、そこで処理が一時停止し別のタスクに処理が移ります。

ただタスクがひとつだけでは意味がありません。
処理を一時停止したとしても別の処理するタスクがないからです。

import asyncio


async def fast_func():
    print("fast_funcスタート")
    await asyncio.sleep(2)
    print("fast_func終了")

async def slow_func():
    print("slow_funcスタート")
    await asyncio.sleep(5)
    print("slow_func終了")

async def main():
    print('mainスタート')
    fast_task = asyncio.create_task(fast_func())
    slow_task = asyncio.create_task(slow_func())
    await fast_task
    await slow_task
    print("main終了")

asyncio.run(main())

上記のコードは、処理が速い関数と遅い関数を模したもの。

注意点として、create_taskを使うときには、結果を変数に入れるなどして参照として保持させます
そしてそれをawait。そうしないと意図通りになりません。

実行します。

$ python sample.py 
mainスタート
fast_funcスタート
slow_funcスタート
fast_func終了
slow_func終了
main終了

トータルでかかったのは5秒ほど。

望んでいた非同期処理が実現しました。


ちょっと実験してみます。
main内部のみ変更。

async def main():
    print('mainスタート')
    fast_task = asyncio.create_task(fast_func())
    slow_task = asyncio.create_task(slow_func())
    print(asyncio.all_tasks())
    await fast_task
    await slow_task
    print(asyncio.all_tasks())
    print("main終了")

2つのタスクを作成した直後とmain終了間際に「print(asyncio.all_tasks())」を追加しています。
文字通りタスクのリスト表示です。

mainスタート
{<Task pending name='Task-2' coro=<fast_func()>>, <Task pending name='Task-1' coro=<main()> cb=[_run_until_complete_cb()>, <Task pending name='Task-3' coro=<slow_func()>>}
fast_funcスタート
slow_funcスタート
fast_func終了
slow_func終了
{<Task pending name='Task-1' coro=<main()> cb=[_run_until_complete_cb()>}
main終了

※リストは一部編集済み。
タスク作成直後のリストを見ると、Task-1・main()、Task-2・fast_func()、Task-3・slow_func()が登録されています。

main()をTaskオブジェクトにした覚えはありませんが、追加されています。
asyncio.runで動かしたときでしょうか。


もうひとつ。

async def main():
    print('mainスタート')
    fast_task = asyncio.create_task(fast_func())
    slow_task = asyncio.create_task(slow_func())
    await fast_task
    print("MIDDLE")
    await slow_task
    print("main終了")

awaitとawaitのあいだに「print("MIDDLE")」を記述。

出力は次のようにはならず、

mainスタート
fast_funcスタート
slow_funcスタート
MIDDLE
fast_func終了
slow_func終了
main終了

実際はこう。

mainスタート
fast_funcスタート
slow_funcスタート
fast_func終了
slow_func終了
MIDDLE
main終了

create_taskで追加したタスクが終わるまでmainに戻っていません。


最後です。

async def main():
    print('mainスタート')
    fast_task = asyncio.create_task(fast_func())
    slow_task = asyncio.create_task(slow_func())
    print("main終了")

awaitの2行を削除したもの。

$ python sample.py 
mainスタート
main終了
fast_funcスタート
slow_funcスタート

awaitは付けていませんが、create_taskを実行した段階で処理はスタートしたようです。

gather

gather関数を使って引数にコルーチンを複数渡すと、タスク化して並行処理してくれます。

create_taskはひとつひとつタスクを追加する感じでしたが、これはグループとしてタスクを処理します。

asyncio.gather(*awaitables)

タスクが正常に終了すると、結果のリストを戻り値として受け取ることもできます。

import asyncio


async def slow_func(delay):
    msg = f"slow_func({delay})"
    print(msg +  "スタート")
    await asyncio.sleep(delay)
    print(msg + "終了")
    return msg

async def calc(num1, num2):
    result = num1 * num2
    print(f"{num1} * {num2} = {result}")
    return f"{num1} * {num2} = {result}"

async def print_odd():
    for i in range(1, 6, 2):
        print(i)
        await asyncio.sleep(0)
    return "奇数表示終了"

async def print_even():
    for j in range(2, 8, 2):
        print(j)
        await asyncio.sleep(0)
    return "偶数表示終了"

async def main():
    print('mainスタート')
    result_list1 = await asyncio.gather(slow_func(5), calc(23, 24), print_odd(), print_even())
    result_list2 = await asyncio.gather(slow_func(10), calc(32, 16), print_odd(), print_even())
    print(result_list1)
    print(result_list2)
    print("main終了")


asyncio.run(main())

gather関数に、時間のかかる処理、計算、奇数表示、偶数表示をするコルーチンを渡しました。(awaitを忘れずに)

for文の中では、一時停止し処理を切り替えるために「asyncio.sleep(0)」を使用。

mainスタート
slow_func(5)スタート
23 * 24 = 552
1
2
3
4
5
6
slow_func(5)終了
slow_func(10)スタート
32 * 16 = 512
1
2
3
4
5
6
slow_func(10)終了
['slow_func(5)', '23 * 24 = 552', '奇数表示終了', '偶数表示終了']
['slow_func(10)', '32 * 16 = 512', '奇数表示終了', '偶数表示終了']
main終了

gather関数は2つありますが、最初の関数内で並行処理し、終わるまでは次の関数は実行されません。

結果の出力は奇数と偶数が上手いこと表示されているものの、実行順が保証されているかどうかはわかりませんでした。

結果のリストに関しては引数に入れた順になっています。

wait

gather関数と同じように引数として渡されたリストやタプルなどをグループとして並行処理し、処理が終わるまでは待機しますが、引数を自動でタスク化はしません。

asyncio.wait(awaitables_iterable)

import asyncio


async def sleep_and_print(delay):
    msg = f"{delay}秒スリープ"
    print(msg +  "スタート")
    await asyncio.sleep(delay)
    print(msg + "終了")
    return msg + "は正常終了しています"

async def main():
    print('mainスタート')
    done, pending = await asyncio.wait([asyncio.create_task(sleep_and_print(i)) for i in range(1, 4)])
    print("-" * 20)
    await asyncio.wait([asyncio.create_task(sleep_and_print(i)) for i in range(4, 7)])
    print("-" * 20)
    for task in done:
        print(task.result())
    print("main終了")

asyncio.run(main())

wait関数の戻り値は2つ――タスクオブジェクトとフューチャーオブジェクトの集合です。

mainスタート
1秒スリープスタート
2秒スリープスタート
3秒スリープスタート
1秒スリープ終了
2秒スリープ終了
3秒スリープ終了
--------------------
4秒スリープスタート
5秒スリープスタート
6秒スリープスタート
4秒スリープ終了
5秒スリープ終了
6秒スリープ終了
--------------------
2秒スリープは正常終了しています
1秒スリープは正常終了しています
3秒スリープは正常終了しています
main終了

タスクオブジェクトの集合「done」変数は、ひとつひとつ取り出してresultメソッドで結果が取り出せます。

ただ集合のため順序はバラバラですが。

その他のAPI
高水準の API インデックス — Python 3.11.2 ドキュメント

Python3.11からはTask Groupsが追加。

Python 3.11の新機能:asyncio.TaskGroupを使った予測可能でより安全な非同期処理 | gihyo.jp

ユースケース

自分が使いそうな状況のコードを書きました。

片方が動いているあいだ動かしたい

import asyncio


async def slow_func():
    print("slow_funcスタート")
    await asyncio.sleep(10)
    print("slow_func終了")

async def print_message(task):
    while not task.done():
        print("終了を待っています")
        await asyncio.sleep(2)

async def main():
    print('mainスタート')
    task1 = asyncio.create_task(slow_func())
    task2 = asyncio.create_task(print_message(task1))
    await task1
    await task2
    print("main終了")

asyncio.run(main())
mainスタート
slow_funcスタート
終了を待っています
終了を待っています
終了を待っています
終了を待っています
終了を待っています
slow_func終了
main終了

タイムアウトでキャンセル

タイムアウトを設定し、キャンセルできる「wait_for」を使います。

import asyncio


async def slow_func(delay):
    print("slow_funcスタート")
    await asyncio.sleep(delay)
    print("slow_func終了")

async def main():
    print('mainスタート')
    try:
        await asyncio.wait_for(slow_func(10), timeout=3)
    except asyncio.TimeoutError:
        print("キャンセルしました")
    print("main終了")

asyncio.run(main())

例外としてasyncio.TimeoutErrorが出るので捉えます。

mainスタート
slow_funcスタート
キャンセルしました
main終了

コールバック

処理が終わった後のコールバックを追加するには「add_done_callback

import asyncio


async def slow_func():
    print("slow_funcスタート")
    await asyncio.sleep(5)
    print("slow_func終了")
    return "成功!"

def my_callback(context):
    print(f"contextのタイプは{type(context)}")
    print(context.result())

async def main():
    print('mainスタート')
    task = asyncio.create_task(slow_func())
    task.add_done_callback(my_callback)
    await task
    print("main終了")

asyncio.run(main())

コールバック関数は()を付けずに渡しますが、コールバック関数自体にひとつ引数が渡されるため、中身は使わなくても変数を設定します(上記の場合のcontext)。

簡単な処理ならlambda式でも十分ですが。

mainスタート
slow_funcスタート
slow_func終了
contextのタイプは<class '_asyncio.Task'>
成功!
main終了
タイトルとURLをコピーしました