利用python的dask搭建分布式集群

Python012

利用python的dask搭建分布式集群,第1张

dask官网地址: https://dask.org/

优势:dask内部自动实现了分布式调度、无需用户自行编写复杂的调度逻辑和程序;通过调用简单的方法就可以进行分布式计算、并支持部分模型的并行化处理;内部实现的分布式算法:xgboost、LR、sklearn的部分方法等

用一句话说:dask就是python版本的spark,是一个用Python 语言实现的分布式计算框架

建议使用:Anaconda3工具包

系统:windows、linux

关于分布式版本安装的注意事项(针对macos)请参考官网:

https://distributed.dask.org/en/latest/install.html

本人实验环境:一台windows机器+3台虚拟化linux服务器,并4台机器均已按照上面步骤安装配置dask

选择Windows机器作为主节点,启动命令:

$ dask-scheduler

控制台显示信息如下:

在其他每台linux机器命令行输入:

$ dask-worker 192.168.1.42:8786

注意:后面跟的ip和端口是主节点的ip和对应服务的端口

通过官网提供的测试例子可以看出dask的确体现了分布式的优势。

如果您觉得有帮助的话,可以扫码,赞赏鼓励一下!谢谢!

一、分布式爬虫架构

在了解分布式爬虫架构之前,首先回顾一下Scrapy的架构,如下图所示。

Scrapy单机爬虫中有一个本地爬取队列Queue,这个队列是利用deque模块实现的。如果新的Request生成就会放到队列里面,随后Request被Scheduler调度。之后,Request交给Downloader执行爬取,简单的调度架构如下图所示。

如果两个Scheduler同时从队列里面取Request,每个Scheduler都有其对应的Downloader,那么在带宽足够、正常爬取且不考虑队列存取压力的情况下,爬取效率会有什么变化?没错,爬取效率会翻倍。

这样,Scheduler可以扩展多个,Downloader也可以扩展多个。而爬取队列Queue必须始终为一个,也就是所谓的共享爬取队列。这样才能保证Scheduer从队列里调度某个Request之后,其他Scheduler不会重复调度此Request,就可以做到多个Schduler同步爬取。这就是分布式爬虫的基本雏形,简单调度架构如下图所示。

我们需要做的就是在多台主机上同时运行爬虫任务协同爬取,而协同爬取的前提就是共享爬取队列。这样各台主机就不需要各自维护爬取队列,而是从共享爬取队列存取Request。但是各台主机还是有各自的Scheduler和Downloader,所以调度和下载功能分别完成。如果不考虑队列存取性能消耗,爬取效率还是会成倍提高。

二、维护爬取队列

那么这个队列用什么来维护?首先需要考虑的就是性能问题。我们自然想到的是基于内存存储的Redis,它支持多种数据结构,例如列表(List)、集合(Set)、有序集合(Sorted Set)等,存取的操作也非常简单。

Redis支持的这几种数据结构存储各有优点。

列表有lpush()、lpop()、rpush()、rpop()方法,我们可以用它来实现先进先出式爬取队列,也可以实现先进后出栈式爬取队列。

集合的元素是无序的且不重复的,这样我们可以非常方便地实现随机排序且不重复的爬取队列。

有序集合带有分数表示,而Scrapy的Request也有优先级的控制,我们可以用它来实现带优先级调度的队列。

我们需要根据具体爬虫的需求来灵活选择不同的队列。

三、如何去重

Scrapy有自动去重,它的去重使用了Python中的集合。这个集合记录了Scrapy中每个Request的指纹,这个指纹实际上就是Request的散列值。我们可以看看Scrapy的源代码,如下所示:

import hashlib

def request_fingerprint(request, include_headers=None):

if include_headers:

include_headers = tuple(to_bytes(h.lower())

for h in sorted(include_headers))

cache = _fingerprint_cache.setdefault(request, {})

if include_headers not in cache:

fp = hashlib.sha1()

fp.update(to_bytes(request.method))

fp.update(to_bytes(canonicalize_url(request.url)))

fp.update(request.body or b'')

if include_headers:

for hdr in include_headers:

if hdr in request.headers:

fp.update(hdr)

for v in request.headers.getlist(hdr):

fp.update(v)

cache[include_headers] = fp.hexdigest()

return cache[include_headers]

request_fingerprint()就是计算Request指纹的方法,其方法内部使用的是hashlib的sha1()方法。计算的字段包括Request的Method、URL、Body、Headers这几部分内容,这里只要有一点不同,那么计算的结果就不同。计算得到的结果是加密后的字符串,也就是指纹。每个Request都有独有的指纹,指纹就是一个字符串,判定字符串是否重复比判定Request对象是否重复容易得多,所以指纹可以作为判定Request是否重复的依据。

那么我们如何判定重复呢?Scrapy是这样实现的,如下所示:

def __init__(self):

self.fingerprints = set()

def request_seen(self, request):

