开源客户端访问阿里云 RocketMQ 说明

开源客户端访问阿里云RocketMQ说明#

1 目标#

本文档主要帮助RocketMQ开源用户访问阿里云RocketMQ,以下场景可能会需要该文档:
* 云迁移场景:想从开源RocketMQ迁移到阿里云RocketMQ
* 混合云场景:应用需要同时访问私有云的开源RocketMQ和阿里云RocketMQ。
* 测试环境访问开源RocketMQ,线上环境访问阿里云RocketMQ。

2 开源Java客户端上云#

已经使用开源Java SDK进行生产的用户,只要参考一下方法,重新配置一下参数,即可实现无缝上云。

2.1 客户端版本#

从RocketMQ4.5.1版本开始,开源版本支持链接阿里云MQ。 下载地址:http://rocketmq.apache.org/release_notes/release-notes-4.5.1/

2.2. 资源准备#

控制台获取已创建的资源包括Topic、GroupID,Endpoint,还有AK SK

2.3. 关键接口介绍#

  • AccessChannel

阿里云AliMQ和开源的RocketMQ默认使用的鉴权通道不同 本地自建使用:AccessChannel:LOCAL 上云连接AliMQ设置:AccessChannel:CLOUD

  • EndPoint

阿里云AliMQ使用接入点用来做nameserver的负载均衡并屏蔽了的具体IP地址,并且针对实例化用户通过在接入点前加入实例ID用于区分,因此要接入阿里云直接使用接入点即可。

  • ACL

阿里云提供一套完整的鉴权控制,开源版本的SDK提供了ACL功能,能够兼容阿里云的鉴权算法,这里只要配置正确的ak,sk既可以实现互联互通。

2.4 POM 依赖#

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.5.1</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
<version>4.5.1</version>
</dependency>

2.5 参数设置部分代码示例#

Producer

//设置为云上创建的GID, 以及替换为自己的ak,sk
DefaultMQProducer producer = new DefaultMQProducer("GID-xxx", new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY,ACL_SECRET_KEY)));
//设置为自己的云上接入点
producer.setNamesrvAddr("http://onsaddr.mq-internet-access.mq-internet.aliyuncs.com:80");
// 云上消息轨迹需要设置为CLOUD
producer.setAccessChannel(AccessChannel.CLOUD);
producer.start();
// 设置为云上创建的topic 名字
Message msg = new Message("xx-topic", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);

Consumer

