组件分享之后端组件——基于Golang实现的高性能和弹性的流处理器benthos

Python011

组件分享之后端组件——基于Golang实现的高性能和弹性的流处理器benthos,第1张

近期正在探索前端、后端、系统端各类常用组件与工具,对其一些常见的组件进行再次整理一下,形成标准化组件专题,后续该专题将包含各类语言中的一些常用组件。欢迎大家进行持续关注。

本节我们分享的是基于Golang实现的高性能和弹性的流处理器 benthos ,它能够以各种代理模式连接各种 源 和 接收器,并对有效负载执行 水合、浓缩、转换和过滤 。

它带有 强大的映射语言 ,易于部署和监控,并且可以作为静态二进制文件、docker 映像或 无服务器函数 放入您的管道,使其成为云原生。

Benthos 是完全声明性的,流管道在单个配置文件中定义,允许您指定连接器和处理阶段列表:

Apache Pulsar, AWS (DynamoDB, Kinesis, S3, SQS, SNS), Azure (Blob storage, Queue storage, Table storage), Cassandra, Elasticsearch, File, GCP (Pub/Sub, Cloud storage), HDFS, HTTP (server and client, including websockets), Kafka, Memcached, MQTT, Nanomsg, NATS, NATS JetStream, NATS Streaming, NSQ, AMQP 0.91 (RabbitMQ), AMQP 1, Redis (streams, list, pubsub, hashes), MongoDB, SQL (MySQL, PostgreSQL, Clickhouse, MSSQL), Stdin/Stdout, TCP &UDP, sockets and ZMQ4.

1、docker安装

具体使用方式可以参见该 文档

有关如何配置更高级的流处理概念(例如流连接、扩充工作流等)的指导,请查看 说明书部分。

有关在 Go 中构建您自己的自定义插件的指导,请查看 公共 API。

AMQP,即 高级消息队列协议 (Advanced Message Queuing Protocol),是一个消息中间件应用层协议,用于组件之间的解耦,来提供 统一消息服务。主要功能是 排序消息,路由消息(包括点对点和订阅-发布),保证消息的可靠性和安全性。

遵循AMPQ协议的客户端,都能通过 消息中间件 相互通信。这样 客户端 就可以采用不同的开发语言实现,彼此无强依赖关系,降低客户端复杂性,提高开发效率也利于后期维护。

AMQP 的模型架构如下:

rabbitMQ是AMQP协议的一个开源实现。架构模型同样可以用以下的图来表示:

如上图,simple模式,单个publisher,单个queue,单个consumer

如上图,work模式

多个consumer共用一个queue的message

此种模式下,rabbitMQ会自动做负载均衡,将消息轮询发送给各个消费者,即一个消息只能被一个消费者获取

如上图,publish / subscribe 发布订阅模式(广播模式)

相对前2种模式,多了一个 exchange (type为fanout) ,message先发送到exchange,exchange再分别发送到对应的所有queue。而consumer订阅自己的queue,在自己订阅的queue上消费message。

示例应用场景,如下图示:

比如 网上购物,下单支付成功后,通知用户的方式有许多种,app推送,短信,email 等等。

message到来后被exchange发送到3个queue(app推送q,短信q,email_q)

之后 app推送服务,短信通知服务,email通知服务 从各自订阅的queue获取消息,通知用户支付成功

如上图示,exchange类型设定为direct

此时 message中的rountingKey 和 exchange中的bindingKey匹配,两者相等则发送对应的queue中,如果匹配不到bindingKey,则丢弃该message。

示例应用场景,如下图示:

比如服务产生的日志,日志有许多类型,error,info,debuf等类型的日志,而我们的需求只想要将 error 类型的日志写入磁盘,就可以用routing模式,将error日志路由到error queue,再由相应的 写入磁盘服务获取message,写入磁盘

如上图示,exchange类型为topic,相对于第4种模式,相同点是都根据 rountingKey 匹配,不同点是 topic 模式支持模糊匹配。

上一篇主要介绍了AMQP的一些知识,接下来开始正式步入Spring AMQP。

Message:在AMQP中并没有定义消息的模型,Spring为了方便我们理解与使用,新增了Message接口,在构建消息的时候Spring提供了builde API,MessageBuilder.xx.xx的形式使用起来很方便。

Exchange:这个接口和AMQP中定义的exchange基本相同,就不说了

Queue:同上。

Binding:一般叫他绑定关系,AMQP也有对其的抽象模型,只不过我认为他只不过相当于是附加在队列与交换机上的属性,所以在上篇关于AMQP的介绍中并没有详细说明。呃,其实spring对其的定义就是代表了队列与交换机的绑定关系。。。

spring提供了ConnectionFactory接口,当我们使用的时候会使用它的实现类CachingConnectionFactory,看名字也知道就是基于缓存的连接池,默认的池大小为25。Spring也提供了对于多个connectionFactory的支持接口例如SimpleRoutingConnectionFactory等。

