-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathprocesspool.py
79 lines (57 loc) · 2.16 KB
/
processpool.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
"""
This example demonstrates that CPU-bound operations such as `db.search()`
can be run in a process pool to prevent blocking the event loop.
"""
import asyncio
import concurrent.futures
import os
import tempfile
from typing import TypeVar
from tinydb import where
from tinydb.table import Document
from aiotinydb import AIOTinyDB
# Type variable bounded by tempfile's (private) `_TemporaryFileWrapper` class.
# NOTE(review): no definition visible in this file references TempFileT --
# confirm it is still needed before relying on or removing it.
TempFileT = TypeVar("TempFileT", bound=tempfile._TemporaryFileWrapper)
def _create_db_worker(filename: str) -> None:
    """Fill the database file at *filename* with dummy documents.

    Runs inside a worker process. It is a module-level function (rather than
    a closure) so that it can be pickled by reference and resolved again in
    the worker regardless of the multiprocessing start method in use.

    Args:
        filename: Path of the TinyDB file to populate.
    """

    async def _populate() -> None:
        print("Creating dummy database in process pool..")
        async with AIOTinyDB(filename) as db:
            # 2**15 documents, each with 2**10 keys "0x0".."0x3ff" whose
            # values are ``i + j`` -- large enough to be CPU-bound.
            db.insert_multiple(
                [{hex(j): i + j for j in range(2**10)} for i in range(2**15)]
            )

    # The worker process has no running event loop, so start a fresh one.
    asyncio.run(_populate())
    print("Created dummy database.")


async def create_db_in_process_pool(filename: str) -> None:
    """Create the dummy database in a process pool without blocking the loop.

    Args:
        filename: Path of the TinyDB file to populate.
    """
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # ``filename`` is passed as an argument so only a plain string and a
        # reference to the module-level worker have to be pickled.
        await loop.run_in_executor(executor, _create_db_worker, filename)
def _search_worker(filename: str) -> list[Document]:
    """Search the database file at *filename* inside a worker process.

    Defined at module level (rather than as a closure) so that it can be
    pickled by reference and resolved again in the worker regardless of the
    multiprocessing start method in use.

    Args:
        filename: Path of the TinyDB file to search.

    Returns:
        The documents whose ``"0xff"`` field equals 1024.
    """

    async def _query() -> list[Document]:
        print("Starting search in process pool.")
        async with AIOTinyDB(filename) as db:
            return db.search(where("0xff") == 1024)

    # The worker process has no running event loop, so start a fresh one.
    matches = asyncio.run(_query())
    print("Completed search.")
    return matches


async def search_in_process_pool(filename: str) -> list[Document]:
    """Run the search in a process pool without blocking the event loop.

    Args:
        filename: Path of the TinyDB file to search.

    Returns:
        The documents found by the worker, sent back from the worker process.
    """
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # ``filename`` is passed as an argument so only a plain string and a
        # reference to the module-level worker have to be pickled.
        return await loop.run_in_executor(executor, _search_worker, filename)
async def still_running():
    """Print a heartbeat once per second until the task is cancelled.

    Cancellation is absorbed on purpose: the coroutine simply finishes and
    returns ``None`` instead of propagating ``asyncio.CancelledError``.
    """
    heartbeat = "Event loop is still running."
    try:
        while True:
            await asyncio.sleep(1)
            print(heartbeat)
    except asyncio.CancelledError:
        # Being cancelled is the normal way for this loop to end.
        return
async def main():
    """Build and search a throwaway database while a heartbeat task runs.

    The heartbeat task keeps printing during the process-pool work, which
    shows that the event loop is not blocked. The temporary file is closed
    and removed, and the heartbeat cancelled, even if a step fails.
    """
    heartbeat_task = asyncio.create_task(still_running())
    # delete=False: the file is removed explicitly in the ``finally`` below.
    db_file = tempfile.NamedTemporaryFile("r+", delete=False)
    db_path = db_file.name
    try:
        await create_db_in_process_pool(db_path)
        await search_in_process_pool(db_path)
    finally:
        db_file.close()
        os.remove(db_path)
        heartbeat_task.cancel()
# Run the demo only when this file is executed as a script, not when it is
# imported as a module.
if __name__ == "__main__":
    asyncio.run(main())