老鬼的博客 来都来啦,那就随便看看吧~
springboot整合阿里云rocketmq4.0
发布于: 2022-09-29 更新于: 2022-09-29 分类于: Java 阅读次数: 

一:pom引入依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<!-- rocketmq start -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-client</artifactId>
<version>1.8.8.5.Final</version>
</dependency>
<!-- rocketmq end -->

注意上面的ons-client不同的版本支持不同的地域,
要不然会造成无法连接。

1.png

二:yml文件

1
2
3
4
5
6
7
8
9
# rocketmq 消息队列
rocketmq:
accessKey: LTAI5tDTxxxrsBxr # key
secretKey: uL07TsSZAsxxxDecb39HTZG # secret
nameSrvAddr: http://MQ_INST_143.cn-shanghai.mq-internal.aliyuncs.com:8080 # server地址
group: GID_aia_YJK_activity # 组
topic: aia_YJK_bp_topic # 主题
tag: spmall # 标签
sendMsgTimeoutMillis: 3000 # 发送超时时间

三:MQ配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.tohours.spmall.config.mq;

import java.util.Properties;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

import com.aliyun.openservices.ons.api.PropertyKeyConst;

import lombok.Data;


/**
* @desc MQ的配置信息
* @author RenJie
*/
@Configuration
@ConfigurationProperties(prefix = "rocketmq")
public @Data class MqConfig {

private String accessKey;
private String secretKey;
private String nameSrvAddr;//server地址
private String topic;//主题
private String group;//组
private String tag;//标签
private String sendMsgTimeoutMillis;//发送超时时间

public Properties getMqPropertie() {
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey);
properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey);
properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr);
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, this.sendMsgTimeoutMillis);
return properties;
}


}

四:生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.tohours.spmall.config.mq;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.aliyun.openservices.ons.api.bean.ProducerBean;


/**
* @desc 生产者初始化
* @author RenJie
*/
@Configuration
public class ProducerClient {

@Autowired
private MqConfig mqConfig;

@Bean(initMethod = "start", destroyMethod = "shutdown")
public ProducerBean buildProducer() {
ProducerBean producer = new ProducerBean();
producer.setProperties(mqConfig.getMqPropertie());
return producer;
}

}

五:消费者

  • 初始化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package com.tohours.spmall.config.mq;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.PropertyValueConst;
import com.aliyun.openservices.ons.api.bean.ConsumerBean;
import com.aliyun.openservices.ons.api.bean.Subscription;


/**
* @desc 消费者初始化
* @author RenJie
* @date 2022-09-29
*/
//项目中加上 @Configuration 注解,这样服务启动时consumer也启动了
@Configuration
public class ConsumerClient {

@Autowired
private MqConfig mqConfig;

@Autowired
private NormalMessageListener messageListener;

@Bean(initMethod = "start", destroyMethod = "shutdown")
public ConsumerBean buildConsumer() {
ConsumerBean consumerBean = new ConsumerBean();
//配置文件
Properties properties = mqConfig.getMqPropertie();
properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.getGroup());
//将消费者线程数固定为20个 20为默认值
properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20");
//广播订阅方式设置
properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
consumerBean.setProperties(properties);
//订阅关系
Map<Subscription, MessageListener> subscriptionTable = new HashMap<Subscription, MessageListener>();
Subscription subscription = new Subscription();
subscription.setTopic(mqConfig.getTopic());
subscription.setExpression(mqConfig.getTag());
subscriptionTable.put(subscription, messageListener);
//订阅多个topic如上面设置

consumerBean.setSubscriptionTable(subscriptionTable);
return consumerBean;
}

}

  • 监听
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package com.tohours.spmall.config.mq;

import java.io.UnsupportedEncodingException;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSON;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;

import lombok.extern.slf4j.Slf4j;


