python2.7怎么实现异步

Python052

python2.7怎么实现异步,第1张

改进之前

之前,我的查询步骤很简单,就是:

前端提交查询请求 -->建立数据库连接 -->新建游标 -->执行命令 -->接受结果 -->关闭游标、连接

这几大步骤的顺序执行。

这里面当然问题很大:

建立数据库连接实际上就是新建一个套接字。这是进程间通信的几种方法里,开销最大的了。

在“执行命令”和“接受结果”两个步骤中,线程在阻塞在数据库内部的运行过程中,数据库连接和游标都处于闲置状态。

这样一来,每一次查询都要顺序的新建数据库连接,都要阻塞在数据库返回结果的过程中。当前端提交大量查询请求时,查询效率肯定是很低的。

第一次改进

之前的模块里,问题最大的就是第一步——建立数据库连接套接字了。如果能够一次性建立连接,之后查询能够反复服用这个连接就好了。

所以,首先应该把数据库查询模块作为一个单独的守护进程去执行,而前端app作为主进程响应用户的点击操作。那么两条进程怎么传递消息呢?翻了几天Python文档,终于构思出来:用队列queue作为生产者(web前端)向消费者(数据库后端)传递任务的渠道。生产者,会与SQL命令一起,同时传递一个管道pipe的连接对象,作为任务完成后,回传结果的渠道。确保,任务的接收方与发送方保持一致。

作为第二个问题的解决方法,可以使用线程池来并发获取任务队列中的task,然后执行命令并回传结果。

第二次改进

第一次改进的效果还是很明显的,不用任何测试手段。直接点击页面链接,可以很直观地感觉到反应速度有很明显的加快。

但是对于第二个问题,使用线程池还是有些欠妥当。因为,CPython解释器存在GIL问题,所有线程实际上都在一个解释器进程里调度。线程稍微开多一点,解释器进程就会频繁的切换线程,而线程切换的开销也不小。线程多一点,甚至会出现“抖动”问题(也就是刚刚唤醒一个线程,就进入挂起状态,刚刚换到栈帧或内存的上下文,又被换回内存或者磁盘),效率大大降低。也就是说,线程池的并发量很有限。

试过了多进程、多线程,只能在单个线程里做文章了。

Python中的asyncio库

Python里有大量的协程库可以实现单线程内的并发操作,比如Twisted、Gevent等等。Python官方在3.5版本里提供了asyncio库同样可以实现协程并发。asyncio库大大降低了Python中协程的实现难度,就像定义普通函数那样就可以了,只是要在def前面多加一个async关键词。async def函数中,需要阻塞在其他async def函数的位置前面可以加上await关键词。

import asyncio

async def wait():

await asyncio.sleep(2)

async def execute(task):

process_task(task)

await wait()

continue_job()

async def函数的执行稍微麻烦点。需要首先获取一个loop对象,然后由这个对象代为执行async def函数。

loop = asyncio.get_event_loop()

loop.run_until_complete(execute(task))

loop.close()

loop在执行execute(task)函数时,如果遇到await关键字,就会暂时挂起当前协程,转而去执行其他阻塞在await关键词的协程,从而实现协程并发。

不过需要注意的是,run_until_complete()函数本身是一个阻塞函数。也就是说,当前线程会等候一个run_until_complete()函数执行完毕之后,才会继续执行下一部函数。所以下面这段代码并不能并发执行。

for task in task_list:

loop.run_until_complete(task)

对与这个问题,asyncio库也有相应的解决方案:gather函数。

loop = asyncio.get_event_loop()

tasks = [asyncio.ensure_future(execute(task))

for task in task_list]

loop.run_until_complete(asyncio.gather(*tasks))

loop.close()

当然了,async def函数的执行并不只有这两种解决方案,还有call_soon与run_forever的配合执行等等,更多内容还请参考官方文档。

Python下的I/O多路复用

协程,实际上,也存在上下文切换,只不过开销很轻微。而I/O多路复用则完全不存在这个问题。

目前,Linux上比较火的I/O多路复用API要算epoll了。Tornado,就是通过调用C语言封装的epoll库,成功解决了C10K问题(当然还有Pypy的功劳)。

在Linux里查文档,可以看到epoll只有三类函数,调用起来比较方便易懂。

创建epoll对象,并返回其对应的文件描述符(file descriptor)。

int epoll_create(int size)

int epoll_create1(int flags)

控制监听事件。第一个参数epfd就对应于前面命令创建的epoll对象的文件描述符;第二个参数表示该命令要执行的动作:监听事件的新增、修改或者删除;第三个参数,是要监听的文件对应的描述符;第四个,代表要监听的事件。

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)

