优化Python爬虫速度的方法有哪些

Python017

优化Python爬虫速度的方法有哪些,第1张

很多爬虫工作者都遇到过抓取非常慢的问题,尤其是需要采集大量数据的情况下。那么如何提高爬虫采集效率就十分关键,那一块了解如何提高爬虫采集效率问题。

1.尽可能减少网站访问次数

单次爬虫的主要把时间消耗在网络请求等待响应上面,所以能减少网站访问就减少网站访问,既减少自身的工作量,也减轻网站的压力,还降低被封的风险。

第一步要做的就是流程优化,尽量精简流程,避免在多个页面重复获取。

随后去重,同样是十分重要的手段,一般根据url或者id进行唯一性判别,爬过的就不再继续爬了。

2.分布式爬虫

即便把各种法子都用尽了,单机单位时间内能爬的网页数仍是有限的,面对大量的网页页面队列,可计算的时间仍是很长,这种情况下就必须要用机器换时间了,这就是分布式爬虫。

第一步,分布式并不是爬虫的本质,也并不是必须的,对于互相独立、不存在通信的任务就可手动对任务分割,随后在多个机器上各自执行,减少每台机器的工作量,费时就会成倍减少。

例如有200W个网页页面待爬,可以用5台机器各自爬互不重复的40W个网页页面,相对来说单机费时就缩短了5倍。

可是如果存在着需要通信的状况,例如一个变动的待爬队列,每爬一次这个队列就会发生变化,即便分割任务也就有交叉重复,因为各个机器在程序运行时的待爬队列都不一样了——这种情况下只能用分布式,一个Master存储队列,其他多个Slave各自来取,这样共享一个队列,取的情况下互斥也不会重复爬取。IPIDEA提供高匿稳定的IP同时更注重用户隐私的保护,保障用户的信息安全。含有240+国家地区的ip,支持API批量使用,支持多线程高并发使用。

Python 在程序并行化方面多少有些声名狼藉。撇开技术上的问题,例如线程的实现和 GIL1,我觉得错误的教学指导才是主要问题。常见的经典 Python 多线程、多进程教程多显得偏“重”。而且往往隔靴搔痒,没有深入探讨日常工作中最有用的内容。

传统的例子

简单搜索下“Python 多线程教程”,不难发现几乎所有的教程都给出涉及类和队列的例子:

#Example.py

'''

Standard Producer/Consumer Threading Pattern

'''

import time

import threading

import Queue

class Consumer(threading.Thread):

def __init__(self, queue):

threading.Thread.__init__(self)

self._queue = queue

def run(self):

while True:

# queue.get() blocks the current thread until

# an item is retrieved.

msg = self._queue.get()

# Checks if the current message is

# the "Poison Pill"

if isinstance(msg, str) and msg == 'quit':

# if so, exists the loop

break

# "Processes" (or in our case, prints) the queue item

print "I'm a thread, and I received %s!!" % msg

# Always be friendly!

print 'Bye byes!'

def Producer():

# Queue is used to share items between

# the threads.

queue = Queue.Queue()

# Create an instance of the worker

worker = Consumer(queue)

# start calls the internal run() method to

# kick off the thread

worker.start()

# variable to keep track of when we started

start_time = time.time()

# While under 5 seconds..

while time.time() - start_time <5:

# "Produce" a piece of work and stick it in

# the queue for the Consumer to process

queue.put('something at %s' % time.time())

# Sleep a bit just to avoid an absurd number of messages

time.sleep(1)

# This the "poison pill" method of killing a thread.

queue.put('quit')

# wait for the thread to close down

worker.join()

if __name__ == '__main__':

Producer()

哈,看起来有些像 Java 不是吗?

我并不是说使用生产者/消费者模型处理多线程/多进程任务是错误的(事实上,这一模型自有其用武之地)。只是,处理日常脚本任务时我们可以使用更有效率的模型。

问题在于…

首先,你需要一个样板类;

其次,你需要一个队列来传递对象;

而且,你还需要在通道两端都构建相应的方法来协助其工作(如果需想要进行双向通信或是保存结果还需要再引入一个队列)。

worker 越多,问题越多

按照这一思路,你现在需要一个 worker 线程的线程池。下面是一篇 IBM 经典教程中的例子——在进行网页检索时通过多线程进行加速。

#Example2.py

'''

A more realistic thread pool example

'''

import time

import threading

import Queue

import urllib2

class Consumer(threading.Thread):

def __init__(self, queue):

threading.Thread.__init__(self)

self._queue = queue

def run(self):

while True:

content = self._queue.get()