/**
* @desc 消费者监听
* @author RenJie
*/
@Component
@Log4j2
public class NormalMessageListener implements MessageListener {

@Autowired
private JdbcTemplate jdbcTemplate;

@Override
public Action consume(Message message, ConsumeContext context) {
try {
log.info("consumer start ...");
log.info("消费者监听: " + JSON.toJSONString(message));
log.info("msgId:" + message.getMsgID());
log.info("body:" + new String(message.getBody(),"UTF-8"));
//do something..

insertMessage(message);

return Action.CommitMessage;
} catch (Exception e) {
//消费失败
return Action.ReconsumeLater;
}
}

private void insertMessage(Message message) throws UnsupportedEncodingException {
String sql = "INSERT INTO spmall_message (msg_id,body,insert_time) values(?,?,NOW());";
String msgId = message.getMsgID();
String body = new String(message.getBody(),"UTF-8");
jdbcTemplate.update(sql,msgId,body);
}
}

六:工具类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
package com.tohours.spmall.utils;

import javax.annotation.PostConstruct;

import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.OnExceptionContext;
import com.aliyun.openservices.ons.api.SendCallback;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.bean.ProducerBean;
import com.tohours.spmall.config.mq.MqConfig;
import com.tohours.spmall.config.mq.ProducerClient;

import lombok.extern.log4j.Log4j2;