等候。这是一个阻塞函数,调用者会等候内核通知所注册的事件被触发。

int epoll_wait(int epfd, struct epoll_event *events,

int maxevents, int timeout)

int epoll_pwait(int epfd, struct epoll_event *events,

int maxevents, int timeout,

const sigset_t *sigmask)

在Python的select库里:

select.epoll()对应于第一类创建函数;

epoll.register(),epoll.unregister(),epoll.modify()均是对控制函数epoll_ctl的封装;

epoll.poll()则是对等候函数epoll_wait的封装。

Python里epoll相关API的最大问题应该是在epoll.poll()。相比于其所封装的epoll_wait,用户无法手动指定要等候的事件,也就是后者的第二个参数struct epoll_event *events。没法实现精确控制。因此只能使用替代方案:select.select()函数。

根据Python官方文档,select.select(rlist, wlist, xlist[, timeout])是对Unix系统中select函数的直接调用,与C语言API的传参很接近。前三个参数都是列表,其中的元素都是要注册到内核的文件描述符。如果想用自定义类,就要确保实现了fileno()方法。

其分别对应于:

rlist: 等候直到可读

wlist: 等候直到可写

xlist: 等候直到异常。这个异常的定义,要查看系统文档。

select.select(),类似于epoll.poll(),先注册文件和事件,然后保持等候内核通知,是阻塞函数。

实际应用

Psycopg2库支持对异步和协程,但和一般情况下的用法略有区别。普通数据库连接支持不同线程中的不同游标并发查询;而异步连接则不支持不同游标的同时查询。所以异步连接的不同游标之间必须使用I/O复用方法来协调调度。

所以,我的大致实现思路是这样的:首先并发执行大量协程,从任务队列中提取任务,再向连接池请求连接,创建游标,然后执行命令,并返回结果。在获取游标和接受查询结果之前,均要阻塞等候内核通知连接可用。

其中,连接池返回连接时,会根据引用连接的协程数量,返回负载最轻的连接。这也是自己定义AsyncConnectionPool类的目的。

我的代码位于:bottle-blog/dbservice.py

存在问题

当然了,这个流程目前还一些问题。

首先就是每次轮询拿到任务之后,都会走这么一个流程。

获取连接 -->新建游标 -->执行任务 -->关闭游标 -->取消连接引用

本来,最好的情况应该是:在轮询之前,就建好游标;在轮询时,直接等候内核通知,执行相应任务。这样可以减少轮询时的任务量。但是如果协程提前对应好连接,那就不能保证在获取任务时,保持各连接负载均衡了。

所以这一块,还有工作要做。

还有就是epoll没能用上,有些遗憾。

以后打算写点C语言的内容,或者用Python/C API,或者用Ctypes包装共享库,来实现epoll的调用。

最后,请允许我吐槽一下Python的epoll相关文档:简直太弱了!!!必须看源码才能弄清楚功能。

异步:一种通讯方式,对设备需求简单。我们的PC机提供的标准通信接口都是异步的。

异步双方不需要共同的时钟,也就是接收方不知道发送方什么时候发送,所以在发送的信息中就要有提示接收方开始接收的信息,如开始位,同时在结束时有停止位。

异步的另外一种含义是计算机多线程的异步处理。与同步处理相对,异步处理不用阻塞当前线程来等待处理完成,而是允许后续操作,直至其它线程将处理完成,并回调通知此线程。

但此处需要明确的是:异步与多线程与并行不是同一个概念.

不需要

如果你厌倦了多线程,不妨试试python的异步编程,再引入async, await关键字之后语法变得更加简洁和直观,又经过几年的生态发展,现在是一个很不错的并发模型。

下面介绍一下python异步编程的方方面面。

因为GIL的存在,所以Python的多线程在CPU密集的任务下显得无力,但是对于IO密集的任务,多线程还是足以发挥多线程的优势的,而异步也是为了应对IO密集的任务,所以两者是一个可以相互替代的方案,因为设计的不同,理论上异步要比多线程快,因为异步的花销更少, 因为不需要额外系统申请额外的内存,而线程的创建跟系统有关,需要分配一定量的内存,一般是几兆,比如linux默认是8MB。

虽然异步很好,比如可以使用更少的内存,比如更好地控制并发(也许你并不这么认为:))。但是由于async/await 语法的存在导致与之前的语法有些割裂,所以需要适配,需要付出额外的努力,再者就是生态远远没有同步编程强大,比如很多库还不支持异步,所以你需要一些额外的适配。