fp = self.request_fingerprint(request)

if fp in self.fingerprints:

return True

self.fingerprints.add(fp)

在去重的类RFPDupeFilter中,有一个request_seen()方法,这个方法有一个参数request,它的作用就是检测该Request对象是否重复。这个方法调用request_fingerprint()获取该Request的指纹,检测这个指纹是否存在于fingerprints变量中,而fingerprints是一个集合,集合的元素都是不重复的。如果指纹存在,那么就返回True,说明该Request是重复的,否则这个指纹加入到集合中。如果下次还有相同的Request传递过来,指纹也是相同的,那么这时指纹就已经存在于集合中,Request对象就会直接判定为重复。这样去重的目的就实现了。

Scrapy的去重过程就是,利用集合元素的不重复特性来实现Request的去重。

对于分布式爬虫来说,我们肯定不能再用每个爬虫各自的集合来去重了。因为这样还是每个主机单独维护自己的集合,不能做到共享。多台主机如果生成了相同的Request,只能各自去重,各个主机之间就无法做到去重了。

那么要实现去重,这个指纹集合也需要是共享的,Redis正好有集合的存储数据结构,我们可以利用Redis的集合作为指纹集合,那么这样去重集合也是利用Redis共享的。每台主机新生成Request之后,把该Request的指纹与集合比对,如果指纹已经存在,说明该Request是重复的,否则将Request的指纹加入到这个集合中即可。利用同样的原理不同的存储结构我们也实现了分布式Reqeust的去重。

四、防止中断

在Scrapy中,爬虫运行时的Request队列放在内存中。爬虫运行中断后,这个队列的空间就被释放,此队列就被销毁了。所以一旦爬虫运行中断,爬虫再次运行就相当于全新的爬取过程。

要做到中断后继续爬取,我们可以将队列中的Request保存起来,下次爬取直接读取保存数据即可获取上次爬取的队列。我们在Scrapy中指定一个爬取队列的存储路径即可,这个路径使用JOB_DIR变量来标识,我们可以用如下命令来实现:

scrapy crawl spider -s JOB_DIR=crawls/spider

更加详细的使用方法可以参见官方文档,链接为:https://doc.scrapy.org/en/latest/topics/jobs.html。

在Scrapy中,我们实际是把爬取队列保存到本地,第二次爬取直接读取并恢复队列即可。那么在分布式架构中我们还用担心这个问题吗?不需要。因为爬取队列本身就是用数据库保存的,如果爬虫中断了,数据库中的Request依然是存在的,下次启动就会接着上次中断的地方继续爬取。

所以,当Redis的队列为空时,爬虫会重新爬取;当Redis的队列不为空时,爬虫便会接着上次中断之处继续爬取。

五、架构实现

我们接下来就需要在程序中实现这个架构了。首先实现一个共享的爬取队列,还要实现去重的功能。另外,重写一个Scheduer的实现,使之可以从共享的爬取队列存取Request。

幸运的是,已经有人实现了这些逻辑和架构,并发布成叫Scrapy-Redis的Python包。接下来,我们看看Scrapy-Redis的源码实现,以及它的详细工作原理

首先,这种框架现在市面上是有的。强烈建议,不要重复造轮子。

先介绍几种比较主流的。

Elastic-Job,是当当网开源的分布式调度解决方案,支持任务分片功能,可以充分利用资源。Elastic-Job有两个独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。具体实现可以参考官方教程。其整体架构图如下。

Elastic-Job的特点:

1、分布式调度 2、作业高可用 3、任务分片执行。

另外,还有其他的一些框架,可以对比使用。比如TBSchedule是阿里巴巴开源的分布式调度框架,完全由java实现,目前被应用于淘宝,阿里巴巴,支付宝,京东, 汽车 之家等。大众点评开源的xxl-job,也是应用比较广泛的分布式调度任务。

目前我使用过的有 Elastic-Job和xxl-job。两者功能都很强大,后台管理也比较完善。很容易上手。都可以满足日常的工作需要。区别就是 Elastic-Job依赖zk,但是xxl-job不依赖zk,只依赖数据库。

目前市面上应该还有一些其他的框架,但是以上是比较主流的,可以根据自己的需要来选择。切记不要重复造轮子,造轮子需要大量的时间去验证。会让你在坑里爬不出来。

1.XXL-JOB

2.Elastic-Job

Elastic-Job 是一个分布式调度解决方案,由两个相互独立的子项目 Elastic-Job-Lite 和 Elastic-Job-Cloud 组成。

定位为轻量级无中心化解决方案,使用 jar 包的形式提供分布式任务的协调服务。

支持分布式调度协调、弹性扩容缩容、失效转移、错过执行作业重触发、并行调度、自诊断和修复等等功能特性。

分布式调度解决方案,由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。

Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务。选择该项目可以满足大多数it企业的需求。

