Go语言基于Etcd实现的定时任务

Python015

Go语言基于Etcd实现的定时任务,第1张

利用 Etcd 的Lease租约特性来实现定时功能,同时通过Watch机制来实现多节点情况下只有一个节点执行该任务。通过定时任务库 Cron 的时间字符串解析器Parser来解析任务执行时间。

Etcd

Cron

源码链接

    目前APP业务中启用的定时任务已达到400+,目前管理比较混乱,很多任务运行时占用服务器资源巨大,其中不乏一些非紧急的任务,平时并不会有太大影响,但是当流量高峰来临时,这些定时任务可能会成为压死骆驼的最后一根稻草。为了避免出现这样的问题,我们通常会在高流量来之前去调整一些定时任务的执行间隔时间或者暂停一些不影响服务的定时任务。这样做的弊端是工作量很大,同时难免会有遗漏。由此衍生除了对任务分级的诉求。对任务分级后,高峰流量时,可视情况降级相关等级的定时任务。

        PS:设计核心流程的任务等,如支付回调

        PS:任务中设计到事务等

    基于gocron的任务节点做任务分级,不同级别的任务对应不同的gocron节点。如下图:

    把三级任务放在三级节点上跑,如下图:

    以此类推,不同级别的任务跑在对应级别的节点上。

    当流量高峰来临时,我们想通过停掉所有三级任务来实现快速降级,而这个操作仅仅需要关闭对应节点的连接即可。如下图

PS:这个操作同时会停止所有正在运行的任务

举个例子:目前我的三级任务节点上运行了一个同步数据的任务(预计5分钟左右能执行完),当我把三级任务节点关闭时,这个任务会直接失败,在节点对应的机器上我们可以看到所有进程也被直接kill掉了,即使我的任务是多进程在跑,相应的子进程也会被kill掉。如下:

当前正在服务的三级节点-asgard三级定时任务

当前正在节点-asgard三级定时任务上运行的任务-商品数据整合同步搜索个推库

节点服务器上正在运行的进程

这时候我们关闭asgard三级定时任务这个节点

可以看到任务直接执行失败了

同时,节点服务器上的进程也被kill掉了

    由于二级任务可能涉及到事务等操作,非万分紧急情况下不能直接终止,以免导致脏数据的产生。对于这种任务的降级我们不能直接通过节点的方式停止任务。可以通过关闭任务的方式停止。如下:

PS:关闭任务的操作会等当前的任务执行完成再关闭,不会对当前任务产生任何影响

举个例子:

还拿asgard三级定时任务这个节点来看,目前这个节点在链接状态

这个节点下跑了一个任务

同样的,节点服务器上有对应的进程在跑着

这时候,我们关闭这个任务

我们可以看到,关闭这个任务,不会影响正在执行的任务

节点对应的服务器上的任务也正常在跑

PS:这个关闭任务对应的是,完成当前任务后不再执行新的任务。

    1、基于gocron的任务节点对任务做分级处理

    2、一、二、三级任务的划分

    3、服务降级的两种方式:关闭节点&关闭任务

在linux下实现定时器主要有如下方式

在这当中 基于时间轮方式实现的定时器 时间复杂度最小,效率最高,然而我们可以通过 优先队列 实现时间轮定时器。

优先队列的实现可以使用最大堆和最小堆,因此在队列中所有的数据都可以定义排序规则自动排序。我们直接通过队列中 pop 函数获取数据,就是我们按照自定义排序规则想要的数据。

在 Golang 中实现一个优先队列异常简单,在 container/head 包中已经帮我们封装了,实现的细节,我们只需要实现特定的接口就可以。

下面是官方提供的例子

因为优先队列底层数据结构是由二叉树构建的,所以我们可以通过数组来保存二叉树上的每一个节点。

改数组需要实现 Go 预先定义的接口 Len , Less , Swap , Push , Pop 和 update 。

timerType结构是定时任务抽象结构

首先的 start 函数,当创建一个 TimeingWheel 时,通过一个 goroutine 来执行 start ,在start中for循环和select来监控不同的channel的状态

通过for循环从队列中取数据,直到该队列为空或者是遇见第一个当前时间比任务开始时间大的任务, append 到 expired 中。因为优先队列中是根据 expiration 来排序的,

所以当取到第一个定时任务未到的任务时,表示该定时任务以后的任务都未到时间。

当 getExpired 函数取出队列中要执行的任务时,当有的定时任务需要不断执行,所以就需要判断是否该定时任务需要重新放回优先队列中。 isRepeat 是通过判断任务中 interval 是否大于 0 判断,

如果大于0 则,表示永久就生效。

防止外部滥用,阻塞定时器协程,框架又一次封装了timer这个包,名为 timer_wapper 这个包,它提供了两种调用方式。

参数和上面的参数一样,只是在第三个参数中使用了任务池,将定时任务放入了任务池中。定时任务的本身执行就是一个 put 操作。

至于put以后,那就是 workers 这个包管理的了。在 worker 包中, 也就是维护了一个任务池,任务池中的任务会有序的执行,方便管理。