if isinstance(content, str) and content == 'quit':

break

response = urllib2.urlopen(content)

print 'Bye byes!'

def Producer():

urls = [

'', ''

'', ''

# etc..

]

queue = Queue.Queue()

worker_threads = build_worker_pool(queue, 4)

start_time = time.time()

# Add the urls to process

for url in urls:

queue.put(url)

# Add the poison pillv

for worker in worker_threads:

queue.put('quit')

for worker in worker_threads:

worker.join()

print 'Done! Time taken: {}'.format(time.time() - start_time)

def build_worker_pool(queue, size):

workers = []

for _ in range(size):

worker = Consumer(queue)

worker.start()

workers.append(worker)

return workers

if __name__ == '__main__':

Producer()

这段代码能正确的运行,但仔细看看我们需要做些什么:构造不同的方法、追踪一系列的线程,还有为了解决恼人的死锁问题,我们需要进行一系列的 join 操作。这还只是开始……

至此我们回顾了经典的多线程教程,多少有些空洞不是吗?样板化而且易出错,这样事倍功半的风格显然不那么适合日常使用,好在我们还有更好的方法。

何不试试 map

map 这一小巧精致的函数是简捷实现 Python 程序并行化的关键。map 源于 Lisp 这类函数式编程语言。它可以通过一个序列实现两个函数之间的映射。

urls = ['', '']

results = map(urllib2.urlopen, urls)

上面的这两行代码将 urls 这一序列中的每个元素作为参数传递到 urlopen 方法中,并将所有结果保存到 results 这一列表中。其结果大致相当于:

results = []

for url in urls:

results.append(urllib2.urlopen(url))

map 函数一手包办了序列操作、参数传递和结果保存等一系列的操作。

为什么这很重要呢?这是因为借助正确的库,map 可以轻松实现并行化操作。

在 Python 中有个两个库包含了 map 函数: multiprocessing 和它鲜为人知的子库 multiprocessing.dummy.

这里多扯两句: multiprocessing.dummy? mltiprocessing 库的线程版克隆?这是虾米?即便在 multiprocessing 库的官方文档里关于这一子库也只有一句相关描述。而这句描述译成人话基本就是说:"嘛,有这么个东西,你知道就成."相信我,这个库被严重低估了!

dummy 是 multiprocessing 模块的完整克隆,唯一的不同在于 multiprocessing 作用于进程,而 dummy 模块作用于线程(因此也包括了 Python 所有常见的多线程限制)。

所以替换使用这两个库异常容易。你可以针对 IO 密集型任务和 CPU 密集型任务来选择不同的库。2

动手尝试

使用下面的两行代码来引用包含并行化 map 函数的库:

from multiprocessing import Pool

from multiprocessing.dummy import Pool as ThreadPool

实例化 Pool 对象:

pool = ThreadPool()

这条简单的语句替代了 example2.py 中 build_worker_pool 函数 7 行代码的工作。它生成了一系列的 worker 线程并完成初始化工作、将它们储存在变量中以方便访问。

Pool 对象有一些参数,这里我所需要关注的只是它的第一个参数:processes. 这一参数用于设定线程池中的线程数。其默认值为当前机器 CPU 的核数。

一般来说,执行 CPU 密集型任务时,调用越多的核速度就越快。但是当处理网络密集型任务时,事情有有些难以预计了,通过实验来确定线程池的大小才是明智的。

pool = ThreadPool(4) # Sets the pool size to 4

线程数过多时,切换线程所消耗的时间甚至会超过实际工作时间。对于不同的工作,通过尝试来找到线程池大小的最优值是个不错的主意。

创建好 Pool 对象后,并行化的程序便呼之欲出了。我们来看看改写后的 example2.py

import urllib2

from multiprocessing.dummy import Pool as ThreadPool

urls = [

# etc..

]

# Make the Pool of workers

pool = ThreadPool(4)

# Open the urls in their own threads

# and return the results

results = pool.map(urllib2.urlopen, urls)

#close the pool and wait for the work to finish

pool.close()

pool.join()

实际起作用的代码只有 4 行,其中只有一行是关键的。map 函数轻而易举的取代了前文中超过 40 行的例子。为了更有趣一些,我统计了不同方法、不同线程池大小的耗时情况。

# results = []

# for url in urls:

# result = urllib2.urlopen(url)

# results.append(result)

# # ------- VERSUS ------- #

# # ------- 4 Pool ------- #

# pool = ThreadPool(4)

# results = pool.map(urllib2.urlopen, urls)

# # ------- 8 Pool ------- #

# pool = ThreadPool(8)

