
Python Multithreading, Multiprocessing, and Coroutines


Related concepts

Concurrency and parallelism

Concurrency: the number of programs that a single CPU (CPU core) can make progress on within a period of time, by switching between them.

Parallelism: multiple programs running at the same instant on multiple CPUs; it depends on the number of CPUs (CPU cores).

This distinction exists because a single CPU (CPU core) can only execute one program at any given instant.

Synchronous and asynchronous

Synchronous: the caller must wait for the call to finish before the remaining logic can run.

Asynchronous: the caller does not wait for the operation to complete; the remaining logic runs immediately.

Blocking and non-blocking

Blocking: the current thread is suspended while the function is being called.

Non-blocking: the current thread is not suspended when the function is called; the call returns immediately.
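A minimal sketch of the difference (the helper functions are made up for illustration): the synchronous call blocks the whole thread for the full sleep, while the asynchronous version suspends only the coroutine, so the event loop can run other coroutines in the meantime.

```python
import asyncio
import time


def sync_wait():
    # Synchronous: the whole thread is blocked for 1 second.
    time.sleep(1)
    return "sync done"


async def async_wait():
    # Asynchronous: this coroutine is suspended, but the event loop
    # is free to run other coroutines during that second.
    await asyncio.sleep(1)
    return "async done"


async def main():
    # Two asynchronous waits run concurrently: total time is ~1 s, not ~2 s.
    results = await asyncio.gather(async_wait(), async_wait())
    print(results)


if __name__ == "__main__":
    print(sync_wait())   # blocks for 1 second
    asyncio.run(main())  # both coroutines finish after roughly 1 second
```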

CPU-bound and I/O-bound

CPU-bound:

CPU-bound, also called compute-bound, means that I/O finishes very quickly and the CPU spends most of the time on computation; the hallmark is high CPU usage.

Examples: compression/decompression, encryption/decryption, regular-expression matching.

I/O-bound:

I/O-bound means that for most of the running time the CPU is waiting for I/O (disk/memory) reads and writes; the hallmark is low CPU usage.

Examples: file reads and writes, web crawlers, database access.
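A minimal sketch contrasting the two kinds of work (the functions are illustrative, not from the original): the first keeps the CPU busy with arithmetic, the second mostly waits on the disk.

```python
import time


def cpu_bound_task(n: int) -> int:
    # CPU-bound: the time goes into arithmetic, so CPU usage stays high.
    return sum(i * i for i in range(n))


def io_bound_task(path: str) -> int:
    # I/O-bound: most of the time is spent waiting for the disk,
    # so CPU usage stays low.
    with open(path, "rb") as f:
        return len(f.read())


if __name__ == "__main__":
    start = time.time()
    cpu_bound_task(10_000_000)
    print(f"CPU-bound took {time.time() - start:.2f}s")

    start = time.time()
    io_bound_task(__file__)  # read this script itself as a tiny I/O example
    print(f"I/O-bound took {time.time() - start:.4f}s")
```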

Comparison of multiprocessing, multithreading, and coroutines

| Type | Advantages | Disadvantages | Suitable for |
| --- | --- | --- | --- |
| Multiprocessing: Process (multiprocessing) | Can use multiple CPU cores for true parallel computation | Uses the most resources; fewer can be started than threads | CPU-bound computation |
| Multithreading: Thread (threading) | Lighter than processes; uses fewer resources | Only concurrent, cannot use multiple CPUs because of the GIL; compared with coroutines, limited in number, more memory, and thread-switching overhead | I/O-bound work with a modest number of simultaneous tasks |
| Coroutines: Coroutine (asyncio) | Lowest memory overhead; the largest number can be started | Limited by library support; code is more complex | I/O-bound work with many simultaneous tasks |

GIL

GIL stands for Global Interpreter Lock.

(Figure: the GIL being passed between threads at runtime; image not reproduced here.)

Python's multithreading is effectively pseudo-multithreading: only one thread can execute Python bytecode at any moment.

A process can start N threads; the number is limited by the operating system.

A thread can start N coroutines; the number is essentially unlimited.
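A minimal sketch (not from the original) that makes the GIL visible: two threads running a pure-Python CPU-bound loop take roughly as long as running the same work sequentially, because only one thread can hold the GIL at a time.

```python
import time
from threading import Thread


def count(n: int) -> None:
    # Pure-Python CPU-bound loop; the GIL prevents two of these
    # from running in parallel on different cores.
    while n > 0:
        n -= 1


if __name__ == "__main__":
    N = 10_000_000

    start = time.time()
    count(N)
    count(N)
    print(f"sequential:  {time.time() - start:.2f}s")

    start = time.time()
    t1 = Thread(target=count, args=(N,))
    t2 = Thread(target=count, args=(N,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print(f"two threads: {time.time() - start:.2f}s  (similar, not ~2x faster)")
```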

How to choose

In most other languages, multithreading can use multiple CPUs (cores) at the same time, so it also suits CPU-bound computation; in Python, however, the GIL limits multithreading to I/O-bound work.

So for Python:

For I/O-bound work, prefer coroutines whenever the libraries involved support them; fall back to multithreading only when there is no async library support (see the sketch below).

For CPU-bound work, multiprocessing is the only real option.
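To illustrate the "library support" point above: with an asyncio-native HTTP client such as aiohttp (a sketch, assuming aiohttp is installed; the URLs are the test endpoints used later in this post), the requests stay fully asynchronous, whereas the blocking requests library would have to be pushed into a thread pool.

```python
import asyncio

import aiohttp


async def fetch(url: str) -> str:
    # aiohttp is asyncio-aware, so the request does not block the event loop.
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.text()


async def main():
    urls = ["https://www.psvmc.cn/userlist.json",
            "https://www.psvmc.cn/login.json"]
    results = await asyncio.gather(*(fetch(u) for u in urls))
    for body in results:
        print(body)


if __name__ == "__main__":
    asyncio.run(main())
```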

Coroutines (asynchronous I/O)

Simple example

```python
import asyncio


async def test():
    await asyncio.sleep(3)
    return "123"


async def main():
    result = await test()
    print(result)


if __name__ == '__main__':
    asyncio.run(main())
```

Single request, reading the result

```python
import threading
import asyncio


async def myfun(index):
    print(f'[{index}]({threading.current_thread().name})')
    await asyncio.sleep(1)
    return index


def getfuture(future):
    print(f"Result: {future.result()}")


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(myfun(1))
    future.add_done_callback(getfuture)
    loop.run_until_complete(future)
    loop.close()
```

Or:

```python
import threading
import asyncio


async def myfun(index):
    print(f'[{index}]({threading.current_thread().name})')
    await asyncio.sleep(1)
    return index


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(myfun(1))
    loop.run_until_complete(future)
    print(f"Result: {future.result()}")
    loop.close()
```

Batch requests, reading the results

```python
import threading
import asyncio


async def myfun(index):
    print(f'Thread ({threading.current_thread().name}), argument ({index})')
    await asyncio.sleep(1)
    return index


loop = asyncio.get_event_loop()
future_list = []
for item in range(3):
    future = asyncio.ensure_future(myfun(item))
    future_list.append(future)
loop.run_until_complete(asyncio.wait(future_list))
for future in future_list:
    print(f"Result: {future.result()}")
loop.close()
```

asyncio.wait and asyncio.gather

```python
import threading
import asyncio


async def myfun(index):
    print(f'[{index}]({threading.current_thread().name})')
    await asyncio.sleep(1)


loop = asyncio.get_event_loop()
# Wrap the coroutines in Tasks/Futures: asyncio.wait() no longer accepts
# bare coroutines on Python 3.11+.
tasks = [asyncio.ensure_future(myfun(1)), asyncio.ensure_future(myfun(2))]
loop.run_until_complete(asyncio.wait(tasks))
# loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
```

Differences between asyncio.gather and asyncio.wait:

Internally, wait() keeps the Task instances it tracks in a set. Because a set is unordered, the tasks do not complete in any predictable order. wait() returns a tuple of two sets: the tasks that are already done and the tasks that are still pending. It also accepts a timeout argument; once the timeout expires, any unfinished tasks are left pending, and if the program exits while tasks are still unfinished, Python reports the tasks that never completed.

gather() serves a similar purpose to wait(); the main differences are:

- gather() returns a single list of results rather than sets of futures.
- The results come back in the same order as the awaitables that were passed in.
- The future returned by gather() can be cancelled as a whole: cancelling it also cancels every awaitable it wraps that has not finished yet.
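A minimal sketch (not from the original) showing the two return shapes side by side: wait() hands back (done, pending) sets of futures, while gather() hands back an ordered list of results.

```python
import asyncio


async def work(index):
    await asyncio.sleep(0.1)
    return index


async def main():
    # asyncio.wait: returns two sets of futures; order is not guaranteed.
    tasks = [asyncio.ensure_future(work(i)) for i in range(3)]
    done, pending = await asyncio.wait(tasks)
    print([t.result() for t in done])   # e.g. [2, 0, 1]
    print(len(pending))                 # 0

    # asyncio.gather: results come back in the order the awaitables were passed.
    results = await asyncio.gather(work(0), work(1), work(2))
    print(results)                      # [0, 1, 2]


if __name__ == "__main__":
    asyncio.run(main())
```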

Combining coroutines with multithreading

Multiple requests at the same time

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

import requests


def myquery(url):
    r = requests.get(url)
    print(r.text)
    return r.text


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    executor = ThreadPoolExecutor(3)
    urls = ["https://www.psvmc.cn/userlist.json",
            "https://www.psvmc.cn/login.json"]
    tasks = []
    start_time = time.time()
    for url in urls:
        # Run the blocking requests call in the thread pool
        task = loop.run_in_executor(executor, myquery, url)
        tasks.append(task)
    loop.run_until_complete(asyncio.wait(tasks))
    print(f"Elapsed: {time.time() - start_time}")
```

Result:

```
{"code":0,"msg":"success","obj":{"name":"小明","sex":"男","token":"psvmc"}}
{"code":0,"msg":"success","obj":[{"name":"小明","sex":"男"},{"name":"小红","sex":"女"},{"name":"小刚","sex":"未知"}]}
Elapsed: 0.11207175254821777
```

Single request with a callback

```python
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor

import requests


def myquery(url):
    print(f"Request running in thread: {threading.current_thread().name}")
    r = requests.get(url)
    return r.text


def myfuture(future):
    print(f"Callback running in thread: {threading.current_thread().name}")
    print(future.result())


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    executor = ThreadPoolExecutor(3)
    url = "https://www.psvmc.cn/userlist.json"
    start_time = time.time()
    task = loop.run_in_executor(executor, myquery, url)
    future = asyncio.ensure_future(task)
    future.add_done_callback(myfuture)
    loop.run_until_complete(future)
    print(f"Elapsed: {time.time() - start_time}")
```

Multithreading

Importing and using the module

```python
from threading import Thread


def func(num):
    return num


t = Thread(target=func, args=(100,))
t.start()
t.join()
```

Data communication

```python
import queue

q = queue.Queue()  # thread-safe FIFO queue
q.put(1)
item = q.get()
```

Locks

```python
from threading import Lock

lock = Lock()
with lock:
    pass
```

Pooling (thread pools)

```python
from concurrent.futures import ThreadPoolExecutor


def func(num):
    return num


with ThreadPoolExecutor() as executor:
    # Option 1: map
    results = executor.map(func, [1, 2, 3])
    # Option 2: submit + result
    future = executor.submit(func, 1)
    result = future.result()
```

Functions with a single argument

```python
from concurrent.futures import ThreadPoolExecutor
import threading
import time


# A function to be run as the thread task
def action(num):
    print(threading.current_thread().name)
    time.sleep(num)
    return num + 100


if __name__ == "__main__":
    # Create a thread pool with 3 threads
    with ThreadPoolExecutor(max_workers=3) as pool:
        future1 = pool.submit(action, 3)
        future1.result()
        print(f"Single task result: {future1.result()}")
        print('------------------------------')
        # Run the task over several inputs with map
        results = pool.map(action, (1, 3, 5))
        for r in results:
            print(f"Multi-task result: {r}")
```

Result:

```
ThreadPoolExecutor-0_0
Single task result: 103
------------------------------
ThreadPoolExecutor-0_0
ThreadPoolExecutor-0_1
ThreadPoolExecutor-0_2
Multi-task result: 101
Multi-task result: 103
Multi-task result: 105
```

Functions with multiple arguments

Single request

```python
from concurrent.futures import ThreadPoolExecutor


# A function to be run as the thread task
def myfun(num1, num2):
    return num1 + num2


if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=3) as executor:
        # Single request
        future1 = executor.submit(myfun, *(1, 2))
        # Or:
        # future1 = executor.submit(lambda paras: myfun(*paras), (1, 2))
        future1.result()
        print(f"Single task result: {future1.result()}")
```

Batch requests

```python
from concurrent.futures import ThreadPoolExecutor


# A function to be run as the thread task
def myfun(num1, num2):
    return num1 + num2


if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=3) as executor:
        # Use map to run a batch of requests
        results = executor.map(lambda paras: myfun(*paras), [(1, 2), (2, 3), (3, 4)])
        for r in results:
            print(f"Multi-task result: {r}")
```
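One alternative worth knowing (a sketch, not from the original): executor.map() accepts one iterable per parameter, just like the built-in map(), which avoids the lambda wrapper entirely; this form also works with process pools, where a lambda cannot be pickled.

```python
from concurrent.futures import ThreadPoolExecutor


def myfun(num1, num2):
    return num1 + num2


if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=3) as executor:
        # One iterable per positional argument, zipped together like built-in map().
        results = executor.map(myfun, [1, 2, 3], [2, 3, 4])
        for r in results:
            print(f"Multi-task result: {r}")  # 3, 5, 7
```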

Batch requests, printed after all of them finish

```python
from concurrent.futures import ThreadPoolExecutor, wait


def myfun(num1, num2):
    return num1 + num2


if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=3) as executor:
        # Batch of requests
        paras_list = [(1, 2), (2, 3), (3, 4)]
        future_list = []
        for paras in paras_list:
            future = executor.submit(myfun, *paras)
            future_list.append(future)
        # Block until every future has finished
        wait(future_list)
        for future in future_list:
            print(f"Multi-task result: {future.result()}")
```

Batch requests, printed as soon as each finishes

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def myfun(num1, num2):
    return num1 + num2


if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=3) as executor:
        # Batch of requests
        paras_list = [(1, 2), (2, 3), (3, 4)]
        future_list = []
        for paras in paras_list:
            future = executor.submit(myfun, *paras)
            future_list.append(future)
        # as_completed yields each future as soon as it finishes
        for future in as_completed(future_list):
            print(f"Multi-task result: {future.result()}")
```

Multiprocessing

Importing and using the module

```python
from multiprocessing import Process


def func(num):
    return num


# The __main__ guard is required on platforms that spawn processes (Windows/macOS)
if __name__ == "__main__":
    t = Process(target=func, args=(100,))
    t.start()
    t.join()
```

Data communication

```python
import multiprocessing

q = multiprocessing.Queue()  # queue shared between processes
q.put(1)
item = q.get()
```

Locks

```python
from multiprocessing import Lock

lock = Lock()
with lock:
    pass
```

Pooling (process pools)

```python
from concurrent.futures import ProcessPoolExecutor


def func(num):
    return num


if __name__ == "__main__":
    with ProcessPoolExecutor() as executor:
        # Option 1: map
        results = executor.map(func, [1, 2, 3])
        # Option 2: submit + result
        future = executor.submit(func, 1)
        result = future.result()
```

Functions with a single argument

```python
from concurrent.futures import ProcessPoolExecutor
import multiprocessing
import time


# A function to be run as the process task
def action(num):
    print(multiprocessing.current_process().name)
    time.sleep(num)
    return num + 100


if __name__ == "__main__":
    # Create a process pool with 3 processes
    with ProcessPoolExecutor(max_workers=3) as pool:
        future1 = pool.submit(action, 3)
        future1.result()
        print(f"Single task result: {future1.result()}")
        print('------------------------------')
        # Run the task over several inputs with map
        results = pool.map(action, [1, 3, 5])
        for r in results:
            print(f"Multi-task result: {r}")
```

Result:

```
SpawnProcess-1
Single task result: 103
------------------------------
SpawnProcess-2
SpawnProcess-3
SpawnProcess-1
Multi-task result: 101
Multi-task result: 103
Multi-task result: 105
```

Functions with multiple arguments

Single request

```python
from concurrent.futures import ProcessPoolExecutor, wait, as_completed


def myfun(num1, num2):
    return num1 + num2


if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=3) as executor:
        # Single request
        future1 = executor.submit(myfun, *(1, 2))
        # Note: the lambda variant shown for the thread pool does not work here,
        # because a lambda cannot be pickled and sent to a worker process.
        # future1 = executor.submit(lambda paras: myfun(*paras), (1, 2))
        future1.result()
        print(f"Single task result: {future1.result()}")
```

Batch requests, printed after all of them finish

```python
from concurrent.futures import ProcessPoolExecutor, wait


def myfun(num1, num2):
    return num1 + num2


if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=3) as executor:
        # Batch of requests
        paras_list = [(1, 2), (2, 3), (3, 4)]
        future_list = []
        for paras in paras_list:
            future = executor.submit(myfun, *paras)
            future_list.append(future)
        # Block until every future has finished
        wait(future_list)
        for future in future_list:
            print(f"Multi-task result: {future.result()}")
```

Batch requests, printed as soon as each finishes

```python
from concurrent.futures import ProcessPoolExecutor, as_completed


def myfun(num1, num2):
    return num1 + num2


if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=3) as executor:
        # Batch of requests
        paras_list = [(1, 2), (2, 3), (3, 4)]
        future_list = []
        for paras in paras_list:
            future = executor.submit(myfun, *paras)
            future_list.append(future)
        # as_completed yields each future as soon as it finishes
        for future in as_completed(future_list):
            print(f"Multi-task result: {future.result()}")
```
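The post combines loop.run_in_executor with a thread pool for blocking I/O; the same pattern (a sketch, not from the original) also works with a process pool when the blocking work is CPU-bound, which sidesteps the GIL.

```python
import asyncio
import time
from concurrent.futures import ProcessPoolExecutor


def cpu_heavy(n: int) -> int:
    # CPU-bound work runs in a separate worker process, so the main
    # interpreter's GIL does not serialise it.
    return sum(i * i for i in range(n))


async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor(max_workers=3) as executor:
        start = time.time()
        tasks = [loop.run_in_executor(executor, cpu_heavy, 5_000_000)
                 for _ in range(3)]
        results = await asyncio.gather(*tasks)
        print(results, f"Elapsed: {time.time() - start:.2f}s")


if __name__ == "__main__":
    asyncio.run(main())
```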

Comparing multiprocessing, multithreading, and coroutines

Asynchronous I/O (asyncio) vs. multiprocessing vs. multithreading.

In I/O-bound applications the CPU spends far more time waiting for I/O than actually computing, which is wasteful.

Common I/O-bound workloads include browser interaction, disk access, web crawling, database queries, and so on.

The Python world offers three ways to raise concurrency in I/O-bound scenarios: multiprocessing, multithreading, and coroutines.

In theory, asyncio should deliver the highest performance, for the following reasons:

- Processes and threads incur CPU context switches.
- Processes and threads require switching between kernel mode and user mode, which is expensive; coroutines are transparent to the kernel and run entirely in user space.
- Processes and threads cannot be created without limit (a common rule of thumb is roughly CPU count × 2), whereas coroutines scale much further: the theoretical ceiling is the number of file descriptors the operating system's I/O multiplexing mechanism (epoll on Linux) can register.

Does asyncio actually perform as well as the theory suggests, and by how much? I built the following test scenario:

Issue 10 requests, each sleeping 1 s to simulate a business query.

- Method 1: sequential (serial) execution
- Method 2: multiprocessing
- Method 3: multithreading
- Method 4: asyncio
- Method 5: asyncio + uvloop

The biggest difference between the last option, asyncio + uvloop, and the stock asyncio is that uvloop reimplements asyncio's event loop in Cython on top of libuv.

Its official benchmarks report roughly twice the performance of node.js, on par with Go.

Sequential (serial) execution

```python
import time


def query(num):
    print(num)
    time.sleep(1)


def main():
    for h in range(10):
        query(h)


# main entrance
if __name__ == '__main__':
    start_time = time.perf_counter()
    main()
    end_time = time.perf_counter()
    print(f"Elapsed: {end_time - start_time}")
```

Multiprocessing

```python
from concurrent import futures
import time


def query(num):
    print(num)
    time.sleep(1)


def main():
    with futures.ProcessPoolExecutor() as executor:
        for future in executor.map(query, range(10)):
            pass


# main entrance
if __name__ == '__main__':
    start_time = time.perf_counter()
    main()
    end_time = time.perf_counter()
    print(f"Elapsed: {end_time - start_time}")
```

Multithreading

```python
from concurrent import futures
import time


def query(num):
    print(num)
    time.sleep(1)


def main():
    with futures.ThreadPoolExecutor() as executor:
        for future in executor.map(query, range(10)):
            pass


# main entrance
if __name__ == '__main__':
    start_time = time.perf_counter()
    main()
    end_time = time.perf_counter()
    print(f"Elapsed: {end_time - start_time}")
```

asyncio

```python
import asyncio
import time


async def query(num):
    print(num)
    await asyncio.sleep(1)


async def main():
    tasks = [asyncio.create_task(query(num)) for num in range(10)]
    await asyncio.gather(*tasks)


# main entrance
if __name__ == '__main__':
    start_time = time.perf_counter()
    asyncio.run(main())
    end_time = time.perf_counter()
    print(f"Elapsed: {end_time - start_time}")
```

asyncio + uvloop

Note

uvloop is not supported on Windows.
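If the same script also has to run on Windows, one workaround (a sketch, not part of the original benchmark) is to enable uvloop only when the import succeeds and otherwise fall back to the default event loop:

```python
import asyncio

try:
    import uvloop
    uvloop.install()  # use uvloop's event loop policy where available
except ImportError:
    pass  # e.g. on Windows: keep asyncio's default event loop


async def main():
    await asyncio.sleep(0.1)


if __name__ == "__main__":
    asyncio.run(main())
```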

Example

```python
import asyncio
import time

import uvloop


async def query(num):
    print(num)
    await asyncio.sleep(1)


async def main():
    tasks = [asyncio.create_task(query(host)) for host in range(10)]
    await asyncio.gather(*tasks)


# main entrance
if __name__ == '__main__':
    uvloop.install()
    start_time = time.perf_counter()
    asyncio.run(main())
    end_time = time.perf_counter()
    print(f"Elapsed: {end_time - start_time}")
```

Runtime comparison

| Method | Runtime |
| --- | --- |
| Serial | 10.0750972 s |
| Multiprocessing | 1.1638731999999998 s |
| Multithreading | 1.0146456 s |
| asyncio | 1.0110082 s |
| asyncio + uvloop | 1.01 s |

As the numbers show, multiprocessing, multithreading, and asyncio all dramatically improve concurrency in I/O-bound scenarios, but asyncio + uvloop has the highest performance.


