高级篇(2):并发编程之协程asyncio

x33g5p2x  于2021-03-13 发布在 Java  
字(16.6k)|赞(0)|评价(0)|浏览(245)

协程(微线程,纤程)

基础概念

多线程和多进程的模型虽然解决了并发问题,但是系统不能无上限地增加线程。由于系统切换线程的开销也很大,所以,一旦线程数量过多,CPU的时间就花在线程切换上了,真正运行代码的时间就少了,结果导致性能严重下降。由于我们要解决的问题是CPU高速执行能力和IO设备的龟速严重不匹配,多线程和多进程只是解决这一问题的一种方法。

另一种解决IO问题的方法是异步IO。当代码需要执行一个耗时的IO操作时,它只发出IO指令,并不等待IO结果,然后就去执行其他代码了。一段时间后,当IO返回结果时,再通知CPU进行处理。

同步IO模型和异步IO模型

# 同步IO模型:
do_some_code()
f = open('/path/to/file', 'r')
r = f.read() # <== 线程停在此处等待IO操作结果
# IO操作完成后线程才能继续执行:
do_some_code(r)

# 异步io模型
loop = get_event_loop()
# 需要一个消息循环
while True:
    # 主线程不断地重复“读取消息-处理消息”这一过程
    event = loop.get_event()
    process_event(event)

在“发出IO请求”到收到“IO完成”的这段时间里,同步IO模型下,主线程只能挂起,但异步IO模型下,主线程并没有休息,而是在消息循环中继续处理其他消息。这样,在异步IO模型下,一个线程就可以同时处理多个IO请求,并且没有切换线程的操作。对于大多数IO密集型的应用程序,使用异步IO将大大提升系统的多任务处理能力。

协程是实现异步IO的高级形式,又称微线程,纤程。英文名Coroutine。

子程序,或者称为函数,在所有语言中都是层级调用,比如A调用B,B在执行过程中又调用了C,C执行完毕返回,B执行完毕返回,最后是A执行完毕。

子程序调用总是一个入口,一次返回,调用顺序是明确的。而协程的调用和子程序不同。协程看上去也是子程序,但执行过程中,在子程序内部可中断,然后转而执行别的子程序,在适当的时候再返回来接着执行。

def A():
    print('1')
    print('2')
    print('3')

def B():
    print('x')
    print('y')
    print('z')

# 协程执行结果,可能如下
1
2
x
y
3
z

看起来A、B的执行有点像多线程,但协程的特点在于是一个线程执行,那和多线程比,协程有何优势?

主要行效率比多线程高很多,主要表现一下两点:

  1. 不用切换线程,而是由程序自身控制,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显。
  2. 不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了。

因为协程是一个线程执行,那怎么利用多核CPU呢?最简单的方法是多进程+协程,既充分利用多核,又充分发挥协程的高效率,可获得极高的性能。

Python对协程的支持是通过generator实现的。

在generator中,我们不但可以通过for循环来迭代,还可以不断调用next()函数获取由yield语句返回的下一个值。Python的yield不但可以返回一个值,它还可以接收调用者发出的参数。

import inspect

def consumer():
    r = ''
    while True:

        # 3. 通过yield拿到消息n(最开始send进来的n为none,不返回只做启动用)
        # yield关键字右边可以不需要加表达式(yield默认返回None)
        n = yield r
        if not n:
            return

        # 4. 拿到n之后进行处理
        print('[CONSUMER] Consuming %s...' % n)

        # 5. 处理完成,再下个循环又通过yield返回
        r = '200 OK'

def produce(c):
    # 1. 调用c.send(None)启动生成器;
    # GEN_CREATED: 等待开始执行
    print(inspect.getgeneratorstate(c))
    c.send(None)

    n = 0
    while n < 3:
        n = n + 1
        print('[PRODUCER] Producing %s...' % n)
        # 2. 一旦生产了东西,通过c.send(n)切换到consumer执行
        r = c.send(n)

        print(inspect.getgeneratorstate(c))
        # 6. 得到consumer处理的结果,再通过下一个循环,继续生产下一条消息
        print('[PRODUCER] Consumer return: %s' % r)

    # 在close前,状态 都是 GEN_SUSPENDED # 在yield表达式处暂停
    print(inspect.getgeneratorstate(c))

    # 7. produce决定不生产了,通过c.close()关闭consumer,整个过程结束。
    c.close()

    # close后 状态为GEN_CLOSED # 执行结束
    print(inspect.getgeneratorstate(c))