# results = pool.map(urllib2.urlopen, urls)

# # ------- 13 Pool ------- #

# pool = ThreadPool(13)

# results = pool.map(urllib2.urlopen, urls)

结果:

#Single thread: 14.4 Seconds

# 4 Pool: 3.1 Seconds

# 8 Pool: 1.4 Seconds

# 13 Pool: 1.3 Seconds

很棒的结果不是吗?这一结果也说明了为什么要通过实验来确定线程池的大小。在我的机器上当线程池大小大于 9 带来的收益就十分有限了。

另一个真实的例子

生成上千张图片的缩略图

这是一个 CPU 密集型的任务,并且十分适合进行并行化。

基础单进程版本

import os

import PIL

from multiprocessing import Pool

from PIL import Image

SIZE = (75,75)

SAVE_DIRECTORY = 'thumbs'

def get_image_paths(folder):

return (os.path.join(folder, f)

for f in os.listdir(folder)

if 'jpeg' in f)

def create_thumbnail(filename):

im = Image.open(filename)

im.thumbnail(SIZE, Image.ANTIALIAS)

base, fname = os.path.split(filename)

save_path = os.path.join(base, SAVE_DIRECTORY, fname)

im.save(save_path)

if __name__ == '__main__':

folder = os.path.abspath(

'11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840')

os.mkdir(os.path.join(folder, SAVE_DIRECTORY))

images = get_image_paths(folder)

for image in images:

create_thumbnail(Image)

上边这段代码的主要工作就是将遍历传入的文件夹中的图片文件,一一生成缩略图,并将这些缩略图保存到特定文件夹中。

这我的机器上,用这一程序处理 6000 张图片需要花费 27.9 秒。

如果我们使用 map 函数来代替 for 循环:

import os

import PIL

from multiprocessing import Pool

from PIL import Image

SIZE = (75,75)

SAVE_DIRECTORY = 'thumbs'

def get_image_paths(folder):

return (os.path.join(folder, f)

for f in os.listdir(folder)

if 'jpeg' in f)

def create_thumbnail(filename):

im = Image.open(filename)

im.thumbnail(SIZE, Image.ANTIALIAS)

base, fname = os.path.split(filename)

save_path = os.path.join(base, SAVE_DIRECTORY, fname)

im.save(save_path)

if __name__ == '__main__':

folder = os.path.abspath(

'11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840')

os.mkdir(os.path.join(folder, SAVE_DIRECTORY))

images = get_image_paths(folder)

pool = Pool()

pool.map(creat_thumbnail, images)

pool.close()

pool.join()

5.6 秒!

虽然只改动了几行代码,我们却明显提高了程序的执行速度。在生产环境中,我们可以为 CPU 密集型任务和 IO 密集型任务分别选择多进程和多线程库来进一步提高执行速度——这也是解决死锁问题的良方。此外,由于 map 函数并不支持手动线程管理,反而使得相关的 debug 工作也变得异常简单。

到这里,我们就实现了(基本)通过一行 Python 实现并行化。

1)首先你要明白爬虫怎样工作。

想象你是一只蜘蛛,现在你被放到了互联“网”上。那么,你需要把所有的网页都看一遍。怎么办呢?没问题呀,你就随便从某个地方开始,比如说人民日报的首页,这个叫initial pages,用$表示吧。

在人民日报的首页,你看到那个页面引向的各种链接。于是你很开心地从爬到了“国内新闻”那个页面。太好了,这样你就已经爬完了俩页面(首页和国内新闻)!暂且不用管爬下来的页面怎么处理的,你就想象你把这个页面完完整整抄成了个html放到了你身上。

突然你发现, 在国内新闻这个页面上,有一个链接链回“首页”。作为一只聪明的蜘蛛,你肯定知道你不用爬回去的吧,因为你已经看过了啊。所以,你需要用你的脑子,存下你已经看过的页面地址。这样,每次看到一个可能需要爬的新链接,你就先查查你脑子里是不是已经去过这个页面地址。如果去过,那就别去了。

好的,理论上如果所有的页面可以从initial page达到的话,那么可以证明你一定可以爬完所有的网页。

那么在python里怎么实现呢?

很简单

import Queue

initial_page = "初始化页"

url_queue = Queue.Queue()

seen = set()

seen.insert(initial_page)

url_queue.put(initial_page)

while(True): #一直进行直到海枯石烂

if url_queue.size()>0:

current_url = url_queue.get()#拿出队例中第一个的url

store(current_url) #把这个url代表的网页存储好

for next_url in extract_urls(current_url): #提取把这个url里链向的url

if next_url not in seen:

