golang实现本地延迟队列

Python014

golang实现本地延迟队列,第1张

有个服务会大量使用延迟消息,进行事件处理。随着业务量不断上涨。在晚间、节假日等流量高峰期消息延迟消息队列限流会导致事件丢失,影响业务。与下游沟通后给上调到了最大限流值,问题依然存在,于是决定自己搞一套降级方案。

下游服务触发限流时,能降级部分流量到本地延迟队列,把业务损失降到最低。

本地延迟队列承接部分mq流量

流程如下:

1. 使用zset 存储延迟消息,其中:score为执行时间,value为消息体

2. 启动协程轮询zset,获取score最小的10条数据,协程执行间隔时间xs

        如果最小分值小于等于当前时间戳,则发送消息

        若最小分值大于当前时间戳,sleep等待执行

需要对key进行hash,打散到多个分片中,避免大key和热key问题,官方大key定义

因此,需保证每个key中value数量n<5000,单个value大小不超过 10240/n kb

假设承接10w qps,如何处理?

10w qps延迟120s时,最开始消息队列会积累100000*120=12000000条消息

假如每条消息大小500b,需占用存储6000000kb = 6000Mb = 6GB

为避免大key问题,每个zset存放4000个元素,需要哈希到3000(3000是key的数量,可配置)个zset中。

整个集群假设500台实例,每个处理qps平均在200左右。

单实例消费能力计算:

遍历每个zset,针对每个zset起goroutine处理,此示例中需要 起3000个

但是每秒能处理成功的只有200个,其他都在空跑

综上:

将redis key分片数n和每次处理的消息数m进行动态配置,便于调整

当流量上涨时,调大分片数n和单实例单分片并发数m即可,假如消费间隔200ms,集群处理能力为n*m*5 qps

n = (qps * 120) / 4000

若qps=q,则计算公式如下

zadd = q

zRange = 500 * 5 * n / 500

zRemove = q

setNx = 500 * 5 * n

若10w qps,则

读qps = 15000 + 500*3000*5 =7515000,写 20w

pros

redis 读写性能好,可支持较大并发量,zrange可直接取出到达执行时间的消息

cons

redis 大key问题导致对数据量有一定的限制

分片数量扩缩容会漏消费,会导致事件丢失,业务有损

key分片数量过多时,redis读写压力较大

机器资源浪费,3000个协程,单实例同一秒只有200个针对处理,其他都在空跑

流程如下:

使用带缓冲的channel来实现延迟队列,channel中存放的数据为消息体(包括执行时间),channel能保证先进先出

从channel中取出数据后,判断是否到达执行时间

到达,同步发送mq

未到达,sleep 剩余执行时间,然后再次执行

从channel读出的数据如果未到达执行时间,无法再次放入channel中,需要协程sleep(执行时间-当前时间)

10w qps延迟120s时,最开始消息队列会积累100000*120=12000000条消息,假设每条消息大小500b,需要6G存储空间

channel 大小 = (qps*120)/ c , c=集群实例数,c=500 =>channel大小为24000,占用12M内存

要处理10w qps,分摊到每个机器的处理速度为 100000/500 = 200,假设单协程处理10qps,开20个即可。

pros:

本地存储,相比redis,读写速度更快;协程数量少,开销低;资源利用率较方案一高

cons:

稳定性不如redis,实例故障可能导致数据丢失;worker池和channel扩缩容依赖服务重启,成本高速度慢

综上,我们以10w qps为例,对比两种方案在以下指标差异,选择方案二。

附上demo

内核线程(Kernel-Level Thread ,KLT)

轻量级进程(Light Weight Process,LWP):轻量级进程就是我们通常意义上所讲的线程,由于每个轻量级进程都由一个内核线程支持,因此只有先支持内核线程,才能有轻量级进程

用户线程与系统线程一一对应,用户线程执行如lo操作的系统调用时,来回切换操作开销相对比较大

多个用户线程对应一个内核线程,当内核线程对应的一个用户线程被阻塞挂起时候,其他用户线程也阻塞不能执行了。

多对多模型是可以充分利用多核CPU提升运行效能的

go线程模型包含三个概念:内核线程(M),goroutine(G),G的上下文环境(P);

GMP模型是goalng特有的。

P与M一般是一一对应的。P(上下文)管理着一组G(goroutine)挂载在M(内核线程)上运行,图中左边蓝色为正在执行状态的goroutine,右边为待执行状态的goroutiine队列。P的数量由环境变量GOMAXPROCS的值或程序运行runtime.GOMAXPROCS()进行设置。

当一个os线程在执行M1一个G1发生阻塞时,调度器让M1抛弃P,等待G1返回,然后另起一个M2接收P来执行剩下的goroutine队列(G2、G3...),这是golang调度器厉害的地方,可以保证有足够的线程来运行剩下所有的goroutine。

当G1结束后,M1会重新拿回P来完成,如果拿不到就丢到全局runqueue中,然后自己放到线程池或转入休眠状态。空闲的上下文P会周期性的检查全局runqueue上的goroutine,并且执行它。

另一种情况就是当有些P1太闲而其他P2很忙碌的时候,会从其他上下文P2拿一些G来执行。

详细可以翻看下方第一个参考链接,写得真好。

最后用大佬的总结来做最后的收尾————

Go语言运行时,通过核心元素G,M,P 和 自己的调度器,实现了自己的并发线程模型。调度器通过对G,M,P的调度实现了两级线程模型中操作系统内核之外的调度任务。整个调度过程中会在多种时机去触发最核心的步骤 “一整轮调度”,而一整轮调度中最关键的部分在“全力查找可运行G”,它保证了M的高效运行(换句话说就是充分使用了计算机的物理资源),一整轮调度中还会涉及到M的启用停止。最后别忘了,还有一个与Go程序生命周期相同的系统监测任务来进行一些辅助性的工作。

浅析Golang的线程模型与调度器

Golang CSP并发模型

Golang线程模型