//设置为云上创建的GID, 以及替换为自己的ak,sk
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GID-xxx", new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY,ACL_SECRET_KEY)), new AllocateMessageQueueAveragely());
//设置为云上接入点
consumer.setNamesrvAddr("http://onsaddr.mq-internet-access.mq-internet.aliyuncs.com:80");
// 云上消息轨迹需要设置为CLOUD方式
consumer.setAccessChannel(AccessChannel.CLOUD);
// 设置为云上创建的topic
consumer.subscribe("xx-topic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("Receive New Messages: %s %n", msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();

2.6 Demo#

参考官网提供的example示例:https://github.com/apache/rocketmq/tree/master/example/src/main/java/org/apache/rocketmq/example

3 开源CPP客户端上云#

开源的客户端连接 阿里云AliMQ,需要重新修改部分配置信息即可,使用方式和默认配置保持兼容。

3.1. 客户端版本#

从rocketmq-clienr-cpp 1.2.2版本开始,开源版本支持链接阿里云MQ。 下载地址:https://github.com/apache/rocketmq-client-cpp/releases/tag/1.2.2

3.2. 资源准备#

控制台获取已创建的资源,Topic、GroupID,Endpoint,用户AK,SK,实例化用户还包括实例名InstanceID。

3. 3. 支持的平台#

linux:centos6.X, centos7.X, REHL6x, REHL7X darwin: macOS Mojave 10.14.X windows: win10

3.4. 关键接口参数介绍#

  • SessionCredentials

CPP客户端提供了访问阿里云默认的鉴权方式,

void setSessionCredentials(const std::string& input_accessKey,
const std::string& input_secretKey,
const std::string& input_channel);
此处前两个参数分别为从控制台上获取AK,SK,如果访问阿里云channel设置为"ALIYUN"
  • EndPoint

阿里云AliMQ将访问的name server地址以接入点的方式提供

void setNamesrvAddr(const std::string& namesrvAddr);

NameServer Address设置为控制台上获取的endpoint地址

  • GroupID(ProducerID/ConsumerID)

由于阿里云AliMQ将原来的ProducerID和ConsumerID合并,统一称作GroupID,开源客户端除了在Producer和Consumer实例化的时候,在构造函数里可以指定GID/PID/CID外,额外提供了设置GID的接口。

void setGroupName(const std::string& groupname);

针对实例化用户整理还需要将GID重新拼接一下格式InstanceID%GID,参考实例代码。 注意,此接口调用必须在实例start之前。

  • Topic

对于新用户,需要在控制台创建topic,对于需要迁移的用户,在控制台上创建相同的topic即可,这里不用修改,如果是申请的实例化用户,这里需要把实例ID拼接上,格式为InstanceID%Topic

3.5. 参数设置核心部分代码示例#

访问阿里云需要使用阿里云的鉴权方式和接入点,下面针对非实例化用户和实例化用户,分别列出需要修改的配置信息,其余的配置和开源RocketMQ的使用完全兼容。

  • 公网默认实例(非实例化)

Producer

//控制台上申请的GID或者PID
DefaultMQProducer producer("GID_XXXXX");
//如果上一步初始化的时候已经设置过,这步设置可以跳过
producer.setGroupName("GID_XXXXX");
//控制台上获取的接入点信息
producer.setNamesrvAddr("http://onsaddr.mq-internet-access.mq-internet.aliyuncs.com:80");
//用户的AK和SK,控制台上可以获取,第三个参数固定默认使用“ALIYUN”
producer.setSessionCredentials("mq acesskey", "mq secretkey", "ALIYUN");

Consumer

//控制台上申请的GID或者CID
DefaultMQPushConsumer consumer("GID_XXXXX");
//如果上一步初始化的时候已经设置过,这步设置可以跳过
consumer.setGroupName("GID_XXXXX");
//控制台上获取的接入点信息
consumer.setNamesrvAddr("http://onsaddr.mq-internet-access.mq-internet.aliyuncs.com:80");
//用户的AK和SK,控制台上可以获取,第三个参数固定默认使用“ALIYUN”
consumer.setSessionCredentials("mq acesskey", "mq secretkey", "ALIYUN");
//设置订阅的topic和TAGs
consumer.subscribe("TOPIC","TAGS");
........
//注册处理消息的回调函数
consumer.registerMessageListener(&msglistener);
  • 实例化用户

由于实例化给用户提供了自己的命名空间,用于逻辑资源隔离,所以使用开源客户端访问时,需要显式的设置实例ID,即InstanceId,从控制台获取,格式为MQ_INST_XXXXX_XXXX

Producer

//控制台上申请的GID或者PID,需要和InstanceId拼装一下
DefaultMQProducer producer("MQ_INST_XXXXX_XXXX%GID_XXXXX");
//如果上一步初始化的时候已经设置过,这步设置可以跳过
producer.setGroupName("MQ_INST_XXXXX_XXXX%GID_XXXXX");
//控制台上获取的接入点信息,接入点包含了实例化信息
producer.setNamesrvAddr("http://MQ_INST_XXXX_XXXX、.mq-internet-access.mq-internet.aliyuncs.com:80");
//用户的AK和SK,控制台上可以获取,第三个参数固定默认使用“ALIYUN”
producer.setSessionCredentials("mq acesskey", "mq secretkey", "ALIYUN");
...........
//发送消息的时候,Topic也需要指定InstanceId
MQMessage msg("MQ_INST_XXXXX_XXXX%YOURTOPIC", "TAG", "BODY");

Consumer

//控制台上申请的GID或者CID,需要和InstanceId拼装一下
DefaultMQPushConsumer consumer("MQ_INST_XXXXX_XXXX%GID_XXXXX");
//如果上一步初始化的时候已经设置过,这步设置可以跳过
consumer.setGroupName("MQ_INST_XXXXX_XXXX%GID_XXXXX");
//控制台上获取的接入点信息
consumer.setNamesrvAddr("http://MQ_INST_XXXX_XXXX、.mq-internet-access.mq-internet.aliyuncs.com:80");
//用户的AK和SK,控制台上可以获取,第三个参数固定默认使用“ALIYUN”
consumer.setSessionCredentials("mq acesskey", "mq secretkey", "ALIYUN");
//设置订阅的topic和TAGs,由于实例化topic需要拼装一下
consumer.subscribe("MQ_INST_XXXXX_XXXX%TOPIC","TAGS");
........
//注册处理消息的回调函数
consumer.registerMessageListener(&msglistener);

3.6. Demo#

StepByStep使用方法,参考:https://github.com/apache/rocketmq-client-cpp/blob/master/README.md

4 开源Python客户端上云#

开源的客户端连接 阿里云AliMQ,需要重新修改部分配置信息即可,使用方式和默认配置保持兼容。

4.1. 客户端版本#

由于从rocketmq-client-cpp 1.2.2版本开始,开源版本支持链接阿里云MQ。因此Python客户端将CPP动态库依赖升级至1.2.2后,也可以无缝上云。 CPP SDK下载地址:https://github.com/apache/rocketmq-client-cpp/releases/tag/1.2.2

4.2. 资源准备#

控制台获取已创建的资源,Topic、GroupID,Endpoint,用户AK,SK,实例化用户还包括实例名InstanceID。

4.3. 支持的平台#

linux:centos6.X, centos7.X, REHL6x, REHL7X darwin: macOS Mojave 10.14.X

4.4. 关键接口参数介绍#

  • SessionCredentials

Python客户端提供了访问阿里云默认的鉴权方式,

SetProducerSessionCredentials(producerid,accessKey,secretKey,channel);
SetConsumerSessionCredentials(consumerid,accessKey,secretKey,channel);
此处前两个参数分别为从控制台上获取AK,SK,如果访问阿里云channel设置为"ALIYUN"
  • EndPoint

阿里云AliMQ将访问的name server地址以接入点的方式提供

SetProducerNameServerAddressr(producerid,endpoint);
SetConsumerNameServerAddressr(consumerid,endpoint);

NameServer Address设置为控制台上获取的endpoint地址

  • GroupID(ProducerID/ConsumerID)

由于阿里云AliMQ将原来的ProducerID和ConsumerID合并,统一称作GroupID,开源客户端除了在Producer和Consumer实例化的时候,在构造函数里可以指定GID/PID/CID外,不再额外提供了设置GID的接口。 针对实例化用户整理还需要将GID重新拼接一下格式InstanceID%GID,参考实例代码。

  • Topic

对于新用户,需要在控制台创建topic,对于需要迁移的用户,在控制台上创建相同的topic即可,这里不用修改,如果是申请的实例化用户,这里需要把实例ID拼接上,格式为InstanceID%Topic

4.5. 参数设置核心部分代码示例#

访问阿里云需要使用阿里云的鉴权方式和接入点,下面针对非实例化用户和实例化用户,分别列出需要修改的配置信息,其余的配置和开源RocketMQ的使用完全兼容。

  • 公网默认实例(非实例化)

    Producer

//控制台上申请的GID或者PID
producer = CreateProducer("GID_XXXX")
//控制台上获取的接入点信息
SetProducerNameServerAddress(producer,"http://onsaddr.mq-internet-access.mq-internet.aliyuncs.com:80")
//用户的AK和SK,控制台上可以获取,第三个参数固定默认使用“ALIYUN”
SetProducerSessionCredentials(producer,"mq acesskey", "mq secretkey", "ALIYUN");

Consumer

//控制台上申请的GID或者CID
consumer = CreatePushConsumer("GID_XXXX")
//控制台上获取的接入点信息
SetPushConsumerNameServerAddress(consumer,"http://onsaddr.mq-internet-access.mq-internet.aliyuncs.com:80")
//用户的AK和SK,控制台上可以获取,第三个参数固定默认使用“ALIYUN”
SetProducerSessionCredentials(consumer,"mq acesskey", "mq secretkey", "ALIYUN")
//设置订阅的topic和TAGs
Subscribe(consumer,"TOPIC","TAGS")
........
//注册处理消息的回调函数
RegisterMessageCallback(consumer, consumerMessage, None)
  • 实例化用户

由于实例化给用户提供了自己的命名空间,用于逻辑资源隔离,所以使用开源客户端访问时,需要显式的设置实例ID,即InstanceId,从控制台获取,格式为MQ_INST_XXXXX_XXXX

Producer

//控制台上申请的GID或者PID,需要和InstanceId拼装一下
producer = CreateProducer("MQ_INST_XXXXX_XXXX%GID_XXXXX")
//控制台上获取的接入点信息,接入点包含了实例化信息
SetProducerNameServerAddress(producer,"http://MQ_INST_XXXX_XXXX.mq-internet-access.mq-internet.aliyuncs.com:80");
//用户的AK和SK,控制台上可以获取,第三个参数固定默认使用“ALIYUN”
SetProducerSessionCredentials(producer,"mq acesskey", "mq secretkey", "ALIYUN");
...........
//发送消息的时候,Topic也需要指定InstanceId
msg = CreateMessage("MQ_INST_XXXXX_XXXX%YOURTOPIC")

Consumer

//控制台上申请的GID或者CID,需要和InstanceId拼装一下
consumer = CreatePushConsumer("MQ_INST_XXXXX_XXXX%GID_XXXXX")
//控制台上获取的接入点信息
SetPushConsumerNameServerAddress(consumer,"http://MQ_INST_XXXX_XXXX.mq-internet-access.mq-internet.aliyuncs.com:80")
//用户的AK和SK,控制台上可以获取,第三个参数固定默认使用“ALIYUN”
SetProducerSessionCredentials(consumer,"mq acesskey", "mq secretkey", "ALIYUN")
//设置订阅的topic和TAGs,由于实例化topic需要拼装一下
Subscribe(consumer,"MQ_INST_XXXXX_XXXX%TOPIC","TAGS")
........
//注册处理消息的回调函数
RegisterMessageCallback(consumer, consumerMessage, None)

4.6. Demo#

StepByStep使用方法,参考:https://github.com/apache/rocketmq-client-python/blob/master/doc/Introduction.md

5 RocketMQ Spring上云#

5.1. 版本#

RocketMQ Spring版本:2.0.3 RocketMQ客户端:4.5.1。 下载地址:https://github.com/apache/rocketmq-spring/releases/tag/rocketmq-spring-all-2.0.3

5.2. 资源准备#

控制台获取已创建的资源,Topic、GroupID,Endpoint,用户AK,SK,实例化用户还包括实例名InstanceID。

5.3 Pom 依赖#

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>${RELEASE.VERSION}</version>
</dependency>

5.4. 参数设置#

参考开源Java客户端测设置,使用spring链接上云需要同样设置部分ACL参数和接入点。打开application.properties文件,修改参数即可

Producer

//控制台上获取的接入点信息
rocketmq.name-server=http://onsaddr.mq-internet-access.mq-internet.aliyuncs.com:80
//控制台上申请的GID或者PID
rocketmq.producer.group=GID_XXXXX
//用户的AK和SK,控制台上可以获取
rocketmq.producer.access-key=AK
rocketmq.producer.secret-key=SK

Consumer

//控制台上获取的接入点信息
rocketmq.name-server=http://onsaddr.mq-internet-access.mq-internet.aliyuncs.com:80
//控制台申请的Topic资源
rocketmq.topic=TOPIC
//控制台上申请的GID或者PID
rocketmq.consumer.group=GID_XXXXX
//用户的AK和SK,控制台上可以获取
rocketmq.consumer.access-key=AK
rocketmq.consumer.secret-key=SK

5.5. Demo#

StepByStep使用方法,参考:https://github.com/apache/rocketmq-spring

6. 开源Go客户端上云#

开源的Go客户端使用cgo封装CPP SDK连接 阿里云AliMQ,因此只需要更新CPP动态库版本和重新修改部分配置信息即可,使用方式和默认配置保持兼容

6.1. 客户端版本#

由于从rocketmq-client-cpp 1.2.2版本开始,开源版本支持链接阿里云MQ。因此Go客户端将CPP动态库依赖升级至1.2.2后,也可以无缝上云。 CPP SDK下载地址:https://github.com/apache/rocketmq-client-cpp/releases/tag/1.2.2

6.2. 资源准备#

控制台获取已创建的资源,Topic、GroupID,Endpoint,用户AK,SK,实例化用户还包括实例名InstanceID。

6.3. 支持的平台#

linux:centos6.X, centos7.X, REHL6x, REHL7X darwin: macOS Mojave 10.14.X

6.4. 参数设置核心部分代码示例#

访问阿里云需要使用阿里云的鉴权方式和接入点,下面针对非实例化用户和实例化用户,分别列出需要修改的配置信息,其余的配置和开源RocketMQ的使用完全兼容。

  • 公网默认实例(非实例化)

Producer

producerConfig := &rocketmq.ProducerConfig{ClientConfig: rocketmq.ClientConfig{
//控制台上申请的GID或者PID
GroupID: "GID_XXXXX",
//控制台上获取的接入点信息
NameServer: "http://onsaddr.mq-internet-access.mq-internet.aliyuncs.com:80",
Credentials:&rocketmq.SessionCredentials{
AccessKey:"mq acesskey",
SecretKey:"mq acesskey",
Channel:"mq acesskey",
},
}}
result, err := producer.SendMessageSync(&rocketmq.Message{Topic: "Topic", Body: msg})

Consumer

config := &rocketmq.PushConsumerConfig{ClientConfig: rocketmq.ClientConfig{
//控制台上申请的GID或者PID
GroupID: "GID_XXXXX",
//控制台上获取的接入点信息
NameServer: "http://onsaddr.mq-internet-access.mq-internet.aliyuncs.com:80",
Credentials:&rocketmq.SessionCredentials{
AccessKey:"mq acesskey",
SecretKey:"mq acesskey",
Channel:"mq acesskey",
},
}}
consumer, err := rocketmq.NewPushConsumer(config)
//设置订阅的topic和TAGs, 注册回调
consumer.Subscribe("TOPIC", "TAGs", func(msg *rocketmq.MessageExt) rocketmq.ConsumeStatus {
fmt.Printf("A message received: \"%s\" \n", msg.Body)
return rocketmq.ConsumeSuccess
})
........
  • 实例化用户

由于实例化给用户提供了自己的命名空间,用于逻辑资源隔离,所以使用开源客户端访问时,需要显式的设置实例ID,即InstanceId,从控制台获取,格式为MQ_INST_XXXXX_XXXX

Producer

producerConfig := &rocketmq.ProducerConfig{ClientConfig: rocketmq.ClientConfig{
//控制台上申请的GID或者PID
GroupID: "MQ_INST_XXXXX_XXXX%GID_XXXXX",
//控制台上获取的接入点信息
NameServer: "http://MQ_INST_XXXX_XXXX.mq-internet-access.mq-internet.aliyuncs.com:80",
Credentials:&rocketmq.SessionCredentials{
AccessKey:"mq acesskey",
SecretKey:"mq acesskey",
Channel:"mq acesskey",
}
}}
......
result, err := producer.SendMessageSync(&rocketmq.Message{
Topic: "MQ_INST_XXXXX_XXXX%TOPIC",
Body: msg})

Consumer

config := &rocketmq.PushConsumerConfig{ClientConfig: rocketmq.ClientConfig{
//控制台上申请的GID或者PID
GroupID: "MQ_INST_XXXXX_XXXX%GID_XXXXX",
//控制台上获取的接入点信息
NameServer: "http://MQ_INST_XXXX_XXXX.mq-internet-access.mq-internet.aliyuncs.com:80",
Credentials:&rocketmq.SessionCredentials{
AccessKey:"mq acesskey",
SecretKey:"mq acesskey",
Channel:"mq acesskey",
}
}}
consumer, err := rocketmq.NewPushConsumer(config)
//设置订阅的topic和TAGs, 注册回调
consumer.Subscribe("MQ_INST_XXXXX_XXXX%TOPIC", "TAGs", func(msg *rocketmq.MessageExt) rocketmq.ConsumeStatus {
fmt.Printf("A message received: \"%s\" \n", msg.Body)
return rocketmq.ConsumeSuccess
})
........

6.6. Demo#

StepByStep使用方法,参考:https://github.com/apache/rocketmq-client-go/blob/master/doc/Introduction.md