seen.put(next_url)

url_queue.put(next_url)

else:

break

写得已经很伪代码了。

所有的爬虫的backbone都在这里,下面分析一下为什么爬虫事实上是个非常复杂的东西——搜索引擎公司通常有一整个团队来维护和开发。

2)效率

如果你直接加工一下上面的代码直接运行的话,你需要一整年才能爬下整个豆瓣的内容。更别说Google这样的搜索引擎需要爬下全网的内容了。

问题出在哪呢?需要爬的网页实在太多太多了,而上面的代码太慢太慢了。设想全网有N个网站,那么分析一下判重的复杂度就是N*log(N),因为所有网页要遍历一次,而每次判重用set的话需要log(N)的复杂度。OK,OK,我知道python的set实现是hash——不过这样还是太慢了,至少内存使用效率不高。

通常的判重做法是怎样呢?Bloom Filter. 简单讲它仍然是一种hash的方法,但是它的特点是,它可以使用固定的内存(不随url的数量而增长)以O(1)的效率判定url是否已经在set中。可惜天下没有白吃的午餐,它的唯一问题在于,如果这个url不在set中,BF可以100%确定这个url没有看过。但是如果这个url在set中,它会告诉你:这个url应该已经出现过,不过我有2%的不确定性。注意这里的不确定性在你分配的内存足够大的时候,可以变得很小很少。一个简单的教程:Bloom Filters by Example

注意到这个特点,url如果被看过,那么可能以小概率重复看一看(没关系,多看看不会累死)。但是如果没被看过,一定会被看一下(这个很重要,不然我们就要漏掉一些网页了!)。 [IMPORTANT: 此段有问题,请暂时略过]

好,现在已经接近处理判重最快的方法了。另外一个瓶颈——你只有一台机器。不管你的带宽有多大,只要你的机器下载网页的速度是瓶颈的话,那么你只有加快这个速度。用一台机子不够的话——用很多台吧!当然,我们假设每台机子都已经进了最大的效率——使用多线程(python的话,多进程吧)。

3)集群化抓取

爬取豆瓣的时候,我总共用了100多台机器昼夜不停地运行了一个月。想象如果只用一台机子你就得运行100个月了...

那么,假设你现在有100台机器可以用,怎么用python实现一个分布式的爬取算法呢?

我们把这100台中的99台运算能力较小的机器叫作slave,另外一台较大的机器叫作master,那么回顾上面代码中的url_queue,如果我们能把这个queue放到这台master机器上,所有的slave都可以通过网络跟master联通,每当一个slave完成下载一个网页,就向master请求一个新的网页来抓取。而每次slave新抓到一个网页,就把这个网页上所有的链接送到master的queue里去。同样,bloom filter也放到master上,但是现在master只发送确定没有被访问过的url给slave。Bloom Filter放到master的内存里,而被访问过的url放到运行在master上的Redis里,这样保证所有操作都是O(1)。(至少平摊是O(1),Redis的访问效率见:LINSERT – Redis)

考虑如何用python实现:

在各台slave上装好scrapy,那么各台机子就变成了一台有抓取能力的slave,在master上装好Redis和rq用作分布式队列。

代码于是写成

#slave.py

current_url = request_from_master()

to_send = []

for next_url in extract_urls(current_url):

to_send.append(next_url)

store(current_url)

send_to_master(to_send)

#master.py

distributed_queue = DistributedQueue()

bf = BloomFilter()

initial_pages = "www.renmingribao.com"

while(True):

if request == 'GET':

if distributed_queue.size()>0:

send(distributed_queue.get())

else:

break

elif request == 'POST':

bf.put(request.url)

好的,其实你能想到,有人已经给你写好了你需要的:darkrho/scrapy-redis · GitHub

4)展望及后处理

虽然上面用很多“简单”,但是真正要实现一个商业规模可用的爬虫并不是一件容易的事。上面的代码用来爬一个整体的网站几乎没有太大的问题。

但是如果附加上你需要这些后续处理,比如

有效地存储(数据库应该怎样安排)

有效地判重(这里指网页判重,咱可不想把人民日报和抄袭它的大民日报都爬一遍)

有效地信息抽取(比如怎么样抽取出网页上所有的地址抽取出来,“朝阳区奋进路中华道”),搜索引擎通常不需要存储所有的信息,比如图片我存来干嘛...

及时更新(预测这个网页多久会更新一次)

如你所想,这里每一个点都可以供很多研究者十数年的研究。虽然如此,

“路漫漫其修远兮,吾将上下而求索”。

所以,不要问怎么入门,直接上路就好了:)