β

Python网络爬虫4 – 多线程抓取

致一 306 阅读

之前的内容已经大致实现了如何获取网页、分析网页、获取目标内容。接下来的目标是如何让网页抓取进行得更效率些。在进行抓取的时候,时间的消耗主要是在请求等待的时间上,所以一个最容易想到的优化方式就是使用多线程。

多线程

多线程的实现还是比较简单的,下面是一个简单的线程实现方案:

#!python
# encoding: utf-8
from threading import Thread


def func():
    print("this is multi thread")


def start():
    t = Thread(target=func)
    t.start()


if __name__ == '__main__':
    start()

就是这么简单。

线程池

在抓取网页的时候,一个简单的思路就是为每个网页启动一个线程。在要抓取的网页比较少的时候——比如百十来个——这样子还是可行的。但是网页比较多的时候,这样做就不太合理了。因为线程的创建启动和运行都会消耗很多的资源,线程启动太多会耗尽资源导致机器卡死。而且,创建线程后只执行一次也是一种浪费。为了减少线程的创建、实现线程的重复利用,我们需要引入线程池。

可以使用python的ThreadPoolExecutor来创建线程池:

#!python
# encoding: utf-8
from concurrent.futures import ThreadPoolExecutor


def func():
    print("this is multi thread")


def start_pool():
    pool = ThreadPoolExecutor(64)
    for i in range(10):
        pool.submit(func)


if __name__ == '__main__':
    start_pool()

也很简单,甚至不比单线程多一行代码。在代码里创建了一个总数为64的线程池,然后在一个循环中每次取出一条线程来执行func函数,没有空闲线程时则会进入等待。

按照这样的思路,我们也可以使用Queue来自己创建线程池:

class CustomThread(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.__queue = queue

    def run(self):
        while True:
            cmd = self.__queue.get()
            cmd()
            self.__queue.task_done()


def custom_pool():
    queue = Queue(5)
    for i in range(queue.maxsize):
        t = CustomThread(queue)
        t.setDaemon(True)
        t.start()

    for i in range(20):
        queue.put(func)
    queue.join()

在上面的代码里创建了一个长度为5的队列,然后在队列里入了几个线程,并立刻启动。每个线程随时待命,一旦队列里面有了要执行的任务就会立即执行,并在执行完成后发送通知给队列。queue.join()方法则会在队列中的所有任务执行完成前阻塞住线程,待所有任务执行完成后再继续执行后面的代码。

并行

前面所述的方案是并发处理的方案。并发处理的方案可以充分利用CPU。不过现在的CPU通常都是多核的,为了利用多核CPU的特点,可以考虑使用并行处理的方案。下面是一个进程的演示:

#!python
# encoding: utf-8
from multiprocessing import Process


def process():
    p = Process(target=func)
    p.start()

当然也有对应的进程池了:

#!python
# encoding: utf-8
from multiprocessing.pool import Pool

from multiprocessing import Process


def func():
    print("this is multi thread")


def process_pool():
    pool = Pool(processes=3)
    for i in range(6):
        pool.apply_async(func)
    pool.close()
    pool.join()

在我使用爬虫的经历中极少会用到多进程或进程池。不过,既然说起来了就得说个全套,所以才耐着性子把这块儿写完。

很多人可能就是因为要学习爬虫才开始看Python的。不过看了python却只知道爬虫未免有些可惜,我还是希望能够多接触到一些较深入的东西的。多线程和多进程是学习Python的经历中无论如何也不应该绕过的部分,如果不想浅尝而止的话,还是建议多看看。

完整的抓取程序在下面。程序写于半年前,有很多不成熟的地方,也没心思修改了,凑合看吧。因为代码太长所以折叠了起来:

#!python
# encoding: utf8
# zhyea.com
from urllib.request import Request
from urllib import parse
import urllib.request
import threading
from bs4 import BeautifulSoup
from queue import Queue


class Client:
    def __init__(self):
        self.__TIMEOUT = 300
        self.__CHARSET = "utf8"
        self.__HEADERS = {"User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:43.0) Gecko/20100101 Firefox/43.0"}

    def get(self, url):
        request = Request(url, headers=self.__HEADERS)
        response = urllib.request.urlopen(request, timeout=self.__TIMEOUT)
        content = response.read().decode(self.__CHARSET)
        response.close()
        return content

    def post(self, url, **paras):
        data = parse.urlencode(paras).encode(self.__CHARSET)
        request = Request(url, data, headers=self.__HEADERS)
        response = urllib.request.urlopen(request, timeout=self.__TIMEOUT)
        content = response.read().decode(self.__CHARSET)
        response.close()
        return content


class GrabThread(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.__queue = queue

    def run(self):
        while True:
            cmd, arg = self.__queue.get()
            cmd(arg)
            self.__queue.task_done()


class Grabber:
    def __init__(self):
        self.__URL_SEARCH = "http://www.xxxxxxx.net/search/"
        self.__SAVE_PATH = "D://xxxx.log"
        self.__OUTPUT = open(self.__SAVE_PATH, 'a', encoding='utf-8')
        self.__queue = Queue()

    def __write(self, title, magnent):
        self.__OUTPUT.write(
            "".join(["\n", title, "\n", magnent, "\n", "---------------------------------------------"]))

    def __grab_anchor(self, url):
        content = Client().get(url);
        magnet_anchors = BeautifulSoup(content, "html.parser").select('a[href^="magnet:?xt"]')
        for i in range(len(magnet_anchors)):
            self.__write(magnet_anchors[i].attrs['title'], magnet_anchors[i].attrs['href'])

    def __grab(self, key):
        content = Client().post(self.__URL_SEARCH, q=parse.quote(key));
        page_anchors = BeautifulSoup(content, "html.parser").select(".pagination > a")
        if len(page_anchors) == 0:
            self.__queue.put((self.__grabAnchor, "".join([self.__URL_SEARCH, "/", key])))
        else:
            page_total = int(page_anchors[-2].get_text())
            for i in range(page_total):
                self.__queue.put((self.__grabAnchor, "".join([self.__URL_SEARCH, "/", key, "/", "%d" % i])))
        self.__queue.join()

    def start(self):
        for i in range(32):
            t = GrabThread(self.__queue)
            t.daemon = True
            t.start()

        keys = ["key1", "key2"]
        for i in range(len(keys)):
            self.__grab(keys[i])
        self.__OUTPUT.close()


if __name__ == "__main__":
    Grabber().start()

参考文档

####################

作者:致一
预则立,不预则废
原文地址:Python网络爬虫4 – 多线程抓取, 感谢原作者分享。