支持 Apache RocketMQ 云厂商列表

支持Apache RocketMQ云厂商列表#

阿里云RocketMQ#

2016年阿里云对外提供RocketMQ云服务,是全球第一个提供RocketMQ云服务的厂商。阿里云RocketMQ提供了更为丰富了消息类型、 支持消息路由,同时在稳定性、安全控制等方面做了增强,数据可靠性 99.99999999%,服务可用性 99.95%,通过Region化、多可用区、 分布式集群化部署,确保服务高可用,即便整个机房不可用仍可正常提供消息服务。 目前应用使用开源版本RocketMQ客户端可以无缝访问阿里云RocketMQ服务,获取帮助文档


青云RocketMQ#

RocketMQ on QingCloud 提供可一键部署的 RocketMQ 集群服务,以 AppCenter 云应用的形式交付给用户使用。 在青云上,可以方创建和管理一个 RocketMQ 集群,也可以选择多种部署方式,从而满足您不同业务需求。 青云RocketMQ目前支持 Region 跨区的部署方式,实现同城多活,增强业务容灾能力; 另外集群支持横向与纵向在线伸缩,同时监控告警等功能,可通过网页控制台对集群进行可视化管理,使得管理集群非常方便。

开源客户端访问阿里云 RocketMQ 说明

开源客户端访问阿里云RocketMQ说明#

1 目标#

本文档主要帮助RocketMQ开源用户访问阿里云RocketMQ,以下场景可能会需要该文档:
* 云迁移场景:想从开源RocketMQ迁移到阿里云RocketMQ
* 混合云场景:应用需要同时访问私有云的开源RocketMQ和阿里云RocketMQ。
* 测试环境访问开源RocketMQ,线上环境访问阿里云RocketMQ。

2 开源Java客户端上云#

已经使用开源Java SDK进行生产的用户,只要参考一下方法,重新配置一下参数,即可实现无缝上云。

2.1 客户端版本#

从RocketMQ4.5.1版本开始,开源版本支持链接阿里云MQ。 下载地址:http://rocketmq.apache.org/release_notes/release-notes-4.5.1/

2.2. 资源准备#

控制台获取已创建的资源包括Topic、GroupID,Endpoint,还有AK SK

2.3. 关键接口介绍#

  • AccessChannel

阿里云AliMQ和开源的RocketMQ默认使用的鉴权通道不同 本地自建使用:AccessChannel:LOCAL 上云连接AliMQ设置:AccessChannel:CLOUD

  • EndPoint

阿里云AliMQ使用接入点用来做nameserver的负载均衡并屏蔽了的具体IP地址,并且针对实例化用户通过在接入点前加入实例ID用于区分,因此要接入阿里云直接使用接入点即可。

  • ACL

阿里云提供一套完整的鉴权控制,开源版本的SDK提供了ACL功能,能够兼容阿里云的鉴权算法,这里只要配置正确的ak,sk既可以实现互联互通。

2.4 POM 依赖#

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.5.1</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
<version>4.5.1</version>
</dependency>

2.5 参数设置部分代码示例#

Producer

//设置为云上创建的GID, 以及替换为自己的ak,sk
DefaultMQProducer producer = new DefaultMQProducer("GID-xxx", new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY,ACL_SECRET_KEY)));
//设置为自己的云上接入点
producer.setNamesrvAddr("http://onsaddr.mq-internet-access.mq-internet.aliyuncs.com:80");
// 云上消息轨迹需要设置为CLOUD
producer.setAccessChannel(AccessChannel.CLOUD);
producer.start();
// 设置为云上创建的topic 名字
Message msg = new Message("xx-topic", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);

Consumer