为了不给其他网站带来困扰,这里首先在自己电脑启动web服务用于测试,代码很简单。

本文所有依赖如下:

所有依赖可通过代码仓库的requirements.txt一次性安装。

首先看一个错误的例子

输出如下:

发现花费了3秒,不符合预期呀。。。。这是因为虽然用了协程,但是每个协程是串行的运行,也就是说后一个等前一个完成之后才开始,那么这样的异步代码并没有并发,所以我们需要让这些协程并行起来

为了让代码变动的不是太多,所以这里用了一个笨办法来等待所有任务完成, 之所以在main函数中等待是为了不让ClientSession关闭, 如果你移除了main函数中的等待代码会发现报告异常 RuntimeError: Session is closed ,而代码里的解决方案非常的不优雅,需要手动的等待,为了解决这个问题,我们再次改进代码。

这里解决的方式是通过 asyncio.wait 方法等待一个协程列表,默认是等待所有协程结束后返回,会返回一个完成(done)列表,以及一个待办(pending)列表。

如果我们不想要协程对象而是结果,那么我们可以使用 asyncio.gather

结果输出如下:

通过 asyncio.ensure_future 我们就能创建一个协程,跟调用一个函数差别不大,为了等待所有任务完成之后退出,我们需要使用 asyncio.wait 等方法来等待,如果只想要协程输出的结果,我们可以使用 asyncio.gather 来获取结果。

虽然前面能够随心所欲的创建协程,但是就像多线程一样,我们也需要处理协程之间的同步问题,为了保持语法及使用情况的一致,多线程中用到的同步功能,asyncio中基本也能找到, 并且用法基本一致,不一致的地方主要是需要用异步的关键字,比如 async with/ await 等

通过锁让并发慢下来,让协程一个一个的运行。

输出如下:

通过观察很容易发现,并发的速度因为锁而慢下来了,因为每次只有一个协程能获得锁,所以并发变成了串行。

通过事件来通知特定的协程开始工作,假设有一个任务是根据http响应结果选择是否激活。

输出如下:

可以看到事件(Event)等待者都是在得到响应内容之后输出,并且事件(Event)可以是多个协程同时等待。

上面的事件虽然很棒,能够在不同的协程之间同步状态,并且也能够一次性同步所有的等待协程,但是还不够精细化,比如想通知指定数量的等待协程,这个时候Event就无能为力了,所以同步原语中出现了Condition。

输出如下:

可以看到,前面两个等待的协程是在同一时刻完成,而不是全部等待完成。

通过创建协程的数量来控制并发并不是非常优雅的方式,所以可以通过信号量的方式来控制并发。

输出如下:

可以发现,虽然同时创建了三个协程,但是同一时刻只有两个协程工作,而另外一个协程需要等待一个协程让出信号量才能运行。

无论是协程还是线程,任务之间的状态同步还是很重要的,所以有了应对各种同步机制的同步原语,因为要保证一个资源同一个时刻只能一个任务访问,所以引入了锁,又因为需要一个任务等待另一个任务,或者多个任务等待某个任务,因此引入了事件(Event),但是为了更精细的控制通知的程度,所以又引入了条件(Condition), 通过条件可以控制一次通知多少的任务。

有时候的并发需求是通过一个变量控制并发任务的并发数而不是通过创建协程的数量来控制并发,所以引入了信号量(Semaphore),这样就可以在创建的协程数远远大于并发数的情况下让协程在指定的并发量情况下并发。

不得不承认异步编程相比起同步编程的生态要小的很多,所以不可能完全异步编程,因此需要一种方式兼容。

多线程是为了兼容同步得代码。

多进程是为了利用CPU多核的能力。

输出如下:

可以看到总耗时1秒,说明所有的线程跟进程是同时运行的。

下面是本人使用过的一些异步库,仅供参考

web框架

http客户端

数据库

ORM

虽然异步库发展得还算不错,但是中肯的说并没有覆盖方方面面。

虽然我鼓励大家尝试异步编程,但是本文的最后却是让大家谨慎的选择开发环境,如果你觉得本文的并发,同步,兼容多线程,多进程不值得一提,那么我十分推荐你尝试以异步编程的方式开始一个新的项目,如果你对其中一些还有疑问或者你确定了要使用的依赖库并且大多数是没有异步库替代的,那么我还是建议你直接按照自己擅长的同步编程开始。

异步编程虽然很不错,不过,也许你并不需要。