c = consumer()
produce(c)

整个流程无锁,由一个线程执行,produceconsumer协作完成任务,所以称为“协程”,而非线程的抢占式多任务。

协程生成器的基本行为

协程有四个状态,可以使用inspect.getgeneratorstate(...)函数确定:

GEN_CREATED # 等待开始执行

GEN_RUNNING # 解释器正在执行(只有在多线程应用中才能看到这个状态)

GEN_SUSPENDED # 在yield表达式处暂停

GEN_CLOSED # 执行结束

生成器api

  1. .send(value)方法,生成器可以使用.send(...)方法发送数据,发送的数据会成为生成器函数中yield表达式的值。如上列中的n和r
  2. .throw(...)方法,让调用方抛出异常,在生成器中处理
  3. .close()方法,终止生成器

asyncio

asyncio是Python 3.4版本引入的标准库,直接内置了对异步IO的支持,asyncio的编程模型就是一个消息循环。我们从asyncio模块中直接获取一个EventLoop的引用,然后把需要执行的协程扔到EventLoop中执行,就实现了异步IO。

@asyncio.coroutine

用asyncio提供的@asyncio.coroutine可以把一个generator标记为coroutine类型,然后在coroutine内部用yield from调用另一个coroutine实现异步操作。

import asyncio
import threading

# @asyncio.coroutine把一个generator标记为coroutine类型
@asyncio.coroutine
def baby(num):
    print('baby %s sleep! (%s)' % (num,threading.currentThread()))
    # 异步调用asyncio.sleep(2)生成器: 假设是一个耗时2秒的IO操作,在此期间,主线程并未等待,而是去执行EventLoop中其他可以执行的coroutine了,因此可以实现并发执行。
    yield from asyncio.sleep(2)
    print('baby %s week up! (%s)' % (num,threading.currentThread()))

# 获取EventLoop: 事件循环对象
loop = asyncio.get_event_loop()

tasks = [baby(1), baby(2), baby(3)]
# 把上面coroutine扔到EventLoop中执行
loop.run_until_complete(asyncio.wait(tasks))

loop.close()

'''
baby 1 sleep! (<_MainThread(MainThread, started 29028)>)
baby 2 sleep! (<_MainThread(MainThread, started 29028)>)
baby 3 sleep! (<_MainThread(MainThread, started 29028)>)

# (暂停约2秒,并且是在同一线程里面,实现了并发)
baby 1 week up! (<_MainThread(MainThread, started 29028)>)
baby 2 week up! (<_MainThread(MainThread, started 29028)>)
baby 3 week up! (<_MainThread(MainThread, started 29028)>)

1. baby(1)执行到yield,线程不会等待asyncio.sleep(),而是直接中断并执行下一个消息循环baby(2)
2. baby(2)执行到yield,线程不会等待asyncio.sleep(),而是直接中断并执行下一个消息循环baby(3)
3. baby(3)执行到yield,线程不会等待asyncio.sleep(),而是直接中断并执行消息循环baby(1),至此所有操作都是以极快的时间完成的,花费了极少时间,此时三个baby同时都在睡眠,(asyncio.sleep)
4. 等待baby(1)睡眠完成,此时几乎同时其他baby也的睡眠也结束了,所以接着执行各个baby的打印wake up操作.结束整个消息循环,run_until_complete结束.
'''

用asyncio的异步网络连接来获取sina、sohu和163的网站首页

import asyncio

@asyncio.coroutine
def wget(host):
    print('wget %s...' % host)
    connect = asyncio.open_connection(host, 80)
    reader, writer = yield from connect
    header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host
    writer.write(header.encode('utf-8'))
    # 刷新底层传输的写缓冲区。也就是把需要发送出去的数据,从缓冲区发送出去。没有手工刷新,asyncio为你自动刷新了。当执行到reader.readline()时,asyncio知道应该把发送缓冲区的数据发送出去了。
    yield from writer.drain()
    while True:
        line = yield from reader.readline()
        if line == b'\r\n':
            break
        print('%s header > %s' % (host, line.decode('utf-8').rstrip()))
    # Ignore the body, close the socket
    writer.close()

