Reading:《Python_Concurrency_with_asyncio–Matthew_Fowler》
Ch1
- Python asyncio是一个使用单线程事件循环模型的异步库,不止可以用于IO密集任务,也可以用于CPU密集型任务
- IO密集与计算密集任务实例:
- 理解并发、并行和多任务:
- python多进程在windows下为什么要放到
main
里?因 为windows实现似乎会把整个代码重新运行一遍,然后在多进程代码处创建指定的进程,然后这样就会无限运行新进程,可以参考以下代码:import multiprocessing import os def print_main(): print(f'the name of pid[{os.getpid()}] is {__name__}') if __name__ == '__main__': p1 = multiprocessing.Process(target=print_main) p2 = multiprocessing.Process(target=print_main) print_main() p1.start() p2.start() p1.join() p2.join() # windows输出: # the name of pid[26660] is __main__ # the name of pid[21528] is __mp_main__ 多进程的代码__name__不同 # the name of pid[28788] is __mp_main__ 这样就无法进入多进程入口的模块,只能获取前面的变量
WSL输出:
可以看到这个是实现了真正的多进程,实际上Linux下如果用fork判断一下返回是不是0就行了,windows嘛不知道winapi什么实现方式。 - 全局解释器锁(GIL)
- 只有Cpython有GIL,这是因为Cpython使用引用计数来回收垃圾,而C是没有线程安全的语言,所以会导致引用计数竞态以及出错,故引入GIL防止真正的CPU多线程出现
- 但是为什么仍然要设计多线程模块?这是因为GIL只会锁CPU,这也是它设计的目的。线程IO操作(使用系统调用,事实上是python和操作系统之间的并发)的时候会释放GIL,这样我们在IO操作上是可以实现多线程并发的!
- python多进程在windows下为什么要放到
- 非阻塞Socket
- Socket模型(计网)通常是阻塞的,但是通过操作系统的消息系统可以实现非阻塞的socket:
- kqueue—FreeBSD and MacO
- epoll—Linux
- IOCP (I/O completion port)—Windows
- 以上三种和
select
/poll
模式的轮询有区别并且更加高效,详见译:6. Epoll, Kqueue与IOCP – 知乎 (zhihu.com),它们采用回调机制和边缘触发。 - 下图描述了使用回调的机制:
- 事件循环EventLoop
- Python使用什么管理这些回调代码和普通代码之间的执行顺序?轮询式的事件循环,实际上这个最典型的是nodejs(Node.js 事件循环,定时器和 process.nextTick() | Node.js (nodejs.org))
- 下图说明了事件循环的原理:
- 举例:以下代码异步执行:
- Socket模型(计网)通常是阻塞的,但是通过操作系统的消息系统可以实现非阻塞的socket:
-
Ch2 Asyncio基础
1.协程 coroutine 和await/async
- 运行模型:
- 例:
以上模型看起来还是顺序执行,怎么回事?这是因为我们await第一个add_one的时候main被暂停了,然后将add_one加入事件循环,并设置回调完成时将main唤醒,此时事件循环中只有一个处于唤醒状态的协程,于是开始执行add_one,并且执行完时将返回权交回main,就是说这个时候第二个add_one并没有加入事件循环,而是因为main是循序执行的,前述过程完成后才被加入,所以没法并行。 - Task的引入
- TasK是对协程的一个包装。它会将协程加入事件循环,并且保证尽早运行它,这个过程是非阻塞的,意味着实际上协程并没有被运行。
- Task也要被await,并且当且仅当此时才能获得运行,那不是和协程一样?不,区别是它已经在事件队列中了!如果此时一个正在Task中使用了await,那么它就有权力去争夺运行权!这样就可以“并行”
- 例:
如上代码按照await顺序先运行hello_every_seconds直到第一个sleep(1),然后寻找事件循环的下一个任务,按照队列的FIFO,应该是first_dalay,运行它直到碰到await sleep(3),然后再到事件循环寻找,这次是second_delay,同理await后交还运行权,这个时候事件循环中没有唤醒的事件就一直等待,等待大概1秒(忽略前面所有执行耗时)hello_every_second的sleep(1)到期(模拟IO传输回调),唤醒hello_every_second,然后执行到下一个await,此时又没有激活的任务,事件循环继续等待,然后直到sleep(1)又好了,唤醒hello_every_second执行。随后再等待1秒,两个delay都被唤醒,执行完毕。main后面的那几个await等的是完成的协程,直接返回结果。
- 例:
- Task的取消
- 调用task.cancel方法取消任务,如果此时await这个任务会导致CancelledError
- 超时与保护:
- asyncio.wait_for 超时将控制权交还await它的函数,取消协程并在这个取得控制权的函数中引发TimeoutError
- asyncio.shield 保护协程不会被取消,如果和wait_for一起用交还控制权任然会引发TimeoutError但是可以继续await原来的任务
2.Awaitable 可等待类型
1)future
A future is a Python object that contains a single value that you expect to get at some point in the future but may not yet have.
future 是一个 Python 对象,其中包含您期望在未来某个时刻获得但可能尚未获得的单个值。
Future相当于js中的Promise,代表一个将要完成并获得值的对象,使用future.set_value来设置值并报告成功,使用set_exception来报告错误,就像Promise的resolve和reject一样,不过这些需要在Future外的逻辑中完成而不是Promise直接在构造函数中定义。
(确实很像,返回Future的函数表现得和协程一样可以直接在await后面,js返回Promise的函数与async函数表现一致)
下面的例子模拟了网络请求的实现:
🎈我们真正需要等待的地方其实都是通过async.sleep这种回调、激活的模式执行的,事件循环从来不阻塞,程序阻塞只是现在事件循环没有可执行的task了,在一直空循环直到有Task被唤醒
三种awaitable的关系:
3.Blocking I/O 阻塞io
所有非异步的IO和CPU密集型的阻塞都会阻塞事件循环,在异步函数中使用这样的代码会导致整个事件循环阻塞,整个协程模型都在等待,所以异步中要尽量避免这些操作。
4.EventLoop事件循环
- new_event_loop
- get_running_loop
- loop.call_soon
- loop.run_until_complete
5.Debug 调试模式
可以在事件循环、asyncio.run以及命令行参数中开启调试模式,这将会报告时间消耗过长的任务,帮助找到阻塞事件循环的代码
Ch3 Build a Server
1.构建一个服务器
import socket
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) #这里用TCP
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
address = ('0.0.0.0', 8055)
server_socket.bind(address)
server_socket.listen()
server_socket.setblocking(False)#非阻塞IO,意味着建立连接不会阻塞
connections = []
try:
while True: #循环轮询
try:
connection, client_address = server_socket.accept()#不阻塞,没有连接只会raise BlockingIOError
connection.setblocking(False) #设置接受消息也是非阻塞的
print(f'I got a connection from {client_address}!')
connections.append(connection)
except BlockingIOError:
pass
for connection in connections:
try:
buffer = b''
while buffer[-2:] != b'\r\n':
data = connection.recv(2)#不阻塞,没有消息只会raise BlockingIOError
if not data:
break
else:
print(f'I got data: {data}!')
buffer = buffer + data
print(f"All the data is: {buffer}")
connection.send(buffer)
except BlockingIOError:
pass
finally:
server_socket.close()
以上代码会导致CPU负载加重,这是因为不断地while轮询连接和消息,需要占用大量CPU
2.使用python的selectors调用OS的消息系统