python中多进程+协程的使用以及为什么要用它

Python018

python中多进程+协程的使用以及为什么要用它,第1张

前面讲了为什么python里推荐用多进程而不是多线程,但是多进程也有其自己的限制:相比线程更加笨重、切换耗时更长,并且在python的多进程下,进程数量不推荐超过CPU核心数(一个进程只有一个GIL,所以一个进程只能跑满一个CPU),因为一个进程占用一个CPU时能充分利用机器的性能,但是进程多了就会出现频繁的进程切换,反而得不偿失。

不过特殊情况(特指IO密集型任务)下,多线程是比多进程好用的。

举个例子:给你200W条url,需要你把每个url对应的页面抓取保存起来,这种时候,单单使用多进程,效果肯定是很差的。为什么呢?

例如每次请求的等待时间是2秒,那么如下(忽略cpu计算时间):

1、单进程+单线程:需要2秒*200W=400W秒==1111.11个小时==46.3天,这个速度明显是不能接受的

2、单进程+多线程:例如我们在这个进程中开了10个多线程,比1中能够提升10倍速度,也就是大约4.63天能够完成200W条抓取,请注意,这里的实际执行是:线程1遇见了阻塞,CPU切换到线程2去执行,遇见阻塞又切换到线程3等等,10个线程都阻塞后,这个进程就阻塞了,而直到某个线程阻塞完成后,这个进程才能继续执行,所以速度上提升大约能到10倍(这里忽略了线程切换带来的开销,实际上的提升应该是不能达到10倍的),但是需要考虑的是线程的切换也是有开销的,所以不能无限的启动多线程(开200W个线程肯定是不靠谱的)

3、多进程+多线程:这里就厉害了,一般来说也有很多人用这个方法,多进程下,每个进程都能占一个cpu,而多线程从一定程度上绕过了阻塞的等待,所以比单进程下的多线程又更好使了,例如我们开10个进程,每个进程里开20W个线程,执行的速度理论上是比单进程开200W个线程快10倍以上的(为什么是10倍以上而不是10倍,主要是cpu切换200W个线程的消耗肯定比切换20W个进程大得多,考虑到这部分开销,所以是10倍以上)。

还有更好的方法吗?答案是肯定的,它就是:

4、协程,使用它之前我们先讲讲what/why/how(它是什么/为什么用它/怎么使用它)

what:

协程是一种用户级的轻量级线程。协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:

协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。

在并发编程中,协程与线程类似,每个协程表示一个执行单元,有自己的本地数据,与其它协程共享全局数据和其它资源。

why:

目前主流语言基本上都选择了多线程作为并发设施,与线程相关的概念是抢占式多任务(Preemptive multitasking),而与协程相关的是协作式多任务。

不管是进程还是线程,每次阻塞、切换都需要陷入系统调用(system call),先让CPU跑操作系统的调度程序,然后再由调度程序决定该跑哪一个进程(线程)。

而且由于抢占式调度执行顺序无法确定的特点,使用线程时需要非常小心地处理同步问题,而协程完全不存在这个问题(事件驱动和异步程序也有同样的优点)。

因为协程是用户自己来编写调度逻辑的,对CPU来说,协程其实是单线程,所以CPU不用去考虑怎么调度、切换上下文,这就省去了CPU的切换开销,所以协程在一定程度上又好于多线程。

how:

python里面怎么使用协程?答案是使用gevent,使用方法:看这里

使用协程,可以不受线程开销的限制,我尝试过一次把20W条url放在单进程的协程里执行,完全没问题。

所以最推荐的方法,是多进程+协程(可以看作是每个进程里都是单线程,而这个单线程是协程化的)

多进程+协程下,避开了CPU切换的开销,又能把多个CPU充分利用起来,这种方式对于数据量较大的爬虫还有文件读写之类的效率提升是巨大的。

小例子:

[python] view plain copy

#-*- coding=utf-8 -*-

import requests

from multiprocessing import Process

import gevent

from gevent import monkey monkey.patch_all()

import sys

reload(sys)

sys.setdefaultencoding('utf8')

def fetch(url):

try:

s = requests.Session()

r = s.get(url,timeout=1)#在这里抓取页面

except Exception,e:

print e

return ''

def process_start(url_list):

tasks = []

for url in url_list:

tasks.append(gevent.spawn(fetch,url))

gevent.joinall(tasks)#使用协程来执行

def task_start(filepath,flag = 100000):#每10W条url启动一个进程