loop = asyncio.get_event_loop()
tasks = [wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

async/await

async和await是针对coroutine的新语法,要使用新的语法,只需要做两步简单的替换:

  1. 把@asyncio.coroutine替换为async;
  2. 把yield from替换为await。

使用async可以定义协程对象,使用await可以针对耗时的操作进行挂起,就像生成器里的yield一样,函数让出控制权。协程遇到await,事件循环将会挂起该协程,执行别的协程,直到其他的协程也挂起或者执行完毕,再进行下一个协程的执行

import asyncio
import threading

async def baby(num):
    print('baby %s sleep! (%s)' % (num,threading.currentThread()))
    await asyncio.sleep(1)
    print('baby %s week up! (%s)' % (num,threading.currentThread()))

loop = asyncio.get_event_loop()

# ???? 执行完的顺序让人疑惑
tasks = [baby(2), baby(1), baby(3),baby(4),baby(5)]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

tips: await 和 yield from 可以理解为 “不等了” (主线程是一个事件循环,执行到await,就“我不等了,您慢慢执行,我先走一步,好了再给我说”)

绑定回调

import time
import asyncio

now = lambda : time.time()

async def do_some_work(x):
    print('Waiting: ', x)
    return 'Done after {}s'.format(x)

def callback(future):
    print('Callback: ', future.result())

start = now()

coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
task.add_done_callback(callback)
loop.run_until_complete(task)

print('TIME: ', now() - start)

利用future对象回调别的函数

?? future对象的特性,以下代码不太懂 ??

import asyncio
import functools


def callback(future, n):
    print('{}: future done: {}'.format(n, future.result()))


async def register_callbacks(all_done):
    print('registering callbacks on future')
    # 偏函数配合回调,all_done是future对象
    all_done.add_done_callback(functools.partial(callback, n=1))
    all_done.add_done_callback(functools.partial(callback, n=2))


async def main(all_done):
    # 到此同步中断,异步执行回调函数注册
    await register_callbacks(all_done)
    print('setting result of future')
    all_done.set_result('the result')


event_loop = asyncio.get_event_loop()

try:
    all_done = asyncio.Future()
    event_loop.run_until_complete(main(all_done))
finally:
    event_loop.close()

'''
registering callbacks on future
setting result of future
1: future done: the result
2: future done: the result
'''

多线程与asyncio对比

多线程

# sinner_thread.py

import threading
import itertools
import time
import sys

# 这个类定义一个可变对象,用于从外部控制线程
class Signal:
    go = True


# 这个函数会在单独的线程中运行,signal 参数是前边定义的Signal类的实例
def spin(msg, signal):
    write, flush = sys.stdout.write, sys.stdout.flush
    # itertools.cycle 函数从指定的序列中反复不断地生成元素
    for char in itertools.cycle('|/-\\'):
        status = char + ' ' + msg
        write(status)
        flush()
        write('\x08' * len(status))  # 使用退格符把光标移回行首
        time.sleep(.1)  # 每 0.1 秒刷新一次
        if not signal.go:  # 如果 go属性不是 True,退出循环
            break

    write(' ' * len(status) + '\x08' * len(status))  # 使用空格清除状态消息,把光标移回开头


def slow_function():  # 模拟耗时操作
    # 假装等待I/O一段时间
    time.sleep(20)  # 调用sleep 会阻塞主线程,这么做事为了释放GIL,创建从属线程
    return 42


# 这个函数设置从属线程,显示线程对象,运行耗时计算,最后杀死进程
def supervisor():
    signal = Signal()
    spinner = threading.Thread(target=spin,
                               args=('thinking!', signal))
    print('spinner object:', spinner)  # 显示线程对象 输出 spinner object: <Thread(Thread-1, initial)>
    spinner.start()  # 启动从属进程
    result = slow_function()  # 运行slow_function 行数,阻塞主线程。同时从属线程以动画形式旋转指针
    
    # python 并没有提供终止线程的API,所以若想关闭线程,必须给线程发送消息。这里我们使用signal.go 属性:在主线程中把它设置为False后,spinner 线程会接收到,然后退出
    signal.go = False
    spinner.join()  # 等待spinner 线程结束
    return result

def main():
    result = supervisor()
    print('Answer', result)


if __name__ == '__main__':
    main()

协程

# spinner_asyncio.py
# 通过协程以动画的形式显示文本式旋转指针

import asyncio
import itertools
import sys

async def spin(msg):
    write, flush = sys.stdout.write, sys.stdout.flush
    for char in itertools.cycle('|/-\\'):  # itertools.cycle 函数从指定的序列中反复不断地生成元素
        status = char + ' ' + msg
        write(status)
        flush()
        write('\x08' * len(status))  # 使用退格符把光标移回行首
        try:
            # 使用 yield from asyncio.sleep(0.1) 代替 time.sleep(.1), 这样的休眠不会阻塞事件循环
            # 除非想阻塞主线程,从而冻结事件循环或整个应用,否则不要再 asyncio 协程中使用 time.sleep().如果协程需要在一段时间内什么都不做,应该使用 yield from asyncio.sleep(DELAY)
            # 此处相当于另一协程
            await asyncio.sleep(0.1)
        # 如果 spin 函数苏醒后抛出 asyncio.CancelledError 异常,其原因是发出了取消请求
        except asyncio.CancelledError as e:
            print(str(e))
            break

    write(' ' * len(status) + '\x08' * len(status))  # 使用空格清除状态消息,把光标移回开头


async def slow_function():  # 5 现在此函数是协程,使用休眠假装进行I/O 操作时,使用 yield from 继续执行事件循环
    # 假装等待I/O一段时间
    await asyncio.sleep(3)  # 此表达式把控制权交给主循环,在休眠结束后回复这个协程
    return 42

# ?? 不能改为asynic supervisor 否则asyncio.async会报错 ,已找到原因已被asyncio.ensure_future替代??
async def supervisor():
    spinner = asyncio.ensure_future(spin('thinking!'))  # asyncio.async() 函数排定协程的运行时间,使用一个 Task 对象包装spin 协程,并立即返回
    print('spinner object:', spinner)  # Task 对象,输出类似 spinner object: <Task pending coro=<spin() running at spinner_asyncio.py:6>>
    # 驱动slow_function() 函数,结束后,获取返回值。同事事件循环继续运行,
    # 因为slow_function 函数最后使用yield from asyncio.sleep(3) 表达式把控制权交给主循环
    result = await slow_function()
    # Task 对象可以取消;取消后会在协程当前暂停的yield处抛出 asyncio.CancelledError 异常
    # 协程可以捕获这个异常,也可以延迟取消,甚至拒绝取消
    spinner.cancel()

    return result

def main():
    loop = asyncio.get_event_loop()  # 获取事件循环引用
    # 驱动supervisor 协程,让它运行完毕;这个协程的返回值是这次调用的返回值
    result = loop.run_until_complete(supervisor())
    loop.close()
    print('Answer', result)


if __name__ == '__main__':
    main()

分析两段代码

  1. Task对象不由自己动手实例化,而是通过把协程传给 asyncio.async(...) 函数或 loop.create_task(...) 方法获取Task 对象已经排定了运行时间;而Thread 实例必须调用start方法,明确告知它运行

  2. 在线程版supervisor函数中,slow_function 是普通的函数,由线程直接调用,而异步版的slow_function 函数是协程,由yield from 驱动。

  3. 没有API能从外部终止线程,因为线程随时可能被中断。而协程如果想终止任务,可以使用Task.cancel() 实例方法,在协程内部抛出CancelledError 异常。协程可以在暂停的yield 处捕获这个异常,处理终止请求

  4. supervisor 协程必须在main 函数中由loop.run_until_complete 方法执行。

  5. 协程和线程相比关键的一个优点是线程必须记住保留锁,去保护程序中的重要部分,防止多步操作在执行的过程中中断,而协程默认会做好保护,我们必须显式产出(使用yield 或 yield from 交出控制权)才能让程序的余下部分运行。

asyncio.Future:故意不阻塞

asyncio.Future 类与 concurrent.futures.Future 类的接口基本一致,不过实现方式不同,不可互换。在 concurrent.futures.Future 中,future只是调度执行某物的结果。在 asyncio 包中,BaseEventLoop.create_task(...) 方法接收一个协程,排定它的运行时间,然后返回一个asyncio.Task 实例(也是asyncio.Future 类的实例,因为 Task 是 Future 的子类,用于包装协程。(在 concurrent.futures.Future 中,类似的操作是Executor.submit(...))。

与concurrent.futures.Future 类似,asyncio.Future 类也提供了:

  • .done() 返回布尔值,表示Future 是否已经执行

  • .add_done_callback() 这个方法只有一个参数,类型是可调用对象,Future运行结束后会回调这个对象。

  • .result() 这个方法没有参数,因此不能指定超时时间。 如果调用 .result() 方法时期还没有运行完毕,会抛出 asyncio.InvalidStateError 异常。

协程嵌套 (协程的常用方式)

import asyncio

import time

now = lambda: time.time()

async def do_some_work(x):
    print('Waiting: ', x)

    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]

    '''
    # wait 有timeout参数
    dones, pendings = await asyncio.wait(tasks)

    for task in dones:
        print('Task ret: ', task.result())
    '''

    # 如果使用的是 asyncio.gather创建协程对象,那么await的返回值就是协程运行的结果。
    # 使用asyncio.wait(tasks)返回的顺序有点难以理解,但使用asyncio.gather(*tasks)返回值的顺序就好理解得多
    results = await asyncio.gather(*tasks)
    for result in results:
        print('Task ret: ', result)

start = now()

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

print('TIME: ', now() - start)

抛出返回值到run_until_complete:

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]
    return await asyncio.gather(*tasks)


