asyncioがPOSIXスレッドを使っている原因を調べる

tokibito先生 (id:nullpobug) がオフィスに遊びにおいでと声かけてくれたので、オープンコレクターさんに遊びに行ってました。 aodag先生 (id:aodag) と3人で雑談してたんですが、ふと以前気になっていたことを思い出したので聞いてみた。

気になっていたこと

とある勉強会の発表資料 を作っている時に、 asyncioとaiohttpを使ってとあるサーバにHTTPのリクエストを送るコード例を用意した。

import aiohttp
import asyncio

async def fetch(l, url):
    async with aiohttp.ClientSession(loop=l) as session:
        async with session.get(url) as response:
            return await response.text()


async def main(l, url, num):
    tasks = [fetch(l, url) for _ in range(num)]
    return await asyncio.gather(*tasks)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    results = loop.run_until_complete(main(loop, 'http://localhost:8000', 3))
    for r in results:
        print(r)

PyCharmでは、「Concurrency Diagram」を表示する機能があり、スレッドやプロセスの動きを確認できるのですが、 このコードを実行したときのスレッドの動きは次のようになる。

f:id:nwpct1:20170330234242p:plain

なぜか、 concurrent.futures.ThreadPoolExecutor が現れている。 ドキュメントに何か書いてあるのか調べてみたのですが、それらしい記述が見つからず諦めていたのでaodag先生とtokibito先生に聞いてみた。

socket.getaddrinfo

数十分で原因を見つけてくれた。 socket.getaddrinfo が同期的に実行されてしまうため、cpythonの実装としてはこれを非同期に実行できるように変えるのではなく、ひとまず concurrent.futures.ThreadPoolExecutor により複数のスレッドで実行するようにしているらしい。

試しにgetaddrinfoが使われないコードサンプルとして、 aioredisを使ったサンプルを用意した。 ローカルに建てたRedisのサーバにUNIXドメインソケットで繋いでみる。

import asyncio
import aioredis

async def connection_example(key):
    conn = await aioredis.create_connection('/tmp/redis.sock')
    return await conn.execute('GET', key)


async def main(num):
    tasks = [connection_example('my-key') for _ in range(num)]
    return await asyncio.gather(*tasks)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    results = loop.run_until_complete(main(3))
    for r in results:
        print(r)

ちなみにredisのconfigは↓。

daemonize no
pidfile /var/run/redis.pid
unixsocket /tmp/redis.sock
unixsocketperm 700
logfile ""
databases 1

この時のConcurrency Diagramを見ると、

f:id:nwpct1:20170331163806p:plain

たしかにスレッドが生成されていない。

外部のRedisサーバへのアクセス

一方でRedisのサーバを外部に用意して繋いでみると (今回は arukas.io を使わせていただきました)、

import asyncio
import aioredis

async def connection_example(key):
    conn = await aioredis.create_connection(
        ('seaof-xxx-xxx.arukascloud.io', 311390),
        db=0, password='xxxxxxxxxxxxxxx')
    return await conn.execute('GET', key)


async def main(num):
    tasks = [connection_example('my-key') for _ in range(num)]
    return await asyncio.gather(*tasks)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    results = loop.run_until_complete(main(3))
    for r in results:
        print(r)

実行すると次の通り。

f:id:nwpct1:20170331163620p:plain

おー やっぱりワーカースレッドが生成されてしまうらしい。

ThreadPoolExecutorのワーカースレッドはいくつまで生成されるのか。

Semaphoreで同時に実行される数を3つに制限した時のConcurrency Diagramは次のようになる。

f:id:nwpct1:20170331164208p:plain

どうやらThreadPoolExecutor内のThreadが再利用されていない。 どこまで生成されるのかは、ドキュメントのThreadPoolExecutorのところに書いてあった。

max_workers が None か指定されない場合のデフォルト値はマシンのプロセッサの数に 5 を掛けたものになります

17.4. concurrent.futures – 並列タスク実行 — Python 3.6.1 ドキュメント

試しに30個ぐらいリクエストを送ってみる(Semaphoreは3)。

f:id:nwpct1:20170331164432p:plain

20個まで生成され、それ以降は再利用されているのを確認できた。 実装上ワーカースレッドの上限は変更できないけど、たしかに別に困るケースもなさそう。

uvloop

uvloopの方は、POSIXスレッド使わずに頑張ってるかもという話が出たので確認。

import uvloop

# 中略
loop = uvloop.new_event_loop()
asyncio.set_event_loop(loop)

実行すると、

f:id:nwpct1:20170330232815p:plain

おー ほんとだ。

おわりに

aodag先生とtokibito先生すごい… 自分はドキュメントを読みつつモヤモヤしたまま放置してたのですが、数十分で原因を見つけて教えてくれた。 自分もいよいよ明日から社会人なので、お二人目指して精進します。

aodag先生とtokibito先生のいるオープンコレクターさん、お仕事募集中だそうです(お世話になったので一言宣伝)。 ありがとうございました!