@Component
@Log4j2
public class RocketMQUtils {

@Autowired
private MqConfig cmqConfig;
@Autowired
private ProducerClient cproducerClient;

@Autowired
private static MqConfig mqConfig;
@Autowired
private static ProducerClient producerClient;

@PostConstruct
public void init() {
mqConfig = cmqConfig;
producerClient = cproducerClient;
}

private static ProducerBean producer;

// static{
// producer = buildProducer();
// }

public static ProducerBean buildProducer() {
if (producer == null) {
producer = producerClient.buildProducer();
}
return producer;
}

/**
* @desc 同步发送消息
* @param topic 主题
* @param tag 标签
* @param key 置代表消息的业务关键属性,请尽可能全局唯一。
* 以方便您在无法正常收到消息情况下,可通过消息队列RocketMQ版控制台查询消息并补发
* @param data 消息内容
* @return
*/
public static SendResult send(String topic, String tag, String key, Object data) {
log.info("topic:" + topic);
log.info("tag:" + tag);
log.info("key:" + key);
log.info("data:" + data);
Message msg = new Message(
// 普通消息所属的Topic,切勿使用普通消息的Topic来收发其他类型的消息。
topic,
// Message Tag,可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在消息队列RocketMQ版的服务器过滤。
tag,
// Message
// Body,任何二进制形式的数据,消息队列RocketMQ版不做任何干预,需要Producer与Consumer协商好一致的序列化和反序列化方式。
data.toString().getBytes());
// 设置代表消息的业务关键属性,请尽可能全局唯一。 以方便您在无法正常收到消息情况下,可通过消息队列RocketMQ版控制台查询消息并补发。
// 注意:不设置也不会影响消息正常收发。
if (StringUtils.isNotEmpty(key)) {
msg.setKey(key);
}
SendResult sendResult = buildProducer().send(msg);
return sendResult;
}

/**
* @desc 同步发送消息
* @param data 消息内容
* @return
*/
public static SendResult send(Object data) {
return send("", data);
}

/**
* @desc 同步发送消息
* @param key 设置代表消息的业务关键属性,请尽可能全局唯一。
* 以方便您在无法正常收到消息情况下,可通过消息队列RocketMQ版控制台查询消息并补发。
* @param data 消息内容
* @return
*/
public static SendResult send(String key, Object data) {
return send(mqConfig.getTopic(), mqConfig.getTag(), key, data);
}

/**
* @desc 异步发送消息
* @param topic 主题
* @param tag 标签
* @param key key
* @param data 消息内容
* @param sendCallback 回调方法,如果为空则默认打印info日志
*/
public static void sendAsync(String topic, String tag, String key, Object data, SendCallback sendCallback) {
log.info("topic:" + topic);
log.info("tag:" + tag);
log.info("key:" + key);
log.info("data:" + data);
Message msg = new Message(
// 普通消息所属的Topic,切勿使用普通消息的Topic来收发其他类型的消息。
topic,
// Message Tag,可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在消息队列RocketMQ版的服务器过滤。
tag,
// Message
// Body,任何二进制形式的数据,消息队列RocketMQ版不做任何干预,需要Producer与Consumer协商好一致的序列化和反序列化方式。
data.toString().getBytes());
// 设置代表消息的业务关键属性,请尽可能全局唯一。 以方便您在无法正常收到消息情况下,可通过消息队列RocketMQ版控制台查询消息并补发。
// 注意:不设置也不会影响消息正常收发。
if (StringUtils.isNotEmpty(key)) {
msg.setKey(key);
}
// 异步发送消息, 发送结果通过callback返回给客户端。
if (sendCallback == null) {
buildProducer().sendAsync(msg, new SendCallback() {
@Override
public void onSuccess(final SendResult sendResult) {
// 消息发送成功。
log.info("send message success. topic=" + sendResult.getTopic() + ", msgId="
+ sendResult.getMessageId());
}

@Override
public void onException(OnExceptionContext context) {
// 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
log.info("send message failed. topic=" + context.getTopic() + ", msgId=" + context.getMessageId());
}
});
} else {
buildProducer().sendAsync(msg, sendCallback);
}
}

/**
* @desc 发送异步消息
* @param key key
* @param data 消息内容
* @param sendCallback 回调方法,如果为空则默认打印info日志
*/
public static void sendAsync(String key, Object data, SendCallback sendCallback) {
sendAsync(mqConfig.getTopic(), mqConfig.getTag(), key, data, sendCallback);
}

/**
* @desc 发送异步消息
* @param key key
* @param data 消息内容
*/
public static void sendAsync(String key, Object data) {
sendAsync(key, data, null);
}

/**
* @desc 发送异步消息
* @param data 消息内容
* @param sendCallback 回调方法,如果为空则默认打印info日志
*/
public static void sendAsync(Object data, SendCallback sendCallback) {
sendAsync(null, data, sendCallback);
}

/**
* @desc 发送异步消息
* @param data 消息内容
*/
public static void sendAsync(Object data) {
sendAsync(null, data, null);
}

/**
* @desc 发送单向消息
* @param topic 主题
* @param tag 标签
* @param key key
* @param data 消息内容
*/
public static void sendOneway(String topic, String tag, String key, Object data) {
log.info("topic:" + topic);
log.info("tag:" + tag);
log.info("key:" + key);
log.info("data:" + data);
Message msg = new Message(
// 普通消息所属的Topic,切勿使用普通消息的Topic来收发其他类型的消息。
topic,
// Message Tag,可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在消息队列RocketMQ版的服务器过滤。
tag,
// Message
// Body,任何二进制形式的数据,消息队列RocketMQ版不做任何干预,需要Producer与Consumer协商好一致的序列化和反序列化方式。
data.toString().getBytes());
// 设置代表消息的业务关键属性,请尽可能全局唯一。 以方便您在无法正常收到消息情况下,可通过消息队列RocketMQ版控制台查询消息并补发。
// 注意:不设置也不会影响消息正常收发。
if (StringUtils.isNotEmpty(key)) {
msg.setKey(key);
}
buildProducer().sendOneway(msg);
}

/**
* @desc 发送单向消息
* @param key key
* @param data 消息内容
*/
public static void sendOneway(String key, Object data) {
sendOneway(mqConfig.getTopic(), mqConfig.getTag(), key, data);
}

/**
* @desc 发送单向消息
* @param data 消息内容
*/
public static void sendOneway(Object data) {
sendOneway(null, data);
}



}

七:相关截图

1.png

1.png

1.png

*************感谢您的阅读*************