start = now()

loop = asyncio.get_event_loop()

# 也可以直接返回到run_until_complete处理协程结果
results = loop.run_until_complete(main())
for result in results:
    print('Task ret: ', result)

print('TIME: ', now() - start)

*使用as_completed

async def main(num):
    tasks = []
    i = 1
    while i <= num :
        tasks.append(asyncio.ensure_future(do_some_work(i)))
        i += 1

    for task in asyncio.as_completed(tasks):
        result = await task 
        print('Task ret: {}'.format(result))

start = now()
loop = asyncio.get_event_loop()
done = loop.run_until_complete(main(10))
print('TIME: ', now() - start)

协程停止

future对象有几个状态:Pending,Running,Done,Cancelled. 创建future的时候,task为pending,事件循环调用执行的时候当然就是running,调用完毕自然就是done,如果需要停止事件循环,就需要先把task取消。可以使用asyncio.Task获取事件循环的task

try:
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(5))
except KeyboardInterrupt as e:
    for task in asyncio.Task.all_tasks():
        # 启动事件循环之后,马上ctrl+c,会触发run_until_complete的执行异常 KeyBorardInterrupt。然后通过循环asyncio.Task取消future。
        print(task)
        # 返回true或false,已执行的返回false
        print(task.cancel())
        
    #loop stop之后还需要再次开启事件循环,最后再close,不然还会抛出异常:
    # 抛出异常后要重新启动循环
    loop.stop()
    loop.run_forever()
    
