tokibito先生 (id:nullpobug) がオフィスに遊びにおいでと声かけてくれたので、オープンコレクターさんに遊びに行ってました。 aodag先生 (id:aodag) と3人で雑談してたんですが、ふと以前気になっていたことを思い出したので聞いてみた。
気になっていたこと
とある勉強会の発表資料 を作っている時に、 asyncioとaiohttpを使ってとあるサーバにHTTPのリクエストを送るコード例を用意した。
import aiohttp import asyncio async def fetch(l, url): async with aiohttp.ClientSession(loop=l) as session: async with session.get(url) as response: return await response.text() async def main(l, url, num): tasks = [fetch(l, url) for _ in range(num)] return await asyncio.gather(*tasks) if __name__ == '__main__': loop = asyncio.get_event_loop() results = loop.run_until_complete(main(loop, 'http://localhost:8000', 3)) for r in results: print(r)
PyCharmでは、「Concurrency Diagram」を表示する機能があり、スレッドやプロセスの動きを確認できるのですが、 このコードを実行したときのスレッドの動きは次のようになる。
なぜか、 concurrent.futures.ThreadPoolExecutor
が現れている。
ドキュメントに何か書いてあるのか調べてみたのですが、それらしい記述が見つからず諦めていたのでaodag先生とtokibito先生に聞いてみた。
socket.getaddrinfo
数十分で原因を見つけてくれた。
socket.getaddrinfo
が同期的に実行されてしまうため、cpythonの実装としてはこれを非同期に実行できるように変えるのではなく、ひとまず concurrent.futures.ThreadPoolExecutor
により複数のスレッドで実行するようにしているらしい。
- https://github.com/python/cpython/blob/6f0eb93183519024cb360162bdd81b9faec97ba6/Lib/asyncio/base_events.py#L666-L673
- https://github.com/python/cpython/blob/6f0eb93183519024cb360162bdd81b9faec97ba6/Lib/asyncio/base_events.py#L627-L636
試しにgetaddrinfoが使われないコードサンプルとして、 aioredisを使ったサンプルを用意した。 ローカルに建てたRedisのサーバにUNIXドメインソケットで繋いでみる。
import asyncio import aioredis async def connection_example(key): conn = await aioredis.create_connection('/tmp/redis.sock') return await conn.execute('GET', key) async def main(num): tasks = [connection_example('my-key') for _ in range(num)] return await asyncio.gather(*tasks) if __name__ == '__main__': loop = asyncio.get_event_loop() results = loop.run_until_complete(main(3)) for r in results: print(r)
ちなみにredisのconfigは↓。
daemonize no pidfile /var/run/redis.pid unixsocket /tmp/redis.sock unixsocketperm 700 logfile "" databases 1
この時のConcurrency Diagramを見ると、
たしかにスレッドが生成されていない。
外部のRedisサーバへのアクセス
一方でRedisのサーバを外部に用意して繋いでみると (今回は arukas.io を使わせていただきました)、
import asyncio import aioredis async def connection_example(key): conn = await aioredis.create_connection( ('seaof-xxx-xxx.arukascloud.io', 311390), db=0, password='xxxxxxxxxxxxxxx') return await conn.execute('GET', key) async def main(num): tasks = [connection_example('my-key') for _ in range(num)] return await asyncio.gather(*tasks) if __name__ == '__main__': loop = asyncio.get_event_loop() results = loop.run_until_complete(main(3)) for r in results: print(r)
実行すると次の通り。
おー やっぱりワーカースレッドが生成されてしまうらしい。
ThreadPoolExecutorのワーカースレッドはいくつまで生成されるのか。
Semaphoreで同時に実行される数を3つに制限した時のConcurrency Diagramは次のようになる。
どうやらThreadPoolExecutor内のThreadが再利用されていない。 どこまで生成されるのかは、ドキュメントのThreadPoolExecutorのところに書いてあった。
max_workers が None か指定されない場合のデフォルト値はマシンのプロセッサの数に 5 を掛けたものになります
試しに30個ぐらいリクエストを送ってみる(Semaphoreは3)。
20個まで生成され、それ以降は再利用されているのを確認できた。 実装上ワーカースレッドの上限は変更できないけど、たしかに別に困るケースもなさそう。
uvloop
uvloopの方は、POSIXスレッド使わずに頑張ってるかもという話が出たので確認。
import uvloop # 中略 loop = uvloop.new_event_loop() asyncio.set_event_loop(loop)
実行すると、
おー ほんとだ。
おわりに
aodag先生とtokibito先生すごい… 自分はドキュメントを読みつつモヤモヤしたまま放置してたのですが、数十分で原因を見つけて教えてくれた。 自分もいよいよ明日から社会人なので、お二人目指して精進します。
aodag先生とtokibito先生のいるオープンコレクターさん、お仕事募集中だそうです(お世話になったので一言宣伝)。 ありがとうございました!