|
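| 1 | +## The concurrent.futures module |
| 2 | +The Python standard library provides the concurrent.futures module. Its two classes, ThreadPoolExecutor and ProcessPoolExecutor, offer a higher-level abstraction over threading and multiprocessing and give direct support for writing thread pools and process pools. |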
| 3 | +#### 1. Single-threaded download task |
| 4 | +```python |
| 5 | +import time |
| 6 | +import requests |
| 7 | + |
| 8 | +def download(index, pic_url): |
| 9 | +    resp = requests.get(pic_url) |
| 10 | +    with open('down_{}.png'.format(index), 'wb') as fw:  # one file per task |
| 11 | +        fw.write(resp.content) |
| 12 | +    return index |
| 13 | + |
| 14 | +if __name__ == '__main__': |
| 15 | +    url = 'https://github.com/daacheng/PythonBasic/blob/master/pic/python_basic/tuple2.png' |
| 16 | +    urls = [(index, url) for index in range(1, 6)] |
| 17 | +    t0 = time.time() |
| 18 | +    for index, pic_url in urls: |
| 19 | +        res = download(index, pic_url) |
| 20 | +        print('Download succeeded-{}'.format(res)) |
| 21 | +    elapsed = time.time() - t0 |
| 22 | +    print('Download time: {}'.format(elapsed)) |
| 23 | +``` |
| 24 | +Output |
| 25 | +```text |
| 26 | +Download succeeded-1 |
| 27 | +Download succeeded-2 |
| 28 | +Download succeeded-3 |
| 29 | +Download succeeded-4 |
| 30 | +Download succeeded-5 |
| 31 | +Download time: 3.3739817142486572 |
| 32 | +``` |
| 33 | + |
| 34 | +#### 2. Multithreaded downloads with concurrent.futures |
| 35 | +#### Method 1: concurrent.futures.Executor and concurrent.futures.Future objects |
| 36 | +1. **futures.ThreadPoolExecutor(workers)** creates a thread pool Executor with the given number of worker threads. |
| 37 | +2. **executor.submit(download, index, pic_url)** schedules the task on the Executor and returns a concurrent.futures.Future object that represents it. |
| 38 | +3. **futures.as_completed(todo)** takes an iterable of Future objects and returns an iterator that yields each Future as it finishes. |
| 39 | +4. **future.result(timeout)** blocks the calling thread until a result is available. If a timeout is given and expires first, it raises TimeoutError. |
| 40 | + |
| 41 | +```python |
| 42 | +import time |
| 43 | +import requests |
| 44 | +from concurrent import futures |
| 45 | + |
| 46 | +def download(index, pic_url): |
| 47 | +    resp = requests.get(pic_url) |
| 48 | +    with open('down_{}.png'.format(index), 'wb') as fw:  # avoid threads writing the same file |
| 49 | +        fw.write(resp.content) |
| 50 | +    return index |
| 51 | + |
| 52 | +if __name__ == '__main__': |
| 53 | +    url = 'https://github.com/daacheng/PythonBasic/blob/master/pic/python_basic/tuple2.png' |
| 54 | +    urls = [(index, url) for index in range(1, 6)] |
| 55 | +    workers = len(urls) |
| 56 | +    t0 = time.time() |
| 57 | +    with futures.ThreadPoolExecutor(workers) as executor: |
| 58 | +        todo = {executor.submit(download, index, pic_url) for index, pic_url in urls} |
| 59 | +        for future in futures.as_completed(todo): |
| 60 | +            # future.result() never blocks here, because as_completed only yields futures that have already finished. |
| 61 | +            res = future.result() |
| 62 | +            print('Download succeeded-{}'.format(res)) |
| 63 | + |
| 64 | +    elapsed = time.time() - t0 |
| 65 | +    print('Download time: {}'.format(elapsed)) |
| 66 | +``` |
| 67 | +Output |
| 68 | +```text |
| 69 | +Download succeeded-4 |
| 70 | +Download succeeded-3 |
| 71 | +Download succeeded-2 |
| 72 | +Download succeeded-5 |
| 73 | +Download succeeded-1 |
| 74 | +Download time: 0.7290542125701904 |
| 75 | +``` |
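The four steps of Method 1 do not show what happens when a task fails or when `result(timeout)` expires. The sketch below fills that gap under stated assumptions: `slow_square`, `collect`, and `first_attempt_times_out` are hypothetical helpers invented for illustration, and `time.sleep` stands in for the network request so the example runs without `requests`.

```python
import time
from concurrent import futures


def slow_square(n, delay):
    """Hypothetical stand-in for a download: sleep, then return n * n."""
    time.sleep(delay)
    if n < 0:
        raise ValueError('negative input: {}'.format(n))
    return n * n


def collect(jobs, workers=4):
    """Run (n, delay) jobs concurrently; return ({n: result}, {n: exception})."""
    results, errors = {}, {}
    with futures.ThreadPoolExecutor(workers) as executor:
        # Map each Future back to the input that produced it.
        future_to_n = {executor.submit(slow_square, n, d): n for n, d in jobs}
        for future in futures.as_completed(future_to_n):
            n = future_to_n[future]
            try:
                # result() re-raises any exception raised inside the task.
                results[n] = future.result()
            except ValueError as exc:
                errors[n] = exc
    return results, errors


def first_attempt_times_out(delay=0.3, timeout=0.01):
    """Show that result(timeout) raises if the task is still running."""
    with futures.ThreadPoolExecutor(1) as executor:
        future = executor.submit(slow_square, 3, delay)
        try:
            future.result(timeout=timeout)
            timed_out = False
        except futures.TimeoutError:
            timed_out = True
        # A second call without a timeout waits for the real result.
        return timed_out, future.result()
```

Keying a dict by Future, rather than collecting futures in a set, is one way to recover which input a finished or failed task belonged to.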
| 76 | + |
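| 77 | +#### Method 2 |
| 78 | +executor.map() works like the built-in map function, except that download is called concurrently in multiple threads. It returns an iterator, so you can loop over it to get the return value of each call, in the same order as the inputs. |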
| 79 | + |
| 80 | +```python |
| 81 | +import time |
| 82 | +import requests |
| 83 | +from concurrent import futures |
| 84 | + |
| 85 | +def download(index, url): |
| 86 | +    resp = requests.get(url) |
| 87 | +    with open('down_{}.png'.format(index), 'wb') as fw:  # avoid threads writing the same file |
| 88 | +        fw.write(resp.content) |
| 89 | +    return index |
| 90 | + |
| 91 | +if __name__ == '__main__': |
| 92 | +    url = 'https://github.com/daacheng/PythonBasic/blob/master/pic/python_basic/tuple2.png' |
| 93 | +    urls = [(index, url) for index in range(1, 6)] |
| 94 | +    workers = len(urls) |
| 95 | +    t0 = time.time() |
| 96 | +    with futures.ThreadPoolExecutor(workers) as executor: |
| 97 | +        download_tasks_res = executor.map(download, (i[0] for i in urls), (i[1] for i in urls)) |
| 98 | + |
| 99 | +    for res in download_tasks_res: |
| 100 | +        print('Download succeeded-{}'.format(res)) |
| 101 | + |
| 102 | +    elapsed = time.time() - t0 |
| 103 | +    print('Download time: {}'.format(elapsed)) |
| 104 | +``` |
| 105 | +Output |
| 106 | +```text |
| 107 | +Download succeeded-1 |
| 108 | +Download succeeded-2 |
| 109 | +Download succeeded-3 |
| 110 | +Download succeeded-4 |
| 111 | +Download succeeded-5 |
| 112 | +Download time: 0.5195839405059814 |
| 113 | +``` |
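The two outputs above differ in ordering: the `as_completed` run printed results as tasks finished, while the `executor.map` run printed them in input order. A minimal sketch of that difference follows. `wait_and_return`, `order_with_map`, and `order_with_as_completed` are hypothetical names, and sleeping for a given delay is an assumed stand-in for downloads of different durations.

```python
import time
from concurrent import futures


def wait_and_return(delay):
    """Hypothetical task: sleep for `delay` seconds, then return it."""
    time.sleep(delay)
    return delay


def order_with_map(delays):
    """executor.map yields results in the order of the inputs."""
    with futures.ThreadPoolExecutor(len(delays)) as executor:
        return list(executor.map(wait_and_return, delays))


def order_with_as_completed(delays):
    """as_completed yields futures in the order they finish."""
    with futures.ThreadPoolExecutor(len(delays)) as executor:
        todo = [executor.submit(wait_and_return, d) for d in delays]
        return [f.result() for f in futures.as_completed(todo)]


if __name__ == '__main__':
    delays = [0.4, 0.05, 0.2]
    print(order_with_map(delays))           # same order as `delays`
    print(order_with_as_completed(delays))  # typically shortest delay first
```

With `map`, a slow first task holds back every later result even if those tasks have already finished, so `as_completed` is usually the better fit when results should be handled as soon as they are ready.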
| 114 | + |
| 115 | + |
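| 116 | +#### 3. futures.ProcessPoolExecutor is used much like futures.ThreadPoolExecutor |
| 117 | +CPU-bound work needs multiple processes to get around the GIL, and the simplest way to create them is the futures.ProcessPoolExecutor class. For more complex multithreading or multiprocessing scenarios, use threading and multiprocessing directly instead of futures.ThreadPoolExecutor and futures.ProcessPoolExecutor. |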
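Because both executors share the same interface, the same function can be run with either one. A minimal sketch, assuming a toy CPU-bound task: `count_primes` and `run_with` are hypothetical names chosen for illustration, and the limits are arbitrary.

```python
from concurrent import futures


def count_primes(limit):
    """CPU-bound toy task: count primes below `limit` by trial division."""
    count = 0
    for n in range(2, limit):
        if all(n % d for d in range(2, int(n ** 0.5) + 1)):
            count += 1
    return count


def run_with(executor_cls, limits, workers=2):
    """Run count_primes over `limits` with the given executor class."""
    with executor_cls(workers) as executor:
        return list(executor.map(count_primes, limits))


if __name__ == '__main__':
    limits = [20000, 30000]
    # Only the executor class changes between the two calls.
    print(run_with(futures.ThreadPoolExecutor, limits))
    print(run_with(futures.ProcessPoolExecutor, limits))
```

With ProcessPoolExecutor, the task function and its arguments must be picklable, and the pool should be created under the `if __name__ == '__main__':` guard so that worker processes can import the module safely.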