finally:
    loop.close()

批量停止

loop = asyncio.get_event_loop()
task = asyncio.ensure_future(main(5))
try:
    loop.run_until_complete(task)
except KeyboardInterrupt as e:
    print(asyncio.Task.all_tasks())
    print('-------------------')
    # 批量停止,如果全部停止成功就直接返回true,与上列不同
    print(asyncio.gather(*asyncio.Task.all_tasks()).cancel())
    loop.stop()
    loop.run_forever()
finally:
    loop.close()

不同线程的事件循环

很多时候,我们的事件循环用于注册协程,而有的协程需要动态的添加到事件循环中。一个简单的方式就是使用多线程。当前线程创建一个事件循环,然后在新建一个线程,在新线程中启动事件循环。当前线程不会被block。

import asyncio
from threading import Thread
import time

now = lambda: time.time()

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

def more_work(x):
    print('More work {}'.format(x))
    time.sleep(x)
    print('Finished more work {}'.format(x))

start = now()
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()
new_loop.call_soon_threadsafe(more_work, 6)
new_loop.call_soon_threadsafe(more_work, 3)
# 这里的计时没有意义,因为more_work具体的执行是在新的两个线程里面
print('TIME: {}'.format(now() - start))

启动上述代码之后,当前线程不会被block,新线程中会按照顺序执行call_soon_threadsafe方法注册的more_work方法,后者因为time.sleep操作是同步阻塞的,因此运行完毕more_work需要大致6 + 3

新线程协程

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

async def do_some_work(x):
    print('Waiting {}'.format(x))
    await asyncio.sleep(x)
    print('Done after {}s'.format(x))

start = now()
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()

asyncio.run_coroutine_threadsafe(do_some_work(6), new_loop)
asyncio.run_coroutine_threadsafe(do_some_work(4), new_loop)

print('TIME: {}'.format(time.time() - start))

上述的例子,主线程中创建一个new_loop,然后在另外的两个子线程中开启一个无限事件循环。主线程通过run_coroutine_threadsafe新注册协程对象。这样就能在子线程中进行事件循环的并发操作,同时主线程又不会被block。一共执行的时间大概在6s左右。

master-worker主从模式

对于并发任务,通常是用生成消费模型,对队列的处理可以使用类似master-worker的方式,master主要用户获取队列的msg,worker用户处理消息。