我们使用SpringBoot进行测试,最小化的配置如下

这里先给出一个简单的例子然后再具体讲解。

如图,我们提前声明了一个名为hello的队列,浏览器访问/send时,可以看到控制台打印了相应的时间信息,即被@RabbitListener注解的方法被调用了。如果我们打开RabbitMq的webUI,会发现名为hello的队列中消息数量由0变为1再变为0。注意,这里我们并没有声明Exchange,MQ会为我们将队列绑定到默认的Exchange。

接下来就详细的说一下这个例子。对于操作RabbitMQ,Spring提供了 RabbitTemplate(对于batch操作,相应的是BatchingRabbitTemplate,在1.6版本以后,spring提供了异步的Template--AsyncRabbitTemplate)。我们使用它来发送与接收消息。当发送完消息的时候如何知道本次操作的成功或者失败呢?默认情况下不能被路由的消息将会被丢弃,这会导致消息丢失,不能保证消息可靠性(消息可靠性请参照上一篇AMQP介绍中的推荐)。发布确认机制是保证消息可靠性的第一步,发布确认保证我们知道消息是否成功到达队列中,返回ack则代表成功,nack则代表失败。要使用这个特性,我们需要将RabbitTemplate的mandatory属性和ConnectionFactory的publisherConfirms属性都设为true。这时我们可以在RabbitTemplate上设置setReturnCallback监听来接收MQ服务器返回的状态信息了。对于消息的确认,我们只需要设置RabbitTemplate.ConfirmCallback的回调方法即可。

当我们每次发送请求时,都会打印相应的ack,其中correlationData是生产者在发送数据时可以携带的相关信息。这里有个问题需要注意一下,RabbitTemplate只允许设置一个callback方法,这时你可以将RabbitTemplate的bean设为单例然后设置回调。

这样的缺点是所有使用这个template的地方都会使用这个回调,那么当我们想要为不同的操作定制callBack该怎么做?如果直接在别的地方继续设置会报"Only one ConfirmCallback is supported by each RabbitTemplate"异常,这时候我们就需要将RabbitTemplate的作用域设为@Scope,这样每个bean都是一个新的。难道这样就可以了么?我们的service类一般都是单例的,这意味着当service类生成后,注入的RabbitTemplate就已经不变了,这个就是Single域的bean中注入Scope域bean的问题。一种解决方法是实现ApplicationAware接口注入ApplicationContext,每次使用RabbitTemplate时调用其getBean方法。一个更好的解决方案是使用spring提供的lookup方法。

spring会帮我们代理lookup注解的方法,每次调用都会返回一个全新的bean。但其实平常使用一般都会将发送方单独抽取出来实现回调接口,不会涉及上面的问题,一般都如下配置,注意将template配置成scope即可。

RabbitTemplate可以添加消息转换器,作用就类似于mvc中配置的@ResponseBody消息转换器。

具体如何发送与接收消息感觉不用咋说了。。。就send,receive(x,x,x)这个用IDE看一下方法doc就知道咋用了。receive为拉模式,很少使用,关于接收方法我们更常使用的是异步接收,即推模式,一般使用@RabbitListener 实现

当hello队列中有消息时,方法会自动调用。

像我们平常做web开发,前端想要接受来自后台的消息无非俩个方法,前台请求和后台推送,前台轮询一般就是ajax定时器,推送一般使用WebSocket实现,MQ同样有两种模式:轮询请求队列看是否有消息即拉模式,队列中有消息即对消费者进行通知即推模式。

对于拉模式,Spring提供了receive,receiveAndConvert,和receiveAndReply方法。接收并回复的方法很有用,比如订单系统,下单消息被MQ处理完后再返回消息给其他队列,告诉她这个订单已经完成,可以进行付费操作了。接收并回复调用template.receiveAndReply实现自己的接收回调。对于推模式,项目中基本上使用@RabbitListener注解完成,该注解结合@SendTo注解完成receiveAndReply功能,若没有sendto,这个方法是不允许有返回值的。对于异常情况,配置@RabbitListener的errorHandler和returnExceptions即可。关于@RabbitListener注解的具体使用其实也挺复杂的,推荐直接看文档。使用监听器的过程中消息是默认经过消息转换器的,可以手动为其设置消息转换器。关于RabbitMQ LIstener的配置可以使用Config方式或者SpringBoot的配置文件方式。

上面只是官方文档的一部分,其实除了Listener大部分Config方式的配置都可以用配置文件方式替代。

声明队列与交换机:分为xml方式和Java Config方式(懒得写了,这个基本官网就是复制粘贴)

配置Broker:Spring对其的抽象为RabbitAdmin,也是官网。。

延时队列实现:设置交换机延时属性为true,通过convertAndSend中的MessagePostProcessor实现发送延时消息,这个方法需要安装延时交换机这样的一个插件(也可以通过死信队列实现)

好了。今天就先写这么多,因为实在是写的太乱了,以后有时间整理一下。。。