Reading:《Python_Concurrency_with_asyncio–Matthew_Fowler》

Reading:《Python_Concurrency_with_asyncio–Matthew_Fowler》

Ch1

  • Python asyncio是一个使用单线程事件循环模型的异步库,不止可以用于IO密集任务,也可以用于CPU密集型任务
  • IO密集与计算密集任务实例:
    image
  • 理解并发、并行和多任务:
    • python多进程在windows下为什么要放到main​里?因 为windows实现似乎会把整个代码重新运行一遍,然后在多进程代码处创建指定的进程,然后这样就会无限运行新进程,可以参考以下代码:
      import multiprocessing
      import os
      
      
      def print_main():
          print(f'the name of pid[{os.getpid()}] is {__name__}')
      
      
      if __name__ == '__main__':
          p1 = multiprocessing.Process(target=print_main)
          p2 = multiprocessing.Process(target=print_main)
          print_main()
          p1.start()
          p2.start()
          p1.join()
          p2.join()
      # windows输出:
      # the name of pid[26660] is __main__
      # the name of pid[21528] is __mp_main__ 多进程的代码__name__不同
      # the name of pid[28788] is __mp_main__  这样就无法进入多进程入口的模块,只能获取前面的变量
      

      WSL输出:

      image
      可以看到这个是实现了真正的多进程,实际上Linux下如果用fork判断一下返回是不是0就行了,windows嘛不知道winapi什么实现方式。
    • 全局解释器锁(GIL)
      • 只有Cpython有GIL,这是因为Cpython使用引用计数来回收垃圾,而C是没有线程安全的语言,所以会导致引用计数竞态以及出错,故引入GIL防止真正的CPU多线程出现
      • 但是为什么仍然要设计多线程模块?这是因为GIL只会锁CPU,这也是它设计的目的。线程IO操作(使用系统调用,事实上是python和操作系统之间的并发)的时候会释放GIL,这样我们在IO操作上是可以实现多线程并发的!
  • 非阻塞Socket
    • Socket模型(计网)通常是阻塞的,但是通过操作系统的消息系统可以实现非阻塞的socket:
      • kqueue—FreeBSD and MacO
      • epoll—Linux
      • IOCP (I/O completion port)—Windows
    • 以上三种和select​ / poll​ 模式的轮询有区别并且更加高效,详见译:6. Epoll, Kqueue与IOCP – 知乎 (zhihu.com),它们采用回调机制和边缘触发。
    • 下图描述了使用回调的机制:
      image
    • 事件循环EventLoop

Ch2 Asyncio基础

1.协程 coroutine 和await/async

  • 运行模型:
    • 例:
      image
      image
      以上模型看起来还是顺序执行,怎么回事?这是因为我们await第一个add_one的时候main被暂停了,然后将add_one加入事件循环,并设置回调完成时将main唤醒,此时事件循环中只有一个处于唤醒状态的协程,于是开始执行add_one,并且执行完时将返回权交回main,就是说这个时候第二个add_one并没有加入事件循环,而是因为main是循序执行的,前述过程完成后才被加入,所以没法并行。
    • Task的引入
      • TasK是对协程的一个包装。它会将协程加入事件循环,并且保证尽早运行它,这个过程是非阻塞的,意味着实际上协程并没有被运行。
      • Task也要被await,并且当且仅当此时才能获得运行,那不是和协程一样?不,区别是它已经在事件队列中了!如果此时一个正在Task中使用了await,那么它就有权力去争夺运行权!这样就可以“并行”
      • 例:
        image
        image
        如上代码按照await顺序先运行hello_every_seconds直到第一个sleep(1),然后寻找事件循环的下一个任务,按照队列的FIFO,应该是first_dalay,运行它直到碰到await sleep(3),然后再到事件循环寻找,这次是second_delay,同理await后交还运行权,这个时候事件循环中没有唤醒的事件就一直等待,等待大概1秒(忽略前面所有执行耗时)hello_every_second的sleep(1)到期(模拟IO传输回调),唤醒hello_every_second,然后执行到下一个await,此时又没有激活的任务,事件循环继续等待,然后直到sleep(1)又好了,唤醒hello_every_second执行。随后再等待1秒,两个delay都被唤醒,执行完毕。main后面的那几个await等的是完成的协程,直接返回结果。
  • Task的取消
    • 调用task.cancel方法取消任务,如果此时await这个任务会导致CancelledError
    • 超时与保护:
      • asyncio.wait_for 超时将控制权交还await它的函数,取消协程并在这个取得控制权的函数中引发TimeoutError
      • asyncio.shield 保护协程不会被取消,如果和wait_for一起用交还控制权任然会引发TimeoutError但是可以继续await原来的任务

2.Awaitable 可等待类型

1)future

A future is a Python object that contains a single value that you expect to get at some point in the future but may not yet have.

future 是一个 Python 对象,其中包含您期望在未来某个时刻获得但可能尚未获得的单个值。

Future相当于js中的Promise,代表一个将要完成并获得值的对象,使用future.set_value来设置值并报告成功,使用set_exception来报告错误,就像Promise的resolve和reject一样,不过这些需要在Future外的逻辑中完成而不是Promise直接在构造函数中定义。

(确实很像,返回Future的函数表现得和协程一样可以直接在await后面,js返回Promise的函数与async函数表现一致)

下面的例子模拟了网络请求的实现:

image

🎈我们真正需要等待的地方其实都是通过async.sleep这种回调、激活的模式执行的,事件循环从来不阻塞,程序阻塞只是现在事件循环没有可执行的task了,在一直空循环直到有Task被唤醒

三种awaitable的关系:

image

3.Blocking I/O 阻塞io

所有非异步的IO和CPU密集型的阻塞都会阻塞事件循环,在异步函数中使用这样的代码会导致整个事件循环阻塞,整个协程模型都在等待,所以异步中要尽量避免这些操作。

4.EventLoop事件循环

  • new_event_loop
  • get_running_loop
  • loop.call_soon
  • loop.run_until_complete

5.Debug 调试模式

可以在事件循环、asyncio.run以及命令行参数中开启调试模式,这将会报告时间消耗过长的任务,帮助找到阻塞事件循环的代码

Ch3 Build a Server

1.构建一个服务器

import socket

server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) #这里用TCP
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

address = ('0.0.0.0', 8055)
server_socket.bind(address)
server_socket.listen()
server_socket.setblocking(False)#非阻塞IO,意味着建立连接不会阻塞

connections = []
try:
    while True: #循环轮询
        try:
            connection, client_address = server_socket.accept()#不阻塞,没有连接只会raise BlockingIOError
            connection.setblocking(False) #设置接受消息也是非阻塞的
            print(f'I got a connection from {client_address}!')
            connections.append(connection)
        except BlockingIOError:
            pass
        for connection in connections:
            try:
                buffer = b''
                while buffer[-2:] != b'\r\n':
                    data = connection.recv(2)#不阻塞,没有消息只会raise BlockingIOError
                    if not data:
                        break
                    else:
                        print(f'I got data: {data}!')
                    buffer = buffer + data
                print(f"All the data is: {buffer}")
                connection.send(buffer)
            except BlockingIOError:
                pass
finally:
    server_socket.close()

以上代码会导致CPU负载加重,这是因为不断地while轮询连接和消息,需要占用大量CPU

2.使用python的selectors调用OS的消息系统

暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