Elastic-Job-Cloud使用Mesos + Docker的解决方案,额外提供资源治理、应用分发以及进程隔离等服务。

轻量级无中心化:Elastic-Job-Lite并无作业调度中心节点,而是基于部署作业框架的程序在到达相应时间点时各自触发调度。

灵活的增删改查作业,集中式管理调度作业

支持高可用:一旦执行作业的服务器崩溃,等待执行的服务器将会在下次作业启动时替补执行。开启失效转移功能效果更好,可以保证在本次作业执行时崩溃,备机立即启动替补执行。

支持分片:作业分片一致性,保证同一分片在分布式环境中仅一个执行实例

任务监控:通过监听Elastic-Job-Lite的zookeeper注册中心的几个关键节点即可完成作业运行状态监控功能

一致性:使用zookeeper作为注册中心,为了保证作业的在分布式场景下的一致性,一旦作业与注册中心无法通信,运行中的作业会立刻停止执行,但作业的进程不会退出,这样做的目的是为了防止作业重分片时,将与注册中心失去联系的节点执行的分片分配给另外节点,导致同一分片在两个节点中同时执行。

同时支持动态扩容,将任务拆分为n个任务项后,各个服务器分别执行各自分配到的任务项。一旦有新的服务器加入集群,或现有服务器下线,elastic-job将在保留本次任务执行不变的情况下,下次任务开始前触发任务重分片

3.opencron

opencron是一个功能完善且通用的开源定时任务调度系统,拥有先进可靠的自动化任务管理调度功能,提供可操作的 web 图形化管理满足多种场景下各种复杂的定时任务调度,同时集成了 linux 实时监控、webssh 等功能特性

4.quartz

支持集群和分布式,但是没有友好的管理界面,功能单一,对于管理调用的任务比较困难。

quartz使用数据库锁。在quartz的集群解决方案里有张表scheduler_locks,quartz采用了悲观锁的方式对triggers表进行行加锁,以保证任务同步的正确性。一旦某一个节点上面的线程获取了该锁,那么这个Job就会在这台机器上被执行,同时这个锁就会被这台机器占用。同时另外一台机器也会想要触发这个任务,但是锁已经被占用了,就只能等待,直到这个锁被释放。

quartz的分布式调度策略是以数据库为边界资源的一种异步策略。各个调度器都遵守一个基于数据库锁的操作规则从而保证了操作的唯一性。同时多个节点的异步运行保证了服务的可靠。但这种策略有自己的局限性:集群特性对于高CPU使用率的任务效果很好,但是对于大量的短任务,各个节点都会抢占数据库锁,这样就出现大量的线程等待资源。这种情况随着节点的增加会越来越严重。

缺点:quartz的分布式只是解决了高可用的问题,并没有解决任务分片的问题,还是会有单机处理的极限。

5.Saturn

Saturn

基于当当Elastic Job代码基础上自主研发的任务调度系统,是唯品会开源的分布式作业调度平台,取代传统的Linux Cron/Spring Batch Job的方式,做到统一配置,统一监控,任务高可用以及分片并发处理。主要是去中心化,高可用,可分片,动态扩容,有认证和授权功能。

主要特性

支持多种语言作业,语言无关(Java/Go/C++/PHP/Python/Ruby/shell)

支持秒级调度

支持作业分片并行执行

支持依赖作业串行执行

支持作业高可用和智能负载均衡

支持异常检测和自动失败转移

支持异地容灾

支持多个集群部署

支持跨机房区域部署

支持弹性动态扩容

支持优先级和权重设置

支持docker容器,容器化友好

支持cron时间表达式

支持多个时间段暂停执行控制

支持超时告警和超时强杀控制

支持灰度发布

支持异常、超时和无法高可用作业监控告警和简易的故障排除

支持失败率最高、最活跃和负荷最重的各域各节点TOP10的作业统计

优点:源码清晰,学习入手容易。应用部署简单,提供运维控制台,集中管理作业,运维控制台功能强大,提供作业统计报表 ,告警,增删改查作业,作业统一配置。

最后一个是国内团队封装的

前端时间研究了两款分布式任务调度框架,一个是XXL-Job,现在非常主流,很多常见的一些公司都在使用,像滴滴美团这样的公司都在用,这也是一款开源产品,下载下来导入IDEA就可以使用,分调度器和执行器和管理UI,有很美观的UI界面,可以对任务做增删改查,以及支持自定义开发,有很详细的帮助文档,还提供有demo,傻瓜式的,很简单,亮点是提供了管理界面。

另一个是Quartz,这个组件单机和集群都支持,单机的话是RAMJobStore任务存储,而要支持集群的话,就要将配置改成数据库方式,Quartz提供的有十几张表,其分布式的原理是利用了数据库的行锁,Quartz很简单,也是一款轻量级的开源产品,我们公司一直用这款组件,很成熟无Bug,推荐使用!

springcloudtask,springclouddataflow,正在学习中