为了简单起见,并且协程更适合单线程的方式,我们的主线程用来监听队列,子线程用于处理队列。这里使用redis的队列。主线程中有一个是无限循环,用户消费队列。

while True:
    task = rcon.rpop("queue")
    if not task:
        time.sleep(1)
        continue
    asyncio.run_coroutine_threadsafe(do_some_work(int(task)), new_loop)

停止子线程

如果一切正常,那么上面的例子很完美。可是,需要停止程序,直接ctrl+c,会抛出KeyboardInterrupt错误,我们修改一下主循环:

try:
    while True:
        task = rcon.rpop("queue")
        if not task:
            time.sleep(1)
            continue
        asyncio.run_coroutine_threadsafe(do_some_work(int(task)), new_loop)
except KeyboardInterrupt as e:
    print(e)
    new_loop.stop()

可是实际上并不好使,虽然主线程try了KeyboardInterrupt异常,但是子线程并没有退出,为了解决这个问题,可以设置子线程为守护线程,这样当主线程结束的时候,子线程也随机退出。

new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.setDaemon(True)    # 设置子线程为守护线程
t.start()
 
try:
    while True:
        # print('start rpop')
        task = rcon.rpop("queue")
        if not task:
            time.sleep(1)
            continue
        asyncio.run_coroutine_threadsafe(do_some_work(int(task)), new_loop)
except KeyboardInterrupt as e:
    print(e)
    new_loop.stop()

线程停止程序的时候,主线程退出后,子线程也随机退出才了,并且停止了子线程的协程任务。

try:
    while True:
        # 用brpop方法,会block住task,如果主线程有消息,才会消费。
        # 这种方式更适合队列消费,不用上面的要停顿一秒
        _, task = rcon.brpop("queue")
        asyncio.run_coroutine_threadsafe(do_some_work(int(task)), new_loop)
except Exception as e:
    print('error', e)
    new_loop.stop()
finally:
    pass

协程消费

主线程用于监听队列,然后子线程的做事件循环的worker是一种方式。还有一种方式实现这种类似master-worker的方案。即把监听队列的无限循环挪进协程中。程序初始化就创建若干个协程,实现类似并行的效果。一般这个方案就可以了

import time
import asyncio
import redis

now = lambda : time.time()

# 最多开多少个协程
MAX_COROUTINES = 10

def get_redis():
    connection_pool = redis.ConnectionPool(host='127.0.0.1', db=1,port=6379)
    # connection_pool = redis.ConnectionPool(host='172.28.3.24', db=1,port=6379)
    return redis.Redis(connection_pool=connection_pool)

rcon = get_redis()

async def worker():
    print('Start worker')

    while True:
        start = now()
        task = rcon.rpop("queue")
        if not task:
            await asyncio.sleep(1)
            continue
        print('Wait ', int(task))
        await asyncio.sleep(int(task))
        print('Done ', task, now() - start)

def main():
    i = 0
    while i < MAX_COROUTINES:
        asyncio.ensure_future(worker())
        i += 1

    loop = asyncio.get_event_loop()
    try:
        loop.run_forever()
    except KeyboardInterrupt as e:
        print(asyncio.gather(*asyncio.Task.all_tasks()).cancel())
        loop.stop()
        loop.run_forever()
    finally:
        loop.close()

if __name__ == '__main__':
    main()

aiohttp

asyncio可以实现单线程并发IO操作。如果仅用在客户端,发挥的威力不大。如果把asyncio用在服务器端,例如Web服务器,由于HTTP连接就是IO操作,因此可以用单线程+coroutine实现多用户的高并发支持。

asyncio实现了TCP、UDP、SSL等协议,aiohttp则是基于asyncio实现的HTTP框架。

实现web服务器

import asyncio

from aiohttp import web

async def index(request):
    await asyncio.sleep(0.5)
    return web.Response(body=b'<h1>Index</h1>')

async def hello(request):
    await asyncio.sleep(0.5)
    text = '<h1>hello, %s!</h1>' % request.match_info['name']
    return web.Response(body=text.encode('utf-8'))

async def init(loop):
    app = web.Application(loop=loop)
    app.router.add_route('GET', '/', index)
    app.router.add_route('GET', '/hello/{name}', hello)
    srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000)
    print('Server started at http://127.0.0.1:8000...')
    return srv

loop = asyncio.get_event_loop()
loop.run_until_complete(init(loop))
loop.run_forever()

相关文章