//设置为云上创建的GID, 以及替换为自己的ak,sk
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GID-xxx", new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY,ACL_SECRET_KEY)), new AllocateMessageQueueAveragely());
//设置为云上接入点
consumer.setNamesrvAddr("http://onsaddr.mq-internet-access.mq-internet.aliyuncs.com:80");
// 云上消息轨迹需要设置为CLOUD方式
consumer.setAccessChannel(AccessChannel.CLOUD);
// 设置为云上创建的topic
consumer.subscribe("xx-topic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("Receive New Messages: %s %n", msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();

2.6 Demo#

参考官网提供的example示例:https://github.com/apache/rocketmq/tree/master/example/src/main/java/org/apache/rocketmq/example

3 开源CPP客户端上云#

开源的客户端连接 阿里云AliMQ,需要重新修改部分配置信息即可,使用方式和默认配置保持兼容。

3.1. 客户端版本#

从rocketmq-clienr-cpp 1.2.2版本开始,开源版本支持链接阿里云MQ。 下载地址:https://github.com/apache/rocketmq-client-cpp/releases/tag/1.2.2

3.2. 资源准备#

控制台获取已创建的资源,Topic、GroupID,Endpoint,用户AK,SK,实例化用户还包括实例名InstanceID。

3. 3. 支持的平台#

linux:centos6.X, centos7.X, REHL6x, REHL7X darwin: macOS Mojave 10.14.X windows: win10

3.4. 关键接口参数介绍#

  • SessionCredentials

CPP客户端提供了访问阿里云默认的鉴权方式,

void setSessionCredentials(const std::string& input_accessKey,
const std::string& input_secretKey,
const std::string& input_channel);
此处前两个参数分别为从控制台上获取AK,SK,如果访问阿里云channel设置为"ALIYUN"
  • EndPoint

阿里云AliMQ将访问的name server地址以接入点的方式提供

void setNamesrvAddr(const std::string& namesrvAddr);

NameServer Address设置为控制台上获取的endpoint地址

  • GroupID(ProducerID/ConsumerID)

由于阿里云AliMQ将原来的ProducerID和ConsumerID合并,统一称作GroupID,开源客户端除了在Producer和Consumer实例化的时候,在构造函数里可以指定GID/PID/CID外,额外提供了设置GID的接口。

void setGroupName(const std::string& groupname);

针对实例化用户整理还需要将GID重新拼接一下格式InstanceID%GID,参考实例代码。 注意,此接口调用必须在实例start之前。

  • Topic

对于新用户,需要在控制台创建topic,对于需要迁移的用户,在控制台上创建相同的topic即可,这里不用修改,如果是申请的实例化用户,这里需要把实例ID拼接上,格式为InstanceID%Topic

3.5. 参数设置核心部分代码示例#

访问阿里云需要使用阿里云的鉴权方式和接入点,下面针对非实例化用户和实例化用户,分别列出需要修改的配置信息,其余的配置和开源RocketMQ的使用完全兼容。

  • 公网默认实例(非实例化)

Producer

//控制台上申请的GID或者PID
DefaultMQProducer producer("GID_XXXXX");
//如果上一步初始化的时候已经设置过,这步设置可以跳过
producer.setGroupName("GID_XXXXX");
//控制台上获取的接入点信息
producer.setNamesrvAddr("http://onsaddr.mq-internet-access.mq-internet.aliyuncs.com:80");
//用户的AK和SK,控制台上可以获取,第三个参数固定默认使用“ALIYUN”
producer.setSessionCredentials("mq acesskey", "mq secretkey", "ALIYUN");

Consumer

//控制台上申请的GID或者CID
DefaultMQPushConsumer consumer("GID_XXXXX");
//如果上一步初始化的时候已经设置过,这步设置可以跳过
consumer.setGroupName("GID_XXXXX");
//控制台上获取的接入点信息
consumer.setNamesrvAddr("http://onsaddr.mq-internet-access.mq-internet.aliyuncs.com:80");
//用户的AK和SK,控制台上可以获取,第三个参数固定默认使用“ALIYUN”
consumer.setSessionCredentials("mq acesskey", "mq secretkey", "ALIYUN");
//设置订阅的topic和TAGs
consumer.subscribe("TOPIC","TAGS");
........
//注册处理消息的回调函数
consumer.registerMessageListener(&msglistener);
  • 实例化用户

由于实例化给用户提供了自己的命名空间,用于逻辑资源隔离,所以使用开源客户端访问时,需要显式的设置实例ID,即InstanceId,从控制台获取,格式为MQ_INST_XXXXX_XXXX

Producer

//控制台上申请的GID或者PID,需要和InstanceId拼装一下
DefaultMQProducer producer("MQ_INST_XXXXX_XXXX%GID_XXXXX");
//如果上一步初始化的时候已经设置过,这步设置可以跳过
producer.setGroupName("MQ_INST_XXXXX_XXXX%GID_XXXXX");
//控制台上获取的接入点信息,接入点包含了实例化信息
producer.setNamesrvAddr("http://MQ_INST_XXXX_XXXX、.mq-internet-access.mq-internet.aliyuncs.com:80");
//用户的AK和SK,控制台上可以获取,第三个参数固定默认使用“ALIYUN”
producer.setSessionCredentials("mq acesskey", "mq secretkey", "ALIYUN");
...........
//发送消息的时候,Topic也需要指定InstanceId
MQMessage msg("MQ_INST_XXXXX_XXXX%YOURTOPIC", "TAG", "BODY");

Consumer

//控制台上申请的GID或者CID,需要和InstanceId拼装一下
DefaultMQPushConsumer consumer("MQ_INST_XXXXX_XXXX%GID_XXXXX");
//如果上一步初始化的时候已经设置过,这步设置可以跳过
consumer.setGroupName("MQ_INST_XXXXX_XXXX%GID_XXXXX");
//控制台上获取的接入点信息
consumer.setNamesrvAddr("http://MQ_INST_XXXX_XXXX、.mq-internet-access.mq-internet.aliyuncs.com:80");
//用户的AK和SK,控制台上可以获取,第三个参数固定默认使用“ALIYUN”
consumer.setSessionCredentials("mq acesskey", "mq secretkey", "ALIYUN");
//设置订阅的topic和TAGs,由于实例化topic需要拼装一下
consumer.subscribe("MQ_INST_XXXXX_XXXX%TOPIC","TAGS");
........
//注册处理消息的回调函数
consumer.registerMessageListener(&msglistener);

3.6. Demo#

StepByStep使用方法,参考:https://github.com/apache/rocketmq-client-cpp/blob/master/README.md

4 开源Python客户端上云#

开源的客户端连接 阿里云AliMQ,需要重新修改部分配置信息即可,使用方式和默认配置保持兼容。

4.1. 客户端版本#

由于从rocketmq-client-cpp 1.2.2版本开始,开源版本支持链接阿里云MQ。因此Python客户端将CPP动态库依赖升级至1.2.2后,也可以无缝上云。 CPP SDK下载地址:https://github.com/apache/rocketmq-client-cpp/releases/tag/1.2.2

4.2. 资源准备#

控制台获取已创建的资源,Topic、GroupID,Endpoint,用户AK,SK,实例化用户还包括实例名InstanceID。

4.3. 支持的平台#

linux:centos6.X, centos7.X, REHL6x, REHL7X darwin: macOS Mojave 10.14.X

4.4. 关键接口参数介绍#

  • SessionCredentials

Python客户端提供了访问阿里云默认的鉴权方式,

SetProducerSessionCredentials(producerid,accessKey,secretKey,channel);
SetConsumerSessionCredentials(consumerid,accessKey,secretKey,channel);
此处前两个参数分别为从控制台上获取AK,SK,如果访问阿里云channel设置为"ALIYUN"
  • EndPoint

阿里云AliMQ将访问的name server地址以接入点的方式提供

SetProducerNameServerAddressr(producerid,endpoint);
SetConsumerNameServerAddressr(consumerid,endpoint);

NameServer Address设置为控制台上获取的endpoint地址

  • GroupID(ProducerID/ConsumerID)

由于阿里云AliMQ将原来的ProducerID和ConsumerID合并,统一称作GroupID,开源客户端除了在Producer和Consumer实例化的时候,在构造函数里可以指定GID/PID/CID外,不再额外提供了设置GID的接口。 针对实例化用户整理还需要将GID重新拼接一下格式InstanceID%GID,参考实例代码。

  • Topic

对于新用户,需要在控制台创建topic,对于需要迁移的用户,在控制台上创建相同的topic即可,这里不用修改,如果是申请的实例化用户,这里需要把实例ID拼接上,格式为InstanceID%Topic

4.5. 参数设置核心部分代码示例#

访问阿里云需要使用阿里云的鉴权方式和接入点,下面针对非实例化用户和实例化用户,分别列出需要修改的配置信息,其余的配置和开源RocketMQ的使用完全兼容。

  • 公网默认实例(非实例化)

    Producer

//控制台上申请的GID或者PID
producer = CreateProducer("GID_XXXX")
//控制台上获取的接入点信息
SetProducerNameServerAddress(producer,"http://onsaddr.mq-internet-access.mq-internet.aliyuncs.com:80")
//用户的AK和SK,控制台上可以获取,第三个参数固定默认使用“ALIYUN”
SetProducerSessionCredentials(producer,"mq acesskey", "mq secretkey", "ALIYUN");

Consumer

//控制台上申请的GID或者CID
consumer = CreatePushConsumer("GID_XXXX")
//控制台上获取的接入点信息
SetPushConsumerNameServerAddress(consumer,"http://onsaddr.mq-internet-access.mq-internet.aliyuncs.com:80")
//用户的AK和SK,控制台上可以获取,第三个参数固定默认使用“ALIYUN”
SetProducerSessionCredentials(consumer,"mq acesskey", "mq secretkey", "ALIYUN")
//设置订阅的topic和TAGs
Subscribe(consumer,"TOPIC","TAGS")
........
//注册处理消息的回调函数
RegisterMessageCallback(consumer, consumerMessage, None)
  • 实例化用户

由于实例化给用户提供了自己的命名空间,用于逻辑资源隔离,所以使用开源客户端访问时,需要显式的设置实例ID,即InstanceId,从控制台获取,格式为MQ_INST_XXXXX_XXXX

Producer

//控制台上申请的GID或者PID,需要和InstanceId拼装一下
producer = CreateProducer("MQ_INST_XXXXX_XXXX%GID_XXXXX")
//控制台上获取的接入点信息,接入点包含了实例化信息
SetProducerNameServerAddress(producer,"http://MQ_INST_XXXX_XXXX.mq-internet-access.mq-internet.aliyuncs.com:80");
//用户的AK和SK,控制台上可以获取,第三个参数固定默认使用“ALIYUN”
SetProducerSessionCredentials(producer,"mq acesskey", "mq secretkey", "ALIYUN");
...........
//发送消息的时候,Topic也需要指定InstanceId
msg = CreateMessage("MQ_INST_XXXXX_XXXX%YOURTOPIC")

Consumer

//控制台上申请的GID或者CID,需要和InstanceId拼装一下
consumer = CreatePushConsumer("MQ_INST_XXXXX_XXXX%GID_XXXXX")
//控制台上获取的接入点信息
SetPushConsumerNameServerAddress(consumer,"http://MQ_INST_XXXX_XXXX.mq-internet-access.mq-internet.aliyuncs.com:80")
//用户的AK和SK,控制台上可以获取,第三个参数固定默认使用“ALIYUN”
SetProducerSessionCredentials(consumer,"mq acesskey", "mq secretkey", "ALIYUN")
//设置订阅的topic和TAGs,由于实例化topic需要拼装一下
Subscribe(consumer,"MQ_INST_XXXXX_XXXX%TOPIC","TAGS")
........
//注册处理消息的回调函数
RegisterMessageCallback(consumer, consumerMessage, None)

4.6. Demo#

StepByStep使用方法,参考:https://github.com/apache/rocketmq-client-python/blob/master/doc/Introduction.md

5 RocketMQ Spring上云#

5.1. 版本#

RocketMQ Spring版本:2.0.3 RocketMQ客户端:4.5.1。 下载地址:https://github.com/apache/rocketmq-spring/releases/tag/rocketmq-spring-all-2.0.3

5.2. 资源准备#

控制台获取已创建的资源,Topic、GroupID,Endpoint,用户AK,SK,实例化用户还包括实例名InstanceID。

5.3 Pom 依赖#

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>${RELEASE.VERSION}</version>
</dependency>

5.4. 参数设置#

参考开源Java客户端测设置,使用spring链接上云需要同样设置部分ACL参数和接入点。打开application.properties文件,修改参数即可

Producer

//控制台上获取的接入点信息
rocketmq.name-server=http://onsaddr.mq-internet-access.mq-internet.aliyuncs.com:80
//控制台上申请的GID或者PID
rocketmq.producer.group=GID_XXXXX
//用户的AK和SK,控制台上可以获取
rocketmq.producer.access-key=AK
rocketmq.producer.secret-key=SK

Consumer

//控制台上获取的接入点信息
rocketmq.name-server=http://onsaddr.mq-internet-access.mq-internet.aliyuncs.com:80
//控制台申请的Topic资源
rocketmq.topic=TOPIC
//控制台上申请的GID或者PID
rocketmq.consumer.group=GID_XXXXX
//用户的AK和SK,控制台上可以获取
rocketmq.consumer.access-key=AK
rocketmq.consumer.secret-key=SK

5.5. Demo#

StepByStep使用方法,参考:https://github.com/apache/rocketmq-spring

6. 开源Go客户端上云#

开源的Go客户端使用cgo封装CPP SDK连接 阿里云AliMQ,因此只需要更新CPP动态库版本和重新修改部分配置信息即可,使用方式和默认配置保持兼容

6.1. 客户端版本#

由于从rocketmq-client-cpp 1.2.2版本开始,开源版本支持链接阿里云MQ。因此Go客户端将CPP动态库依赖升级至1.2.2后,也可以无缝上云。 CPP SDK下载地址:https://github.com/apache/rocketmq-client-cpp/releases/tag/1.2.2

6.2. 资源准备#

控制台获取已创建的资源,Topic、GroupID,Endpoint,用户AK,SK,实例化用户还包括实例名InstanceID。

6.3. 支持的平台#

linux:centos6.X, centos7.X, REHL6x, REHL7X darwin: macOS Mojave 10.14.X

6.4. 参数设置核心部分代码示例#

访问阿里云需要使用阿里云的鉴权方式和接入点,下面针对非实例化用户和实例化用户,分别列出需要修改的配置信息,其余的配置和开源RocketMQ的使用完全兼容。

  • 公网默认实例(非实例化)

Producer

producerConfig := &rocketmq.ProducerConfig{ClientConfig: rocketmq.ClientConfig{
//控制台上申请的GID或者PID
GroupID: "GID_XXXXX",
//控制台上获取的接入点信息
NameServer: "http://onsaddr.mq-internet-access.mq-internet.aliyuncs.com:80",
Credentials:&rocketmq.SessionCredentials{
AccessKey:"mq acesskey",
SecretKey:"mq acesskey",
Channel:"mq acesskey",
},
}}
result, err := producer.SendMessageSync(&rocketmq.Message{Topic: "Topic", Body: msg})

Consumer

config := &rocketmq.PushConsumerConfig{ClientConfig: rocketmq.ClientConfig{
//控制台上申请的GID或者PID
GroupID: "GID_XXXXX",
//控制台上获取的接入点信息
NameServer: "http://onsaddr.mq-internet-access.mq-internet.aliyuncs.com:80",
Credentials:&rocketmq.SessionCredentials{
AccessKey:"mq acesskey",
SecretKey:"mq acesskey",
Channel:"mq acesskey",
},
}}
consumer, err := rocketmq.NewPushConsumer(config)
//设置订阅的topic和TAGs, 注册回调
consumer.Subscribe("TOPIC", "TAGs", func(msg *rocketmq.MessageExt) rocketmq.ConsumeStatus {
fmt.Printf("A message received: \"%s\" \n", msg.Body)
return rocketmq.ConsumeSuccess
})
........
  • 实例化用户

由于实例化给用户提供了自己的命名空间,用于逻辑资源隔离,所以使用开源客户端访问时,需要显式的设置实例ID,即InstanceId,从控制台获取,格式为MQ_INST_XXXXX_XXXX

Producer

producerConfig := &rocketmq.ProducerConfig{ClientConfig: rocketmq.ClientConfig{
//控制台上申请的GID或者PID
GroupID: "MQ_INST_XXXXX_XXXX%GID_XXXXX",
//控制台上获取的接入点信息
NameServer: "http://MQ_INST_XXXX_XXXX.mq-internet-access.mq-internet.aliyuncs.com:80",
Credentials:&rocketmq.SessionCredentials{
AccessKey:"mq acesskey",
SecretKey:"mq acesskey",
Channel:"mq acesskey",
}
}}
......
result, err := producer.SendMessageSync(&rocketmq.Message{
Topic: "MQ_INST_XXXXX_XXXX%TOPIC",
Body: msg})

Consumer

config := &rocketmq.PushConsumerConfig{ClientConfig: rocketmq.ClientConfig{
//控制台上申请的GID或者PID
GroupID: "MQ_INST_XXXXX_XXXX%GID_XXXXX",
//控制台上获取的接入点信息
NameServer: "http://MQ_INST_XXXX_XXXX.mq-internet-access.mq-internet.aliyuncs.com:80",
Credentials:&rocketmq.SessionCredentials{
AccessKey:"mq acesskey",
SecretKey:"mq acesskey",
Channel:"mq acesskey",
}
}}
consumer, err := rocketmq.NewPushConsumer(config)
//设置订阅的topic和TAGs, 注册回调
consumer.Subscribe("MQ_INST_XXXXX_XXXX%TOPIC", "TAGs", func(msg *rocketmq.MessageExt) rocketmq.ConsumeStatus {
fmt.Printf("A message received: \"%s\" \n", msg.Body)
return rocketmq.ConsumeSuccess
})
........

6.6. Demo#

StepByStep使用方法,参考:https://github.com/apache/rocketmq-client-go/blob/master/doc/Introduction.md

消息规模超千亿,同城艺龙的消息系统建设实践

消息规模超千亿,同城艺龙的消息系统建设实践#

同程艺龙的机票、火车票、汽车票、酒店相关业务已经接入了RocketMQ,用于流量高峰时候的削峰,以减少后端的压力。同时,对常规的系统进行解耦,将一些同步处理改成异步处理,每天处理的数据达1500亿条。 在近期的Apache RockeMQ Meetup上,同程艺龙机票事业部架构师查江,分享了同程艺龙的消息系统如何应对每天1500亿条的数据处理,通过此文,您将了解到:

  • 同程艺龙在消息方面的使用情况;
  • 消息在同程艺龙的应用场景;
  • 技术上踩过哪些坑;
  • 基于RocketMQ,做了哪些改进;

1 同程艺龙在消息方面的使用情况#

RocketMQ集群分为 Name Server 和Broker两部分,Name Server用的是双主模式,一个是考虑性能,另一个考虑安全性。 在纯数据的Broker分成很多组,每个组里面分为Master和Slave。目前,我们的机票、火车票、汽车票、酒店相关业务已经接入了RocketMQ,用于流量高峰时候的削峰,以减少后端的压力。同时,对常规的系统进行解耦,将一些同步处理改成异步处理,每天处理的数据达1500亿条。 选择RocketMQ的原因是:

  • 接入简单,引入的Java包比较少;
  • 纯Java开发,设计逻辑比较清晰;
  • 整体性能比较稳定的,Topic数量大的情况下,可以保持性能;

2 消息在同程艺龙的应用场景#

2.1退订系统#

这个是我们退订系统中的一个应用场景。用户点击前端的退订按钮,系统调用退订接口,再去调用供应商的退订接口,从而完成一个退订功能。

如果供应商的系统接口不可靠,就会导致用户退订失败,如果系统设置为同步操作,会导致用户需要再去点一次。所以,我们引入RocketMQ将同步改为异步,当前端用户发出退订需求,退订系统接收到请求后就会记录到退订系统的数据库里面,表示这个用户正在退订,同时通过消息引擎把这条退订消息发送到和供应商对接的系统,去调用供应商的接口。

如果调用成功,就会把数据库进行标识,表示已经退订成功。同时,加了一个补偿的脚本,去数据库捞那些未退订成功的消息,重新退订,避免消息丢失引起的退订失败的情况。

2.2 房仓系统#

第二个应用场景是我们的房仓系统。这是一个比较常规的消息使用场景,我们从供应商处采集一些酒店的基本信息数据和详情数据,然后接入到消息系统,由后端的分销系统、最小价系统和库存系统来进行计算。同时当供应商有变价的时候,变价事件也会通过消息系统传递给我们的后端业务系统,来保证数据的实时性和准确性。

2.4 供应库的订阅系统#

数据库的订阅系统也用到了消息的应用。一般情况下做数据库同步,都是通过binlog去读里面的数据,然后搬运到数据库。搬运过程中我们最关注的是数据的顺序性,因此在数据库row模式的基础上,新增了一个功能,以确保每一个Queue里面的顺序是唯一的。

虽然Queue里面的顺序天然都是唯一的,但我们在使用上有一个特点,就是把相同ID的消息都是放在同一个Queue里面的。例如,图中右上角id1的消息,数据库主字段是id1,就统一放在Queue1里面,而且是顺序的。在Queue2里,两个id3之间被两个顺序的id2间隔开来了,但实际消费读出来的时候,也会是顺序的,由此,可以用多队列的顺序来提高整体的并发度。

3 我们踩过哪些坑#

3.1 供应商系统的场景#

上图中,一个MQ对应有两个消费者,他们是在同一个Group1中,起初大家都只有Topic1,这时候是正常消费的。但如果在第一个消费者里面加入一个Topic2,这时候是无法消费或消费不正常了。这是RocketMQ本身的机制引起的问题,需要在第二个消费者里面加入Topic2才能正常消费。

3.2 支付交易系统的场景#

另外一个是支付交易系统,这个场景下也是有两个应用,他们都是在同一Group和同一Topic下,一个是消费Tag1的数据,另一个是消费Tag2的数据。正常情况下,启动应该是没问题的,但是有一天我们发现一个应用起不来了,另外一个应用,因为他只消费Tag2的数据,但是因为RocketMQ的机制会把Tag1的数据拿过来,拿过来过后会把Tag1的数据丢弃。这会导致用户在支付过程中出现支付失败的情况。 对此,我们把Tag2放到Group2里面,两个Group就不会消费相同的消息了。个人建议RocketMQ能够实现一个机制,即只接受自己的Tag消息,非相关的Tag不去接收。

3.3 大量老数据读取的场景#

在火车票消费的场景中,我们发现有200亿条老数据没有被消费。当我们消费启动的时候,RocketMQ会默认从第0个数据开始读,这时候磁盘IO飙升到100%,从而影响到其他消费端数据的读取,但这些老数据被加载后后,并没有实际作用。因此,对于大量老数据读取的改进方式是:

对于新消费组,默认从LAST_OFFSET消费; Broker中单Topic堆积超过1000万时,禁止消费,需联系管理员开启消费; 监控要到位,磁盘IO飙升时,能立刻联系到消费方处理;

3.4 服务端的场景#

CentOS 6.6中 Futex Kernel bug, 导致Name Server, Broker进程经常挂起,无法正常工作;

解决方法:升级到6.7

服务端2个线程会创建相同CommitLog放入List,导致计算消息offset错误,解析消息失败,无法消费,重启没法解决问题;

解决方法:线程安全问题,改为单线程

Pull模式下重置消费进度,导致服务端填充大量数据到Map中,broker cpu使用率飙升100%.

解决方法: Map局部变量场景用不到,删除

Master建议客户端到Slave消费时,若数据还没同步到Slave, 会重置pullOffset, 导致大量重复消费;

解决方法:不重置offset

同步没有MagicCode,安全组扫描同步端口时,Master解析错误,导致一些问题;

解决方法:同步时添加magicCode校验

4 基于RocketMQ,我们做了哪些改进#

4.1 新增客户端#

新增.net客户端,基于Java源代码原生开发; 新增HTTP客户端,实现部分功能,并通过Netty Server连接RocketMQ;

4.2 新增消息限流功能#

如果客户端代码写错产生死循环,就会产生大量的重复数据,这时候会把生产线程打满,导致队列溢出,严重影响我们MQ集群的稳定性,从而影响其他业务。

上图是限流的模型图,我们把限流功能加在Topic之前。通过限流功能可以设置rate limit和size limit等。其中rate limit是通过令牌桶算法来实现的,即每秒往桶里放多少个令牌,每秒就消费多少速度,或者是往Topic里写多少数据。以上的两个配置是支持动态修改的。

4.3 后台监控#

我们还做了一个监控后台,用于监控消息的全链路过程,包括

o 消息全链路追踪,覆盖消息产生、消费、过期整个生命周期; o 消息生产、消费曲线; o 消息生产异常报警; o 消息堆积报警,通知哪个IP消费过慢

4.4 其他功能#

o HTTP方式生产,消费消息; o Topic消费权限设置,Topic只能被指定Group消费,防止线上错乱订阅; o 支持新消费组从最新位置消费 (默认是从第一条开始消费); o 广播模式消费进度同步 (服务端显示进度);

平安银行在开源技术选型上的思考

平安银行在开源技术选型上的思考#

随着互联网金融业务和相关技术的不断发展,传统金融行业为满足业务快速发展需求,正在积极引入各类开源技术,以快速抢占市场。那么,以金融和科技作为双驱动的平安银行在开源技术的引入方面是如何评估,运用到哪些业务场景,以及面对复杂的网络环境,是如何去部署的呢? 本文将以 Apache RocketMQ 为例,和您一起了解平安银行在开源技术选型方面的思考和实践。

1 RocketMQ 在平安银行的应用场景#

目前,平安银行通过 RocketMQ 解决了数据预加、信息通知和状态变化等方面的业务需求,接下来,我们通过 App 登录、资产总览和工资理财 3 个应用场景来展开讲下。

1 App 登录。当用户打开平安银行 App 的时候,系统会根据用户的登录 ID 去加载相应的用户数据,比如银行卡、信用卡和理财产品等,以及一些系统通知。这个场景下,我们用到了 RocketMQ 的异步处理能力,即预加载可能需要使用的数据,提升用户体验。

2 资产总览。进入平安银行 App 资产总览的页面,页面显示账户余额、各类理财产品(黄金、基金和股票等),以及贷款等方面的信息。平安银行对接了各类基金、黄金和股票等来自不同金融主体、不同系统的数据,具有种类多、异构系统多和变化快的特点。我们的目标就是让资产总览数据尽可能准确,不同种类的资产变动的时候发出通知,资产系统收到通知后,根据变化的数据来计算出用户当前的资产总览。

3 工资理财。工资理财是指每月工资下发到银行卡后,系统可以实现自动买入用户设置好的理财产品,例如买一些定投类的理财产品。这里信息的传递流程是:

• 银行卡里的余额出现变动,系统把变动信息发送到消息引擎; • Consumer 端进行消费,通知用户账户已经出现变化; • 系统判断变动是否来源于代发工资; • 如果是,系统会再发送一条消息; • 理财的 Consumer 进行消费; • 判断现在是不是应该买入工资理财类的产品; • 如果是,自动买入用户设定好的理财产品; • 自动买入之后,余额再次变动,系统会再一次通知,这一次通知,判断的就是一些其他的逻辑了。

那么,在这些场景中,我们对消息引擎会有哪些要求呢?

A、高可用、高可靠和高性能,这是金融行业引入开源技术的基本要求; B、堆积能力,代发工资的用户很多,一个公司的员工会在某个时间点集中发放; C、顺序能力,即账户变动在先,发出通知在后; D、事务性能力,如果没有事务性,有可能会出现账户变动了,但没有触发购买理财产品的情况; E、重试和延迟消息功能,比如工资发放的时候,可能是在晚上,这时候系统会自动把购买理财的动作放在第二天白天去操作,再发出通知; F、消息回溯能力,就是出现问题的时候,我们可以把这个消息进行回溯,重新进行消息的处理,提供完整的消息轨迹;

在技术选型的过程中,RocketMQ 符合我们在这些典型使用场景中对消息产品的需求,在引入的过程中,平安银行也对社区做了相应的贡献。

2 复杂网络环境下的部署实践#

2.1 多测试子环境下的服务调用场景#

平安银行有多套测试子环境,用来对不同的feture进行测试,即图中的 FAT、FAT001、FAT002、FAT003等。传统银行系统由大型机时代向更面向互联网用户的零售时代转型过程中,不可避免微服务化,传统较集中式系统会被划分为较多的微服务,正常的情况如下图,服务 A 调用服务 B,服务 B 调用服务 C,服务 C 调用服务 D。

随着业务的需求,假设新的 feature,我们需要对服务 A 和 B 进行改动。相比在FAT环境里去部署测试,更合适的方式是另起一套 FAT 环境,这里我们命名为 FAT001,把服务A和B部署上去,A 调用 B,B会调用原来 FAT 环境里的 C 和 D。

此时,另一个新的需求,需要对服务 A 和 C 进行改动。如果直接发布到 FAT 或 FAT001 肯定是不对的,会影响正在进行的测试,此时,我们会再起一套测试环境,命名为FAT002,发布服务 A 和 C。由于 FAT002 里没有服务 B,所以服务A要调用服务 B 就需要去 FAT 环境(FAT 定义为较稳定的测试子环境)。服务 B 调用服务 C 的时候,就不应该调用本环境的 C了,而是调动 FAT002 的 C 才能实现测试功能。

再假设,系统同时还会有另外一个 feature 在测试 C 和 D,此时的调用逻辑也是一样的,系统调用服务 A 和 B 的时候是在 FAT,调用服务 C 和 D 的时候会到 FAT003 的环境。

以上的服务调用场景是可以通过微服务框架解决的,进行全链路测试,但在生产环境中,用户的真实请求比测试环境中会更复杂一些。

我们看一下真实的用户请求。 APP发起一个请求请求,进入网关,需要知道请求哪一个测试环境。通常的做法是:测试人员需要在APP上选好子环境,例如选择 FAT001,我们可以直接把请求 FAT001 的网关(每个环境网关单独部署),或者在requestheader上添加标识,让网关去区分它来源于哪一个环境(网关统一部署)。假如网关判断请求来源于 FAT002,那就会把分发给 FAT002环境进行处理。

消息层面,如何处理这类用户请求?

以上是服务调用的请求路径,比较好理解,但到了消息队列上,问题会变得复杂些,假如一个 feture 只是更改了消费者,那如何把这条消息传递到改的消费者应用上去进行测试,而不被其它环境的消费者消费掉,这是我们需要去考虑的问题。

来看下具体的情况,集群部署了 Broke A 和 Broke B,TopicA 分别部署在这两个Broker上。 此时,Producer Group A向 Topic A 中写数据,Consumer Group A去消费,这种情况下是不会有任何问题。

但如果新增一套 FAT001 的环境,要求 FAT001 发布的消息,只能由 FAT001 来消费,FAT 不能消费,这种情况下我们该怎么解决?

• 在消息上面加一些路由、或是加一些Tag、Filter、消息的Property? 这些都不能解决我们的问题。

• 每个子环境部署一套 RocketMQ? 一方面成本太高,另一方面如果这个feture测试完成了,需要把相关的应用再切回 FAT 进行处理,实现太过复杂。

我们想一下,多个 feture 同时进行测试,DB 会部署一套还是多套?

首先一个 feture 不会更改所在的应用,一般来说 DB 是部署一套的,在数据库里面添加字段,来识别数据是来源于哪一个子环境,如果多套部署,不更改的应用取不到新部署的 DB 数据,无法进行全链路测试,所以同样的,我们也没有在每个子环境都部署一套 RocketMQ,而是部署统一部署,通过 微服务RPC 路由把请求路由到正确的生产者集,改造消息路由算法把消息路由到正确的消费者进行处理。

真实的用户请求过程

在上图中的场景下,默认的场景 FAT发送,FAT 消费 ,没有问题的,假设 FAT001 的生产者发布了,需要 FAT001 发送到MQ集群,FAT 是可以直接消费。

在这个场景下,如果消费者在 FAT001也部署了应用,需要FAT消费者不能消费由FAT001产生的消息,而是由 FAT001的消费者去消费。我们的处理方式是在逻辑上把Topic A下面的Queue进行分组,相当于加了逻辑分组,如果消费者在 FAT001 有部署,我们会把 Queue 的分组扩大,在其默认设置的情况下再翻一倍,新增的 Queue 就是给到 FAT001 进行消费的。

再来看看另一个场景,如上图。

假设有个feature只需要更改消费者,部署在 FAT001。也是可以通过逻辑分组的方式,实现生产者根据请求来源来发送消息到队列 FAT001 逻辑分组内的 Queue,从而只让部署在 FAT001 的消费者消费。

通过以上 3 个场景,我们了解到添加逻辑分组的必要性,实现过程并不复杂。主要做了以下调整:🤔️

• 这个逻辑分组什么时候建立? 新建 Topic 的时候,全部建好?还是 Consumer 上线/下线的时候动态的去进行调整?

我们选择了动态创建的方式,这个过程中,我们添加了 Meta Server 进行元数据管理,进行动态创建:

• 添加 Meta Service,管理的元数据包括 Producer、Consumer、Topic、Queue 和 Subenv等信息: • 调整 Producer,取Request Head 里面请求来源(FAT、FAT001、FAT002...),如果 Topic 对应的存在分组,选择分组的 Queue,否则发到默认分组呢的Queue; • 调整 Consumer,上线时判断应用部署的分组(FAT、FAT001、FAT002...),如果Topic不存在对应的分组,则新建;存在,则 rebalalce (新Consumer节点上线),下线时,判断该分组是否还存在 其它Consumer实例,若不存在,删除分组,否则 rebalalce(Consumer某一节点下线);

2.2 多隔离区场景下的部署实践#

由于对安全上的重视,金融行业的网络环境相比其他行业会更复杂。整个隔离区主要分为以下几个部分:

• DMZ 区: 外网可以直接访问,用于放置网关;

• Web 区: 面向的是用户手机,或者网页上可以看到的功能应用;

• 核心区: 包含核心的调用逻辑功能,和交易、订单相关的核心应用,例如 DB 和存储;

• 外联区: 用于访问外网,有时候也会部署一些 Poxy 代理,因为内网不能直接访问外网,需要通过代理去访问外网;

• 专用区域: 对接基金、三方存管等外部系统。在金融行业,如果某个系统是闭环的,那必须要去做隔离;

• 管理区: 是指对整个区域的应用要进行集中管理,且数据流动是单向的,只能取数据,不能通过管理区把某区域的数据推送到另一区域。

此外,从安全的角度出发,所有的区域在生产环境下都通过防火墙进行隔离,这就给我们部署 RocketMQ 带来了很大的实施难度。如果只部署一套,面对这么多的防火墙,生产者消费者到集群的的流量跨墙,会给网络带来极大的不稳定,遇到瓶颈,防火墙几乎无法在线平滑扩容;如果每个子环境都部署一套,又带来运维复杂度,而且还是存在数据同步和跨墙消费的问题。

最终,我们采用的是折中的办法,即统一部署加分隔离区部署,这样做的益处是:☺️

• 防火墙是开大策略,保证安全的前提下,符合监管要求; • 针对跨隔离区消费的问题,我们采用复制的方式,模拟消费者重新写入目标集群;

2.3 多IDC场景下的部署实践#

同城多IDC,可以认为近似局域网,比较好处理,但异地多IDC多活场景,目前我们还没有特别好的解方案,多活不可避免面临数据分片、数据合并和数据冲突的解决等问题。

如果 Topic 下数据有多活需求,我们暂时通过复制方式来处理。但这类手工模拟消费者消费数据写入新集群的复制方式,会存在一些问题,即复制到另一个集群之后 offset 会改变,处理逻辑就会有偏差。我们通过 pull 的方式自定义的去根据 offset 去进行消费。当故障转移到新的集群需要去消费的时候,需要获取到新集群里面正确的offset 值。此时,这个值和之前集群里的已经不一样了,需要根据时间得到新集群里正确的offset 值,再去进行消费。在没有更好的解决方案前,治理就显得很关键了。 不过,我们注意到,在 RocketMQ 最新发布的版本里,提供了 DLedger 的特性,DLedger 定位的是一个工业级的 Java Library,可以友好地嵌入各类 Java 系统中,满足其高可用、高可靠、强一致的需求。我们会尽快对这一特性进行集成和测试。

3 改造实践和遇到的小插曲#


我们在对 RocketMQ 的使用过程中,添加了以下功能或特性:🚀 A. 为生产者提供消息的堆积能力。 B. 将所有配置管理部署到配置中心,并做云端化处理,以满足动态修改的需求。 C. 在 4.3 版本还未提供事务处理能力前,我们在本地数据库里去建一张消息表,数据库更改数据状态的时候,会同步将数据写入消息表。若发送失败,则进行补偿。并在客户端里进行封装。 D. 实现统一的消息者幂等处理。 E. 添加身份认证和消息认证(注:RocketMQ 4.3 版本中已经实现身份认证功能)

当然,也遇到了一些小插曲,基本都是使用上的小问题,可能大家也会碰到:🚧 A. 一个应用使用多个RocketMQ集群时,未加载到正确的配置。在 Client 端,如果没有对 instancename 进行配置,一个应用连多个集群会失败。 B. 在大数据的场景下,内存溢出。订阅的 Topic 越多,每个 Queue 在本地缓存的 message 也会越多,默认每个queue 1000条,可能会把内存打爆,可根据实际情况调整。 C. 删除数据时IO抖动,默认每天凌晨4点删除数据,量上来后出现 IO 抖动,配置了消息删除策略,默认逗号分隔开,多配几个时间删除就可以了。 D. Broker上日志报延迟消息找不到数据文件。在主备切换的演练过程中,出现了延迟消息在 Broker 上处理异常的情况。当主切换到备端时,延迟消息在 Broker 上保存的文件被自动删除,再切回主,由于延时消息的元数据感觉在,会查询文件进行处理,此时已找不到文件。 E. 挂 NAS 的时候,IP 获取了 NAS 的私网地址,并被提交给了服务端

以上就是我们在部署过程中遇到的一些小插曲,基本都是可以很快定位原因,解决的。

总的来看,RocketMQ 对平安银行的消息系统建设的帮助是非常大的,尤其是满足了数据持久化、顺序消费和回溯的需求,此外,在消息的查询方面,也比我们之前使用的消息引擎好很多。最后再分享一点自己对中间件的一点感悟:中间件使用重在治理,规范不先行,开发两行泪。

放弃 Kafka,滴滴出行选择基于 RocketMQ 构建企业级消息队列服务

放弃Kafka,滴滴出行选择基于RocketMQ构建企业级消息队列服务#

摘要:本文整理自滴滴的江海挺在2018年9月1日Apache RocketMQ开发者沙龙北京站的分享。江海挺是Apache RocketMQ Contributor,在北大信科毕业后,一直在做消息队列相关的服务,在消息队列方面积累了丰富的经验。

本文的主要内容包括以下几个方面:

  1. 滴滴的消息技术选型
  2. 为什么选择RocketMQ
  3. 如何构建自己的消息队列服务
  4. RocketMQ扩展改造
  5. RocketMQ使用经验

1. 滴滴的消息技术选型#

1.1 消息历史#

如图,初期公司内部没有专门的团队维护消息队列服务,所以消息队列使用方式较多,主要以kafka为主,有业务直连的,也有通过独立的服务转发消息的。另外有一些团队也会用 RocketMQ、Redis的list,甚至会用比较非主流的beanstalkkd。导致的结果就是,比较混乱,无法维护,资源使用也很浪费。

1.2 弃用kafka#

一个核心业务在使用kafka的时候,出现了集群数据写入抖动非常严重的情况,经常会有数据写失败。

主要有两点原因:

  1. 随着业务增长,topic的数据增多,集群负载增大,性能下降。
  2. 我们用的是kafka 0.8.2那个版本,有个bug,会导致副本重新复制,复制的时候有大量的读,我们存储盘用的又是机械盘,导致磁盘IO过大,影响写入。

所以我们决定做自己的消息队列服务。

首先需要解决上面的解决业务方消息生产失败的问题。因为这个kafka用的是发布/订阅模式,一个topic的订阅方会有很多,涉及到的下游业务也就非常多,没办法一口气直接替换kafka,迁移到新的一个消息队列服务上。 所以我们当时的方案是加了一层代理,然后利用codis作为缓存,解决了kafka不定期写入失败的问题,如上图。 就是当后面的kafka出现不可写入的时候,我们就会先把数据写入到codis中,然后延时进行重试,直到写成功为止。

1.3 选择RocketMQ#

经过一系列的调研和测试之后,我们决定采用RocketMQ。具体原因在后面会介绍。

为了支持多语言环境、解决一些迁移和某些业务的特殊需求,我们又在消费侧加上了一个代理服务。

然后形成了这么一个核心框架。业务端只跟代理层交互。中间的消息引擎,负责消息的核心存储。

在之前的基本框架之后,我们后面就主要围绕三个方向做。

一个是迁移。把之前提到的所有五花八门的队列环境,全部迁移到我们上面。这里面的迁移方案后面会跟大家介绍一下。

第二个就是功能迭代和成本性能上的优化。

最后一个重点就是服务化,业务直接通过平台界面来申请资源,申请到之后直接使用。

1.4 演进中的架构#

这张图是我们消息队列服务的一个比较新的现状。

先纵向看,上面是生产的客户端,包括了7种语言。然后是我们的生产代理服务。

在中间的是我们的消息存储层。目前主要的消息存储引擎是RocketMQ。然后还有一些在迁移过程中的Kafka。还有一个chronos,它是我们延迟消息的一个存储引擎。

再下面就是消费代理。

消费代理同样提供了多种语言的客户端。然后还支持多种协议的消息主动推送功能。包括HTTP 协议 RESTful方式。结合我们的groovy脚本功能,还能实现将消息直接转存到redis、hbase和hdfs上。更多的下游存储,我们都在陆续接入。

除了存储系统之外,我们还对接了实时计算平台,像Flink,Spark,Storm这些平台,我们也都提供了支持。

左边是我们的用户控制台和运维控制台。这个是我们服务化的重点。

用户在需要使用队列的时候,就通过界面申请topic,填写各种信息,包括身份信息,消息的峰值流量,消息大小,消息格式等等。

然后消费方,通过我们的界面,就可以申请消费。

运维控制台,主要负责我们集群的管理,自动化部署,流量调度,状态显示之类的功能。

最后所有运维和用户操作会影响线上的配置,都会通过zookeeper进行同步。

2. 为什么选择RocketMQ#

因为从实际测试结果来看,RocketMQ的效果更好。

主要围绕两个测试进行。

2.1 测试-topic数量的支持#

如下图所示,测试环境:

Kafka 0.8.2

RocketMQ 3.4.6

1.0 Gbps Network

16 threads

这张图是Kafka和RocketMQ在不同topic数量下的吞吐测试。横坐标是每秒消息数,纵坐标是测试case。同时覆盖了有无消费,和不同消息体的场景。一共8组测试数据,每组数据分别在topic个数为16、32、64、128、256时获得的,每个topic包括8个partition。下面四组数据是发送消息大小为128字节的情况,上面四种是发送2k消息大小的情况。on 表示消息发送的时候,同时进行消息消费,off表示仅进行消息发送。

先看最上面一组数据,用的是kafka,开启消费,每条消息大小为2048字节。可以看到,随着topic数量增加,到256 topic之后,吞吐极具下。 可以先看最上面的一组结果,用的是Kafka,开启消费,每条消息是2kb(2048)。可以看到,随着topic数量增加,到256个topic之后,吞吐急剧下降。

第二组是是RocketMQ。可以看到,topic增大之后,影响非常小。

第三组和第四组,是上面两组关闭了消费的情况。结论基本类似,整体吞吐量会高那么一点点。

下面的四组跟上面的区别是使用了128字节的小消息体。可以看到,kafka吞吐受topic数量的影响特别明显。对比来看,虽然topic比较小的时候,RocketMQ吞吐较小,但是基本非常稳定,对于我们这种共享集群来说比较友好。

2.2 测试-延迟#

  • Kafka

测试环境:

Kafka 0.8.2.2

topic=1/8/32

Ack=1/all,replica=3

测试结果:如下图

(横坐标对应吞吐,纵坐标对应延迟时间)

上面的一组的3条线对应ack=3,需要3个备份都确认后才完成数据的写入。

下面的一组的3条线对应ack=1,有1个备份收到数据后就可以完成写入。

可以看到下面一组只需要主备份确认的写入,延迟明显较低。

每组的三条线之间主要是topic数量的区别,topic数量增加,延迟也增大了。

  • RocketMQ

测试环境:

RocketMQ 3.4.6

brokerRole=ASYNC/SYNC_MASTER, 2 Slave

flushDiskType=SYNC_FLUSH/ASYNC_FLUSH

测试结果:如下图

上面两条是同步刷盘的情况,延迟相对比较高。下面的是异步刷盘。

橙色的线是同步主从,蓝色的线是异步主从。

然后可以看到在副本同步复制的情况下,即橙色的线,4w的tps之内都不超过1ms。用这条橙色的线和上面Kafka的图中的上面三条线横向比较来看,kafka超过1w tps 就超过1ms了。kafka的延迟明显更高。

3. 如何构建自己的消息队列服务#

3.1 问题与挑战#

面临的挑战(顺时针看):

  • 客户端语言。需要支持PHP、GO、Java、C++。
  • 只有3个开发人员。
  • 决定用RocketMQ,但是没看过源码。
  • 上线时间紧,线上的kafka还有问题。
  • 可用性要求高。

使用RocketMQ时的两个问题:

  • 客户端语言支持不全,它主要支持Java,而我们还需要支持PHP、Go、C++,RocketMQ目前提供的Go的sdk我们测的有一些问题。
  • 功能特别多,如tag、property、消费过滤、RETRY topic、死信队列、延迟消费之类的功能,非常丰富。但是这个对我们稳定性维护来说,挑战非常大。

解决办法,如下图所示:

  • 使用Thrift RPC框架来解决跨语言的问题。

  • 简化调用接口。可以认为只有两个接口,send用来生产,pull用来消费。

主要策略就是坚持KISS原则(Keep it simple, stupid),保持简单,先解决最主要的问题,让消息能够流转起来。

然后我们把其他主要逻辑都放在了proxy这一层来做,比如限流、权限认证、消息过滤、格式转化之类的。这样,我们就能尽可能地简化客户端的实现逻辑,不需要把很多功能用各种语言都写一遍。

3.2 迁移方案#

架构确定后,接下来是我们的一个迁移过程。

迁移这个事情,在pub-sub的消息模型下,会比较复杂。因为下游的数据消费方可能很多,上游的数据没法做到一刀切流量,这就会导致整个迁移的周期特别长。然后我们为了尽可能地减少业务迁移的负担,加快迁移的效率,我们在proxy层提供了双写和双读的功能。

  • 双写:Procucer Proxy同时写RocketMQ和kafka。
  • 双读:Consumer Proxy同时从RocketMQ和kafka消费数据。

有了这两个功能之后,我们就能提供以下两种迁移方案了。

3.2.1 双写#

生产端双写,同时往kafka和rocketmq写同样的数据,保证两边在整个迁移过程中都有同样的全量数据。kafka和RocketMQ有相同的数据,这样下游的业务也就可以开始迁移。

如果消费端不关心丢数据,那么可以直接切换,切完直接更新消费进度。

如果需要保证消费必达,可以先在Consumer Proxy设置消费进度,消费客户端保证没有数据堆积后再去迁移,这样会有一些重复消息,一般客户端会保证消费处理的幂等。

生产端的双写其实也有两种方案:

  1. 客户端双写,如下图:

业务那边不停原来的kafka 客户端。只是加上我们的客户端,往RocketMQ里追加写。

这种方案在整个迁移完成之后,业务还需要把老的写入停掉。相当于两次上线。

  1. Producer Proxy双写,如下图:

业务方直接切换生产的客户端,只往我们的proxy上写数据。然后我们的proxy负责把数据复制,同时写到两个存储引擎中。

这样在迁移完成之后,我们只需要在proxy上关掉双写功能就可以了。对生产的业务方来说是无感知的,生产方全程只需要改造一次,上一下线就可以了。

所以表面看起来,应该还是第二种方案更加简单。但是,从整体可靠性的角度来看,一般还是认为第一种相对高一点。因为客户端到kafka这一条链路,业务之前都已经跑稳定了。一般不会出问题。但是写我们proxy就不一定了,在接入过程中,是有可能出现一些使用上的问题,导致数据写入失败,这就对业务方测试质量的要求会高一点。

然后消费的迁移过程,其实风险是相对比较低的。出问题的时候,可以立即回滚。因为它在老的kafka上消费进度,是一直保留的,而且在迁移过程中,可以认为是全量双消费。

以上就是数据双写的迁移方案,这种方案的特点就是两个存储引擎都有相同的全量数据。

3.2.2 双读#

特点:保证不会重复消费。对于p2p 或者消费下游不太多,或者对重复消费数据比较敏感的场景比较适用。

这个方案的过程是这样的,消费先切换。全部迁移到到我们的proxy上消费,proxy从kafka上获取。这个时候rocketmq上没有流量。但是我们的消费proxy保证了双消费,一旦rocketmq有流量了,客户端同样也能收到。

然后生产方改造客户端,直接切流到rocketmq中,这样就完成了整个流量迁移过程。

运行一段时间,比如kafka里的数据都过期之后,就可以把消费proxy上的双消费关了,下掉kafka集群。

整个过程中,生产直接切流,所以数据不会重复存储。然后在消费迁移的过程中,我们消费proxy上的group和业务原有的group可以用一个名字,这样就能实现迁移过程中自动rebalance,这样就能实现没有大量重复数据的效果。

所以这个方案对重复消费比较敏感的业务会比较适合的。

这个方案的整个过程中,消费方和生产方都只需要改造一遍客户端,上一次线就可以完成。

4. RocketMQ扩展改造#

说完迁移方案,这里再简单介绍一下,我们在我们的rocketmq分支上做的一些比较重要的事情。

首先一个非常重要的一点是主从的自动切换。熟悉RocketMQ的同学应该知道,目前开源版本的RocketMQ broker 是没有主从自动切换的。如果你的master挂了,那你就写不进去了。然后slave只能提供只读的功能。

当然如果你的topic在多个主节点上都创建了,虽然不会完全写不进去,但是对单分片顺序消费的场景,还是会产生影响。

所以呢,我们就自己加了一套主从自动切换的功能。

第二个是批量生产的功能。RocketMQ 4.0之后的版本是支持批量生产功能的。但是限制了,只能是同一个ConsumerQueue的。这个对于我们的proxy服务来说,不太友好,因为我们的proxy是有多个不同的topic的,所以我们就扩展了一下,让它能够支持不同topic、不同consume queue。原理上其实差不多,只是在传输的时候,把topic和consumerqueue的信息都编码进去。

第三个,目前RocketMQ 单机能够支持的topic数量,基本在几万这么一个量级,在增加上去之后,元信息的管理就会非常耗时,对整个吞吐的性能影响相对来说就会非常大。 然后我们有个场景又需要支持单机百万左右的topic数量,所以我们就改造了一下元信息管理部分,让RocketMQ单机能够支撑的topic数量达到了百万。

后面一些就不太重要了,比如集成了我们公司内部的一些监控和部署工具,修了几个bug,也给提了PR。最新版都已经修掉了。

5. RocketMQ使用经验#

接下来,再简单介绍一下,我们在 RocketMQ在使用和运维上的一些经验。主要是涉及在磁盘IO性能不够的时候,一些参数的调整。

5.1 读老数据的问题#

我们都知道,RocketMQ的数据是要落盘的,一般只有最新写入的数据才会在PageCache中。比如下游消费数据,因为一些原因停了一天之后,又突然起来消费数据。这个时候就需要读磁盘上的数据。

然后RocketMQ的消息体是全部存储在一个append only的 commitlog 中的。如果这个集群中混杂了很多不同topic的数据的话,要读的两条消息就很有可能间隔很远。最坏情况就是一次磁盘IO读一条消息。这就基本等价于随机读取了。如果磁盘的IOPS(Input/Output Operations Per Second)扛不住,还会影响数据的写入,这个问题就严重了。

值得庆幸的是,RocketMQ提供了自动从Slave读取老数据的功能。这个功能主要由slaveReadEnable这个参数控制。默认是关的(slaveReadEnable = false by default)。推荐把它打开,主从都要开。

这个参数打开之后,在客户端消费数据时,会判断,当前读取消息的物理偏移量跟最新的位置的差值,是不是超过了内存容量的一个百分比(accessMessageInMemoryMaxRatio = 40 by default)。如果超过了,就会告诉客户端去备机上消费数据。如果采用异步主从,也就是brokerRole 等于ASYNC_AMSTER的时候,你的备机IO打爆,其实影响不太大。但是如果你采用同步主从,那还是有影响。所以这个时候,最好挂两个备机。因为RocketMQ的主从同步复制,只要一个备机响应了确认写入就可以了,一台IO打爆,问题不大。

5.2 过期数据删除#

第二个是删除过期数据的问题。

RocketMQ默认数据保留72个小时(fileReservedTime=72)。

然后它默认在凌晨4点开始删过期数据(deleteWhen="04")。你可以设置多个值用分号隔开。

因为数据都是定时删除的,所以在磁盘充足的情况,数据的最长保留会比你设置的还多一天。

因为默认都是同一时间,删除一整天的数据,如果用了机械硬盘,一般磁盘容量会比较大,需要删除的数据会特别多,这个就会导致在删除数据的时候,磁盘IO被打满。这个时候又要影响写入了。

为了解决这个问题,可以尝试多个方法,一个是设置文件删除的间隔,有两个参数可以设置,

  • deleteCommitLogFilesInterval = 100(毫秒)。每删除10个commitLog文件的时间间隔。
  • deleteConsumeQueueFilesInterval=100(毫秒)。每删除一个ConsumeQueue文件的时间间隔。

另外一个就是增加删除频率,把00-23都写到deleteWhen,就可以实现每个小时都删数据。

5.3 索引#

最后一个功能是索引的问题。

默认情况下,所有的broker都会建立索引(messageIndexEnable=true)。这个索引功能可以支持按照消息的uniqId,消息的key来查询消息体。

索引文件实现的时候,本质上也就是基于磁盘的个一个hashmap。

如果broker上消息数量比较多,查询的频率比较高,这也会造成一定的IO负载。

所以我们的推荐方案是在master上关掉了index功能,只在slave上打开。然后所有的index查询全部在slave上进行。

当然这个需要简单修改一下MQAdminImpl里的实现。因为默认情况下,它会向master发出请求。