with open(filepath,'r') as reader:#从给定的文件中读取url

url = reader.readline().strip()

url_list = []#这个list用于存放协程任务

i = 0 #计数器,记录添加了多少个url到协程队列

while url!='':

i += 1

url_list.append(url)#每次读取出url,将url添加到队列

if i == flag:#一定数量的url就启动一个进程并执行

p = Process(target=process_start,args=(url_list,))

p.start()

url_list = [] #重置url队列

i = 0 #重置计数器

url = reader.readline().strip()

if url_list not []:#若退出循环后任务队列里还有url剩余

p = Process(target=process_start,args=(url_list,))#把剩余的url全都放到最后这个进程来执行

p.start()

if __name__ == '__main__':

task_start('./testData.txt')#读取指定文件

细心的同学会发现:上面的例子中隐藏了一个问题:进程的数量会随着url数量的增加而不断增加,我们在这里不使用进程池multiprocessing.Pool来控制进程数量的原因是multiprocessing.Pool和gevent有冲突不能同时使用,但是有兴趣的同学可以研究一下gevent.pool这个协程池。

asyncio 是 Python 中的异步IO库,用来编写并发协程,适用于IO阻塞且需要大量并发的场景,例如爬虫、文件读写。

asyncio 在 Python3.4 被引入,经过几个版本的迭代,特性、语法糖均有了不同程度的改进,这也使得不同版本的 Python 在 asyncio 的用法上各不相同,显得有些杂乱,以前使用的时候也是本着能用就行的原则,在写法上走了一些弯路,现在对 Python3.7+ 和 Python3.6 中 asyncio 的用法做一个梳理,以便以后能更好的使用。

协程,又称微线程,它不被操作系统内核所管理,而完全是由程序控制,协程切换花销小,因而有更高的性能。

协程可以比作子程序,不同的是,执行过程中协程可以挂起当前状态,转而执行其他协程,在适当的时候返回来接着执行,协程间的切换不需要涉及任何系统调用或任何阻塞调用,完全由协程调度器进行调度。

Python 中以 asyncio 为依赖,使用 async/await 语法进行协程的创建和使用,如下 async 语法创建一个协程函数:

在协程中除了普通函数的功能外最主要的作用就是:使用 await 语法等待另一个协程结束,这将挂起当前协程,直到另一个协程产生结果再继续执行:

asyncio.sleep() 是 asyncio 包内置的协程函数,这里模拟耗时的IO操作,上面这个协程执行到这一句会挂起当前协程而去执行其他协程,直到sleep结束,当有多个协程任务时,这种切换会让它们的IO操作并行处理。

注意,执行一个协程函数并不会真正的运行它,而是会返回一个协程对象,要使协程真正的运行,需要将它们加入到事件循环中运行,官方建议 asyncio 程序应当有一个主入口协程,用来管理所有其他的协程任务:

在 Python3.7+ 中,运行这个 asyncio 程序只需要一句: asyncio.run(main()) ,而在 Python3.6 中,需要手动获取事件循环并加入协程任务:

事件循环就是一个循环队列,对其中的协程进行调度执行,当把一个协程加入循环,这个协程创建的其他协程都会自动加入到当前事件循环中。

其实协程对象也不是直接运行,而是被封装成一个个待执行的 Task ,大多数情况下 asyncio 会帮我们进行封装,我们也可以提前自行封装 Task 来获得对协程更多的控制权,注意,封装 Task 需要 当前线程有正在运行的事件循环 ,否则将引 RuntimeError,这也就是官方建议使用主入口协程的原因,如果在主入口协程之外创建任务就需要先手动获取事件循环然后使用底层方法 loop.create_task() ,而在主入口协程之内是一定有正在运行的循环的。任务创建后便有了状态,可以查看运行情况,查看结果,取消任务等:

asyncio.create_task() 是 Python3.7 加入的高层级API,在 Python3.6,需要使用低层级API asyncio.ensure_future() 来创建 Future,Future 也是一个管理协程运行状态的对象,与 Task 没有本质上的区别。

通常,一个含有一系列并发协程的程序写法如下(Python3.7+):

并发运行多个协程任务的关键就是 asyncio.gather(*tasks) ,它接受多个协程任务并将它们加入到事件循环,所有任务都运行完成后会返回结果列表,这里我们也没有手动封装 Task,因为 gather 函数会自动封装。

并发运行还有另一个方法 asyncio.wait(tasks) ,它们的区别是: