golang使用Nsq

Python017

golang使用Nsq,第1张

1. 介绍

最近在研究一些消息中间件,常用的MQ如RabbitMQ,ActiveMQ,Kafka等。NSQ是一个基于Go语言的分布式实时消息平台,它基于MIT开源协议发布,由bitly公司开源出来的一款简单易用的消息中间件。

官方和第三方还为NSQ开发了众多客户端功能库,如官方提供的基于HTTP的nsqd、Go客户端go-nsq、Python客户端pynsq、基于Node.js的JavaScript客户端nsqjs、异步C客户端libnsq、Java客户端nsq-java以及基于各种语言的众多第三方客户端功能库。

1.1 Features

1). Distributed

NSQ提供了分布式的,去中心化,且没有单点故障的拓扑结构,稳定的消息传输发布保障,能够具有高容错和HA(高可用)特性。

2). Scalable易于扩展

NSQ支持水平扩展,没有中心化的brokers。内置的发现服务简化了在集群中增加节点。同时支持pub-sub和load-balanced 的消息分发。

3). Ops Friendly

NSQ非常容易配置和部署,生来就绑定了一个管理界面。二进制包没有运行时依赖。官方有Docker image。

4.Integrated高度集成

官方的 Go 和 Python库都有提供。而且为大多数语言提供了库。

1.2 组件

1.3 拓扑结构

NSQ推荐通过他们相应的nsqd实例使用协同定位发布者,这意味着即使面对网络分区,消息也会被保存在本地,直到它们被一个消费者读取。更重要的是,发布者不必去发现其他的nsqd节点,他们总是可以向本地实例发布消息。

NSQ

首先,一个发布者向它的本地nsqd发送消息,要做到这点,首先要先打开一个连接,然后发送一个包含topic和消息主体的发布命令,在这种情况下,我们将消息发布到事件topic上以分散到我们不同的worker中。

事件topic会复制这些消息并且在每一个连接topic的channel上进行排队,在我们的案例中,有三个channel,它们其中之一作为档案channel。消费者会获取这些消息并且上传到S3。

nsqd

每个channel的消息都会进行排队,直到一个worker把他们消费,如果此队列超出了内存限制,消息将会被写入到磁盘中。Nsqd节点首先会向nsqlookup广播他们的位置信息,一旦它们注册成功,worker将会从nsqlookup服务器节点上发现所有包含事件topic的nsqd节点。

nsqlookupd

2. Internals

2.1 消息传递担保

1)客户表示已经准备好接收消息

2)NSQ 发送一条消息,并暂时将数据存储在本地(在 re-queue 或 timeout)

3)客户端回复 FIN(结束)或 REQ(重新排队)分别指示成功或失败。如果客户端没有回复, NSQ 会在设定的时间超时,自动重新排队消息

这确保了消息丢失唯一可能的情况是不正常结束 nsqd 进程。在这种情况下,这是在内存中的任何信息(或任何缓冲未刷新到磁盘)都将丢失。

如何防止消息丢失是最重要的,即使是这个意外情况可以得到缓解。一种解决方案是构成冗余 nsqd对(在不同的主机上)接收消息的相同部分的副本。因为你实现的消费者是幂等的,以两倍时间处理这些消息不会对下游造成影响,并使得系统能够承受任何单一节点故障而不会丢失信息。

2.2 简化配置和管理

单个 nsqd 实例被设计成可以同时处理多个数据流。流被称为“话题”和话题有 1 个或多个“通道”。每个通道都接收到一个话题中所有消息的拷贝。在实践中,一个通道映射到下行服务消费一个话题。

在更底的层面,每个 nsqd 有一个与 nsqlookupd 的长期 TCP 连接,定期推动其状态。这个数据被 nsqlookupd 用于给消费者通知 nsqd 地址。对于消费者来说,一个暴露的 HTTP /lookup 接口用于轮询。为话题引入一个新的消费者,只需启动一个配置了 nsqlookup 实例地址的 NSQ 客户端。无需为添加任何新的消费者或生产者更改配置,大大降低了开销和复杂性。

2.3 消除单点故障

NSQ被设计以分布的方式被使用。nsqd 客户端(通过 TCP )连接到指定话题的所有生产者实例。没有中间人,没有消息代理,也没有单点故障。

这种拓扑结构消除单链,聚合,反馈。相反,你的消费者直接访问所有生产者。从技术上讲,哪个客户端连接到哪个 NSQ 不重要,只要有足够的消费者连接到所有生产者,以满足大量的消息,保证所有东西最终将被处理。对于 nsqlookupd,高可用性是通过运行多个实例来实现。他们不直接相互通信和数据被认为是最终一致。消费者轮询所有的配置的 nsqlookupd 实例和合并 response。失败的,无法访问的,或以其他方式故障的节点不会让系统陷于停顿。

2.4 效率

对于数据的协议,通过推送数据到客户端最大限度地提高性能和吞吐量的,而不是等待客户端拉数据。这个概念,称之为 RDY 状态,基本上是客户端流量控制的一种形式。

efficiency

2.5 心跳和超时

组合应用级别的心跳和 RDY 状态,避免头阻塞现象,也可能使心跳无用(即,如果消费者是在后面的处理消息流的接收缓冲区中,操作系统将被填满,堵心跳)为了保证进度,所有的网络 IO 时间上限势必与配置的心跳间隔相关联。这意味着,你可以从字面上拔掉之间的网络连接 nsqd 和消费者,它会检测并正确处理错误。当检测到一个致命错误,客户端连接被强制关闭。在传输中的消息会超时而重新排队等待传递到另一个消费者。最后,错误会被记录并累计到各种内部指标。

2.6 分布式

因为NSQ没有在守护程序之间共享信息,所以它从一开始就是为了分布式操作而生。个别的机器可以随便宕机随便启动而不会影响到系统的其余部分,消息发布者可以在本地发布,即使面对网络分区。

这种“分布式优先”的设计理念意味着NSQ基本上可以永远不断地扩展,需要更高的吞吐量?那就添加更多的nsqd吧。唯一的共享状态就是保存在lookup节点上,甚至它们不需要全局视图,配置某些nsqd注册到某些lookup节点上这是很简单的配置,唯一关键的地方就是消费者可以通过lookup节点获取所有完整的节点集。清晰的故障事件——NSQ在组件内建立了一套明确关于可能导致故障的的故障权衡机制,这对消息传递和恢复都有意义。虽然它们可能不像Kafka系统那样提供严格的保证级别,但NSQ简单的操作使故障情况非常明显。

2.7 no replication

不像其他的队列组件,NSQ并没有提供任何形式的复制和集群,也正是这点让它能够如此简单地运行,但它确实对于一些高保证性高可靠性的消息发布没有足够的保证。我们可以通过降低文件同步的时间来部分避免,只需通过一个标志配置,通过EBS支持我们的队列。但是这样仍然存在一个消息被发布后马上死亡,丢失了有效的写入的情况。

2.8 没有严格的顺序

虽然Kafka由一个有序的日志构成,但NSQ不是。消息可以在任何时间以任何顺序进入队列。在我们使用的案例中,这通常没有关系,因为所有的数据都被加上了时间戳,但它并不适合需要严格顺序的情况。

2.9 无数据重复删除功能

NSQ对于超时系统,它使用了心跳检测机制去测试消费者是否存活还是死亡。很多原因会导致我们的consumer无法完成心跳检测,所以在consumer中必须有一个单独的步骤确保幂等性。

3. 实践安装过程

本文将nsq集群具体的安装过程略去,大家可以自行参考官网,比较简单。这部分介绍下笔者实验的拓扑,以及nsqadmin的相关信息。

3.1 拓扑结构

topology

实验采用3台NSQD服务,2台LOOKUPD服务。

采用官方推荐的拓扑,消息发布的服务和NSQD在一台主机。一共5台机器。

NSQ基本没有配置文件,配置通过命令行指定参数。

主要命令如下:

LOOKUPD命令

NSQD命令

工具类,消费后存储到本地文件。

发布一条消息

3.2 nsqadmin

对Streams的详细信息进行查看,包括NSQD节点,具体的channel,队列中的消息数,连接数等信息。

nsqadmin

channel

列出所有的NSQD节点:

nodes

消息的统计:

msgs

lookup主机的列表:

hosts

4. 总结

NSQ基本核心就是简单性,是一个简单的队列,这意味着它很容易进行故障推理和很容易发现bug。消费者可以自行处理故障事件而不会影响系统剩下的其余部分。

事实上,简单性是我们决定使用NSQ的首要因素,这方便与我们的许多其他软件一起维护,通过引入队列使我们得到了堪称完美的表现,通过队列甚至让我们增加了几个数量级的吞吐量。越来越多的consumer需要一套严格可靠性和顺序性保障,这已经超过了NSQ提供的简单功能。

结合我们的业务系统来看,对于我们所需要传输的发票消息,相对比较敏感,无法容忍某个nsqd宕机,或者磁盘无法使用的情况,该节点堆积的消息无法找回。这是我们没有选择该消息中间件的主要原因。简单性和可靠性似乎并不能完全满足。相比Kafka,ops肩负起更多负责的运营。另一方面,它拥有一个可复制的、有序的日志可以提供给我们更好的服务。但对于其他适合NSQ的consumer,它为我们服务的相当好,我们期待着继续巩固它的坚实的基础。

TiDB 是 PingCAP 自主研发的开源分布式关系型数据库,具备商业级数据库的数据可靠性,可用性,安全性等特性,支持在线弹性水平扩展,兼容 MySQL 协议及生态,创新性实现 OLTP 及 OLAP 融合。

TiDB 3.0 版本显著提升了大规模集群的稳定性,集群支持 150+ 存储节点,300+TB 存储容量长期稳定运行。易用性方面引入大量降低用户运维成本的优化,包括引入 Information_Schema 中的多个实用系统视图、EXPLAIN ANALYZE、SQL Trace 等。在性能方面,特别是 OLTP 性能方面,3.0 比 2.1 也有大幅提升,其中 TPC-C 性能提升约 4.5 倍,Sysbench 性能提升约 1.5 倍,OLAP 方面,TPC-H 50G Q15 因实现 View 可以执行,至此 TPC-H 22 个 Query 均可正常运行。新功能方面增加了窗口函数、视图(实验特性)、分区表、插件系统、悲观锁(实验特性)。

截止本文发稿时 TiDB 已在 500+ 用户的生产环境中长期稳定运行,涵盖金融、保险、制造,互联网, 游戏 等领域,涉及交易、数据中台、 历史 库等多个业务场景。不同业务场景对关系型数据库的诉求可用 “百花齐放”来形容,但对关系数据库最根本的诉求未发生任何变化,如数据可靠性,系统稳定性,可扩展性,安全性,易用性等。请跟随我们的脚步梳理 TiDB 3.0 有什么样的惊喜。

3.0 与 2.1 版本相比,显著提升了大规模集群的稳定性,支持单集群 150+ 存储节点,300+TB 存储容量长期稳定运行,主要的优化点如下:

1. 优化 Raft 副本之间的心跳机制,按照 Region 的活跃程度调整心跳频率,减小冷数据对集群的负担。

2. 热点调度策略支持更多参数配置,采用更高优先级,并提升热点调度的准确性。

3. 优化 PD 调度流程,提供调度限流机制,提升系统稳定性。

4. 新增分布式 GC 功能,提升 GC 的性能,降低大集群 GC 时间,提升系统稳定性。

众所周知,数据库查询计划的稳定性对业务至关重要,TiDB 3.0 版本采用多种优化手段提升查询计划的稳定性,如下:

1. 新增 Fast Analyze 功能,提升收集统计信息的速度,降低集群资源的消耗及对业务的影响。

2. 新增 Incremental Analyze 功能,提升收集单调递增的索引统计信息的速度,降低集群资源的消耗及对业务的影响。

3. 在 CM-Sketch 中新增 TopN 的统计信息,缓解 CM-Sketch 哈希冲突导致估算偏大,提升代价估算的准确性,提升查询计划的稳定性。

4. 引入 Skyline Pruning 框架,利用规则防止查询计划过度依赖统计信息,缓解因统计信息滞后导致选择的查询计划不是最优的情况,提升查询计划的稳定性。

5. 新增 SQL Plan Management 功能,支持在查询计划不准确时手动绑定查询计划,提升查询计划的稳定性。

1. OLTP

3.0 与 2.1 版本相比 Sysbench 的 Point Select,Update Index,Update Non-Index 均提升约 1.5 倍,TPC-C 性能提升约 4.5 倍。主要的优化点如下:

1. TiDB 持续优化 SQL 执行器,包括:优化 NOT EXISTS 子查询转化为 Anti Semi Join,优化多表 Join 时 Join 顺序选择等。

2. 优化 Index Join 逻辑,扩大 Index Join 算子的适用场景并提升代价估算的准确性。

3. TiKV 批量接收和发送消息功能,提升写入密集的场景的 TPS 约 7%,读密集的场景提升约 30%。

4. TiKV 优化内存管理,减少 Iterator Key Bound Option 的内存分配和拷贝,多个 Column Families 共享 block cache 提升 cache 命中率等手段大幅提升性能。

5. 引入 Titan 存储引擎插件,提升 Value 值超过 1KB 时性能,缓解 RocksDB 写放大问题,减少磁盘 IO 的占用。

6. TiKV 新增多线程 Raftstore 和 Apply 功能,提升单节点内可扩展性,进而提升单节点内并发处理能力和资源利用率,降低延时,大幅提升集群写入能力。

TiDB Lightning 性能与 2019 年年初相比提升 3 倍,从 100GB/h 提升到 300GB/h,即 28MB/s 提升到 85MB/s,优化点,如下:

1. 提升 SQL 转化成 KV Pairs 的性能,减少不必要的开销。

2. 提升单表导入性能,单表支持批量导入。

3. 提升 TiKV-Importer 导入数据性能,支持将数据和索引分别导入。

4. TiKV-Importer 支持上传 SST 文件限速功能。

RBAC(Role-Based Access Control,基于角色的权限访问控制) 是商业系统中最常见的权限管理技术之一,通过 RBAC 思想可以构建最简单“用户-角色-权限”的访问权限控制模型。RBAC 中用户与角色关联,权限与角色关联,角色与权限之间一般是多对多的关系,用户通过成为什么样的角色获取该角色所拥有的权限,达到简化权限管理的目的,通过此版本的迭代 RBAC 功能开发完成。

IP 白名单功能(企业版特性) :TiDB 提供基于 IP 白名单实现网络安全访问控制,用户可根据实际情况配置相关的访问策略。

Audit log 功能(企业版特性) :Audit log 记录用户对数据库所执行的操作,通过记录 Audit log 用户可以对数据库进行故障分析,行为分析,安全审计等,帮助用户获取数据执行情况。

加密存储(企业版特性) :TiDB 利用 RocksDB 自身加密功能,实现加密存储的功能,保证所有写入到磁盘的数据都经过加密,降低数据泄露的风险。

完善权限语句的权限检查 ,新增 ANALYZE,USE,SET GLOBAL,SHOW PROCESSLIST 语句权限检查。

1. 新增 SQL 方式查询慢查询,丰富 TiDB 慢查询日志内容,如:Coprocessor 任务数,平均/最长/90% 执行/等待时间,执行/等待时间最长的 TiKV 地址,简化慢查询定位工作,提高排查慢查询问题效率,提升产品易用性。

2. 新增系统配置项合法性检查,优化系统监控项等,提升产品易用性。

3. 新增对 TableReader、IndexReader 和 IndexLookupReader 算子内存使用情况统计信息,提高 Query 内存使用统计的准确性,提升处理内存消耗较大语句的效率。

4. 制定日志规范,重构日志系统,统一日志格式,方便用户理解日志内容,有助于通过工具对日志进行定量分析。

5. 新增 EXPLAIN ANALYZE 功能,提升SQL 调优的易用性。

6. 新增 SQL 语句 Trace 功能,方便排查问题。

7. 新增通过 unix_socket 方式连接数据库。

8. 新增快速恢复被删除表功能,当误删除数据时可通过此功能快速恢复数据。

TiDB 3.0 新增 TiFlash 组件,解决复杂分析及 HTAP 场景。TiFlash 是列式存储系统,与行存储系统实时同步,具备低延时,高性能,事务一致性读等特性。 通过 Raft 协议从 TiKV 中实时同步行存数据并转化成列存储格式持久化到一组独立的节点,解决行列混合存储以及资源隔离性问题。TiFlash 可用作行存储系统(TiKV)实时镜像,实时镜像可独立于行存储系统,将行存储及列存储从物理隔离开,提供完善的资源隔离方案,HTAP 场景最优推荐方案;亦可用作行存储表的索引,配合行存储对外提供智能的 OLAP 服务,提升约 10 倍复杂的混合查询的性能。

TiFlash 目前处于 Beta 阶段,计划 2019 年 12 月 31 日之前 GA,欢迎大家申请试用。

未来我们会继续投入到系统稳定性,易用性,性能,弹性扩展方面,向用户提供极致的弹性伸缩能力,极致的性能体验,极致的用户体验。

稳定性方面 V4.0 版本将继续完善 V3.0 未 GA 的重大特性,例如:悲观事务模型,View,Table Partition,Titan 行存储引擎,TiFlash 列存储引擎;引入近似物理备份恢复解决分布数据库备份恢复难题;优化 PD 调度功能等。

性能方面 V4.0 版本将继续优化事务处理流程,减少事务资源消耗,提升性能,例如:1PC,省去获取 commit ts 操作等。

弹性扩展方面,PD 将提供弹性扩展所需的元信息供外部系统调用,外部系统可根据元信息及负载情况动态伸缩集群规模,达成节省成本的目标。

我们相信战胜“未知”最好的武器就是社区的力量,基础软件需要坚定地走开源路线。截止发稿我们已经完成 41 篇源码阅读文章。TiDB 开源社区总计 265 位 Contributor,6 位 Committer,在这里我们对社区贡献者表示由衷的感谢,希望更多志同道合的人能加入进来,也希望大家在 TiDB 这个开源社区能够有所收获。

TiDB 3.0 GA Release Notes: https://pingcap.com/docs-cn/v3.0/releases/3.0-ga/

一、关于连接池

一个数据库服务器只拥有有限的资源,并且如果你没有充分使用这些资源,你可以通过使用更多的连接来提高吞吐量。一旦所有的资源都在使用,那么你就不 能通过增加更多的连接来提高吞吐量。事实上,吞吐量在连接负载较大时就开始下降了。通常可以通过限制与可用的资源相匹配的数据库连接的数量来提高延迟和吞 吐量。

如何在Go语言中使用Redis连接池

如果不使用连接池,那么,每次传输数据,我们都需要进行创建连接,收发数据,关闭连接。在并发量不高的场景,基本上不会有什么问题,一旦并发量上去了,那么,一般就会遇到下面几个常见问题:

性能普遍上不去

CPU 大量资源被系统消耗

网络一旦抖动,会有大量 TIME_WAIT 产生,不得不定期重启服务或定期重启机器

服务器工作不稳定,QPS 忽高忽低

要想解决这些问题,我们就要用到连接池了。连接池的思路很简单,在初始化时,创建一定数量的连接,先把所有长连接存起来,然后,谁需要使用,从这里取走,干完活立马放回来。 如果请求数超出连接池容量,那么就排队等待、退化成短连接或者直接丢弃掉。

二、使用连接池遇到的坑

最近在一个项目中,需要实现一个简单的 Web Server 提供 Redis 的 HTTP interface,提供 JSON 形式的返回结果。考虑用 Go 来实现。

首先,去看一下 Redis 官方推荐的 Go Redis driver。官方 Star 的项目有两个:Radix.v2 和 Redigo。经过简单的比较后,选择了更加轻量级和实现更加优雅的 Radix.v2。

Radix.v2 包是根据功能划分成一个个的 sub package,每一个 sub package 在一个独立的子目录中,结构非常清晰。我的项目中会用到的 sub package 有 redis 和 pool。

由于我想让这种被 fork 的进程最好简单点,做的事情单一一些,所以,在没有深入去看 Radix.v2 的 pool 的实现之前,我选择了自己实现一个 Redis pool。(这里,就不贴代码了。后来发现自己实现的 Redis pool 与 Radix.v2 实现的 Redis pool 的原理是一样的,都是基于 channel 实现的, 遇到的问题也是一样的。)

不过在测试过程中,发现了一个诡异的问题。在请求过程中经常会报 EOF 错误。而且是概率性出现,一会有问题,一会又好了。通过反复的测试,发现 bug 是有规律的,当程序空闲一会后,再进行连续请求,会发生3次失败,然后之后的请求都能成功,而我的连接池大小设置的是3。再进一步分析,程序空闲300秒 后,再请求就会失败,发现我的 Redis server 配置了 timeout 300,至此,问题就清楚了。是连接超时 Redis server 主动断开了连接。客户端这边从一个超时的连接请求就会得到 EOF 错误。

然后我看了一下 Radix.v2 的 pool 包的源码,发现这个库本身并没有检测坏的连接,并替换为新server{location/pool{content_by_lua_block{localredis=require"resty.redis"localred=redis:new()localok,err=red:connect("127.0.0.1",6379)ifnotokthenngx.say("failedtoconnect:",err)returnendok,err=red:set("hello","world")ifnotokthenreturnendred:set_keepalive(10000,100)}}}

发现有个 set_keepalive 的方法,查了一下官方文档,方法的原型是 syntax: ok, err = red:set_keepalive(max_idle_timeout, pool_size) 貌似 max_idle_timeout 这个参数,就是我们所缺少的东西,然后进一步跟踪源码,看看里面是怎么保证连接有效的。

function_M.set_keepalive(self,...)localsock=self.sockifnotsockthenreturnnil,"notinitialized"endifself.subscribedthenreturnnil,"subscribedstate"endreturnsock:setkeepalive(...)end

至此,已经清楚了,使用了 tcp 的 keepalive 心跳机制。

于是,通过与 Radix.v2 的作者一些讨论,选择自己在 redis 这层使用心跳机制,来解决这个问题。

四、最后的解决方案

在创建连接池之后,起一个 goroutine,每隔一段 idleTime 发送一个 PING 到 Redis server。其中,idleTime 略小于 Redis server 的 timeout 配置。连接池初始化部分代码如下:

p,err:=pool.New("tcp",u.Host,concurrency)errHndlr(err)gofunc(){for{p.Cmd("PING")time.Sleep(idelTime*time.Second)}}()

使用 redis 传输数据部分代码如下:

funcredisDo(p*pool.Pool,cmdstring,args...interface{})(reply*redis.Resp,errerror){reply=p.Cmd(cmd,args...)iferr=reply.Errerr!=nil{iferr!=io.EOF{Fatal.Println("redis",cmd,args,"erris",err)}}return}

其中,Radix.v2 连接池内部进行了连接池内连接的获取和放回,代码如下:

//Cmdautomaticallygetsoneclientfromthepool,executesthegivencommand//(returningitsresult),andputstheclientbackinthepoolfunc(p*Pool)Cmd(cmdstring,args...interface{})*redis.Resp{c,err:=p.Get()iferr!=nil{returnredis.NewResp(err)}deferp.Put(c)returnc.Cmd(cmd,args...)}

这样,我们就有了 keepalive 的机制,不会出现 timeout 的连接了,从 redis 连接池里面取出的连接都是可用的连接了。看似简单的代码,却完美的解决了连接池里面超时连接的问题。同时,就算 Redis server 重启等情况,也能保证连接自动重连。