扫二维码与项目经理沟通
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流
同步发送sysnc
、异步发送async
、直接发送one-way
。成都创新互联专注于东宝企业网站建设,响应式网站开发,购物商城网站建设。东宝网站建设公司,为东宝等地区提供建站服务。全流程按需制作,专业设计,全程项目跟踪,成都创新互联专业和态度为您提供的服务使用RocketMQTemplate必须要在配置文件中配置RocketMQ的属性,Springboot在加载时才会创建RocketMQTemplate的Bean。配置文件示例如下:
rocketmq:
name-server: nameServer的集群IP
compress-message-body-threshold: 4096
consumer:
access-key: username
secret-key: password
max-message-size: 536870912
producer:
access-key: username
secret-key: password
group: producerGroup
retry-next-server: true
retry-times-when-send-async-failed: 2
retry-times-when-send-failed: 2
send-message-timeout: 3000
(1)void send(Message>message) throws MessagingException;同步发送import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {// 这里的Message是org.springframework.messaging.Message类型
MessagesendMessage = MessageBuilder.withPayload("这里设置消息体")
.setHeader("消息的属性的key", "消息的属性的值")
// 想发送带key的消息,请求头的键必须写成KEYS
.setHeader("KEYS", "消息的key的值")
.build();
rocketMQTemplate.send(sendMessage);
}
}
RocketMQTemplate.send(Message>message) 方法只有一个Message>类型的参数,没有设置topic,这个消息会发送到RocketMQ的默认topic
,这个默认topic是在安装RocketMQ Client的时候配置的,如果没有这个topic会抛出"No 'defaultDestination' configured"
异常。这个方法几乎不会被使用,我们发送消息一般都是要发送到我们想去的一个topic。此方法会将消息同步发送至topic,此方法没有返回值,我们无法获取SendResult。
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {// 这里的Message是org.springframework.messaging.Message类型
MessagesendMessage = MessageBuilder.withPayload("这里设置消息体")
.setHeader("消息的属性的key", "消息的属性的值")
// 想发送带key的消息,请求头的键必须写成KEYS
.setHeader("KEYS", "消息的key的值")
.build();
// 这里指定将消息发送到topicA的tagA下,也可以不指定tagA只写topicA
rocketMQTemplate.send("topicA:tagA", sendMessage);
}
}
RocketMQTemplate.send(D destination, Message>message) 方法有两个参数,第一个参数就是topic,第二个参数是要发送的消息。这个方法是一个委托方法,其实最终调用的是RocketMQTemplate.syncSend(String destination, Message>message)方法,也就是说destination虽然是个泛型,但是我们应该传入一个字符串类型的topic,此方法会将消息同步发送至topic
。此方法没有返回值,我们无法获取SendResult。
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {// 这里的Message是org.springframework.messaging.Message类型
MessagesendMessage = MessageBuilder.withPayload("这里设置消息体")
.setHeader("消息的属性的key", "消息的属性的值")
// 想发送带key的消息,请求头的键必须写成KEYS
.setHeader("KEYS", "消息的key的值")
.build();
// 这里指定将消息发送到topicA的tagA下,也可以不指定tagA只写topicA
SendResult sendResult = rocketMQTemplate.syncSend("topicA:tagA", sendMessage);
System.out.println(sendResult);
}
}
RocketMQTemplate.syncSend(String destination, Message>message) 方法有两个参数,第一个参数就是topic,第二个参数是要发送的消息。此方法的返回值为SendResult,我们可以通过这个类来确定消息是否发送成功,获取消息的MessageId等
。
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {// 这里的Message是org.springframework.messaging.Message类型
MessagesendMessage = MessageBuilder.withPayload("这里设置消息体")
.setHeader("消息的属性的key", "消息的属性的值")
// 想发送带key的消息,请求头的键必须写成KEYS
.setHeader("KEYS", "消息的key的值")
.build();
// 这里指定将消息发送到topicA的tagA下,也可以不指定tagA只写topicA
SendResult sendResult = rocketMQTemplate.syncSend("topicA:tagA", sendMessage, 3000L);
System.out.println(sendResult);
}
}
RocketMQTemplate.syncSend(String destination, Message>message, long timeout) 方法有三个参数,第一个参数就是topic,第二个参数是要发送的消息,第三个参数是超时时间
。其实方法(3)的底层也是调用此方法,只不过由于我们没有设置timeout,系统会使用默认的timeout,默认值为3000毫秒。注意:超时时间设置的过小会导致消息发送失败。
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import java.util.ArrayList;
import java.util.Collection;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {Collection>messages = new ArrayList<>();
for (int i=0; i<3; i++) {MessagesendMessage = MessageBuilder.withPayload("这里设置消息体" + i)
.setHeader("消息的属性的key", "消息的属性的值")
.setHeader("KEYS", "消息的key的值")
.build();
messages.add(sendMessage);
}
SendResult sendResult =rocketMQTemplate.syncSend("topicA:tagA", messages);
System.out.println(sendResult);
}
}
RocketMQTemplate.syncSend(String destination, Collection消息的集合
。此方法会将消息批量发送到topicA下的tagA下。
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import java.util.ArrayList;
import java.util.Collection;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {Collection>messages = new ArrayList<>();
for (int i=0; i<3; i++) {MessagesendMessage = MessageBuilder.withPayload("这里设置消息体" + i)
.setHeader("消息的属性的key", "消息的属性的值")
.setHeader("KEYS", "消息的key的值")
.build();
messages.add(sendMessage);
}
SendResult sendResult =rocketMQTemplate.syncSend("topicA:tagA", messages, 3000L);
System.out.println(sendResult);
}
}
RocketMQTemplate.syncSend(String destination, Collection消息的集合
,第三个参数是超时时间。此方法会将消息批量发送到topicA下的tagA下。其实方法(5)的底层也是调用此方法,只不过由于我们没有设置timeout系统会使用默认的timeout,默认值为3000毫秒。注意:超时时间设置的过小会导致消息发送失败。
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {// 这里指定将消息发送到topicA的tagA下,也可以不指定tagA只写topicA
SendResult sendResult = rocketMQTemplate.syncSend("topicA:tagA", "这里设置消息体");
System.out.println(sendResult);
}
}
RocketMQTemplate.syncSend(String destination, Object payload, long timeout) 方法有两个参数,第一个参数就是topic,第二个参数是要发送的消息的消息体。此方法不需要我们自己创建Message对象了,底层会帮我们创建。但是缺点就是不能设置消息的属性和key
。
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {// 这里指定将消息发送到topicA的tagA下,也可以不指定tagA只写topicA
SendResult sendResult = rocketMQTemplate.syncSend("topicA:tagA", "这里设置消息体", 3000L);
System.out.println(sendResult);
}
}
RocketMQTemplate.syncSend(String destination, Object payload, long timeout) 方法有三个参数,第一个参数就是topic,第二个参数是要发送的消息的消息体
,第三个参数是超时时间。此方法就是不需要我们自己创建Message对象了,底层会帮我们创建。但是缺点就是不能设置消息的属性和key
。其实方法(7)的底层也是调用此方法,只不过由于我们没有设置timeout,系统会使用默认的timeout,默认值为3000毫秒。注意:超时时间设置的过小会导致消息发送失败。
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {// 这里的Message是org.springframework.messaging.Message类型
MessagesendMessage = MessageBuilder.withPayload("这里设置消息体")
.setHeader("消息的属性的key", "消息的属性的值")
// 想发送带key的消息,请求头的键必须写成KEYS
.setHeader("KEYS", "消息的key的值")
.build();
// 这里指定将消息发送到topicA的tagA下,也可以不指定tagA只写topicA
rocketMQTemplate.sendOneWay("topicA:tagA", "这里设置消息体");
}
}
RocketMQTemplate.sendOneWay(String destination, Message>message) 方法有两个参数,第一个参数就是topic,第二个参数是要发送的消息。此方法可以异步发送消息,具有很高的发送效率,但是没有返回值,我们无法获取SendResult。
(10)void sendOneWay(String destination, Object payload);one-way模式,异步发送import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {// 这里指定将消息发送到topicA的tagA下,也可以不指定tagA只写topicA
rocketMQTemplate.sendOneWay("topicA:tagA", "这里设置消息体");
}
}
RocketMQTemplate.sendOneWay(String destination, Object payload) 方法有两个参数,第一个参数就是topic,第二个参数是要发送的消息的消息体
。此方法不需要我们自己创建Message对象了,底层会帮我们创建。但是缺点就是不能设置消息的属性和key。此方法可以异步发送消息,具有很高的发送效率,但是没有返回值,我们无法获取SendResult。
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {MessagesendMessage = MessageBuilder.withPayload("这里设置消息体")
.setHeader("消息的属性的key", "消息的属性的值")
.setHeader("KEYS", "消息的key的值")
.build();
rocketMQTemplate.asyncSend("topicA:tagA", sendMessage, new SendCallback() {@Override
public void onSuccess(SendResult sendResult) {System.out.println("Send success");
}
@Override
public void onException(Throwable throwable) {System.out.println("Send fail");
}
});
}
}
RocketMQTemplate.asyncSend(String destination, Message>message, SendCallback sendCallback) 方法有三个参数,第一个参数就是topic,第二个参数是要发送的消息,第三个参数是异步消息的回调对象。此方法允许我们设置回调函数,知道异步消息是否发送成功以此来做相应的事情。方法(9)和(10)底层也是调用了此方法,只不过把SendCallback对象设为了null。
(12)void asyncSend(String destination, Message>message, SendCallback sendCallback, long timeout);异步发送import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {MessagesendMessage = MessageBuilder.withPayload("这里设置消息体")
.setHeader("消息的属性的key", "消息的属性的值")
.setHeader("KEYS", "消息的key的值")
.build();
rocketMQTemplate.asyncSend("topicA:tagA", sendMessage, new SendCallback() {@Override
public void onSuccess(SendResult sendResult) {System.out.println("Send success");
}
@Override
public void onException(Throwable throwable) {System.out.println("Send fail");
}
}, 3000L);
}
}
RocketMQTemplate.asyncSend(String destination, Message>message, SendCallback sendCallback, long timeout) 方法有四个参数,第一个参数就是topic,第二个参数是要发送的消息,第三个参数是异步消息的回调对象,第四个参数是超时时间。此方法允许我们设置回调函数,知道异步消息是否发送成功以此来做相应的事情。方法(11)也是调用了此方法,只不过由于我们没有设置timeout,系统会使用默认的timeout,默认值为3000毫秒。注意:超时时间设置的过小会导致消息发送失败。
(13)void asyncSend(String destination, Object payload, SendCallback sendCallback);异步发送import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {rocketMQTemplate.asyncSend("topicA:tagA", "这里设置消息体", new SendCallback() {@Override
public void onSuccess(SendResult sendResult) {System.out.println("Send success");
}
@Override
public void onException(Throwable throwable) {System.out.println("Send fail");
}
});
}
}
同方法(11)一样,只不过第二个参数由Message>类型换成了Object类型,可以直接传入要发送的消息体,不用我们自己创建Message对象,缺点就是不能设置消息的属性和key。
(14)void asyncSend(String destination, Object payload, SendCallback sendCallback, long timeout);异步发送import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {rocketMQTemplate.asyncSend("topicA:tagA", "这里设置消息体", new SendCallback() {@Override
public void onSuccess(SendResult sendResult) {System.out.println("Send success");
}
@Override
public void onException(Throwable throwable) {System.out.println("Send fail");
}
}, 3000L);
}
}
方法(13)的底层也是调用此方法,只不过由于我们没有设置timeout,系统会使用默认的timeout,默认值为3000毫秒。注意:超时时间设置的过小会导致消息发送失败。
(15)import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import java.util.ArrayList;
import java.util.Collection;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {Collection>messages = new ArrayList<>();
for (int i=0; i<3; i++) {MessagesendMessage = MessageBuilder.withPayload("这里设置消息体" + i)
.setHeader("消息的属性的key", "消息的属性的值")
.setHeader("KEYS", "消息的key的值")
.build();
messages.add(sendMessage);
}
rocketMQTemplate.asyncSend("topicA:tagA", messages, new SendCallback() {@Override
public void onSuccess(SendResult sendResult) {System.out.println("Send success");
}
@Override
public void onException(Throwable throwable) {System.out.println("Send fail");
}
});
}
}
同方法(11)一样,只不过第二个参数由Message>类型换成了Collection
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import java.util.ArrayList;
import java.util.Collection;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {Collection>messages = new ArrayList<>();
for (int i=0; i<3; i++) {MessagesendMessage = MessageBuilder.withPayload("这里设置消息体" + i)
.setHeader("消息的属性的key", "消息的属性的值")
.setHeader("KEYS", "消息的key的值")
.build();
messages.add(sendMessage);
}
rocketMQTemplate.asyncSend("topicA:tagA", messages, new SendCallback() {@Override
public void onSuccess(SendResult sendResult) {System.out.println("Send success");
}
@Override
public void onException(Throwable throwable) {System.out.println("Send fail");
}
}, 3000L);
}
}
方法(15)的底层也是调用此方法,只不过由于我们没有设置timeout,系统会使用默认的timeout,默认值为3000毫秒。注意:超时时间设置的过小会导致消息发送失败。
(17)void convertAndSend(Object payload) throws MessagingException;同步发送import com.sgm.esb.gateway.model.ExampleEntity;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {// 实体类
ExampleEntity exampleEntity = new ExampleEntity();
exampleEntity.setName("Tom");
rocketMQTemplate.convertAndSend(exampleEntity);
}
}
同方法(1)一样,此方法也是将消息发送到默认的topic。payload可以是一个实体类、集合等也可以是字符串。如果payload是实体类、集合等,底层会将实体类转化成json对象,例如上述代码发送消息的结果就是{"name":"Tom"}
,如果传入的是集合对象这会转换从jsonArray。如果payload是字符串,则发送消息的结果就是原字符串。
import com.sgm.esb.gateway.model.ExampleEntity;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {// 实体类
ExampleEntity exampleEntity = new ExampleEntity();
exampleEntity.setName("Tom");
rocketMQTemplate.convertAndSend("topicA:tagA", exampleEntity);
}
}
在方法(17)的基础上可以指定topic将消息发送到指定的topic。
(19)void convertAndSend(D destination, Object payload, @Nullable Mapimport com.sgm.esb.gateway.model.ExampleEntity;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.HashMap;
import java.util.Map;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {// 消息的属性
Mapmap = new HashMap<>();
map.put("消息的属性的键", "消息的属性的值");
map.put("KEYS", "消息的key");
// 实体类
ExampleEntity exampleEntity = new ExampleEntity();
exampleEntity.setName("Tom");
rocketMQTemplate.convertAndSend("topicA:tagA", exampleEntity, map);
}
}
在方法(18)的基础上增加了第三个Map类型的参数,我们可以使用这个参数来设置消息的属性和key。
(20)void convertAndSend(Object payload, @Nullable MessagePostProcessor postProcessor) throws MessagingException;同步发送import com.sgm.esb.gateway.model.ExampleEntity;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.core.MessagePostProcessor;
import java.util.HashMap;
import java.util.Map;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {// 实体类
ExampleEntity exampleEntity = new ExampleEntity();
exampleEntity.setName("Tom");
rocketMQTemplate.convertAndSend(exampleEntity, new MessagePostProcessor() {@Override
public Message>postProcessMessage(Message>message) {MessageHeaders headers = message.getHeaders();
String keys = headers.get("KEYS", String.class);
System.out.println(keys);
return message;
}
});
}
}
在方法(17)的基础上增加了MessagePostProcessor对象,MessagePostProcessor顾名思义就是消息的后处理。我们传入的参数中,destination就是消息要去往的topic(这里没有destination则发送默认的topic),payload就是消息的消息体,headers就是消息的属性(这里没有headers则无法设置属性),RocketMQ底层会根据payload和headers生成Message对象,MessagePostProcessor就是对这个生成的Message对象做一些事情,最后再发往destination。
(21)void convertAndSend(D destination, Object payload, @Nullable MessagePostProcessor postProcessor) throws MessagingException;同步发送import com.sgm.esb.gateway.model.ExampleEntity;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.core.MessagePostProcessor;
import java.util.HashMap;
import java.util.Map;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {// 实体类
ExampleEntity exampleEntity = new ExampleEntity();
exampleEntity.setName("Tom");
rocketMQTemplate.convertAndSend("topicA:tagA", exampleEntity, new MessagePostProcessor() {@Override
public Message>postProcessMessage(Message>message) {MessageHeaders headers = message.getHeaders();
String keys = headers.get("KEYS", String.class);
System.out.println(keys);
return message;
}
});
}
}
在方法(20)的基础上可以指定topic将消息发送到指定的topic。
(22)void convertAndSend(D destination, Object payload, @Nullable Mapimport com.sgm.esb.gateway.model.ExampleEntity;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.core.MessagePostProcessor;
import java.util.HashMap;
import java.util.Map;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {// 消息的属性
Mapmap = new HashMap<>();
map.put("消息的属性的键", "消息的属性的值");
map.put("KEYS", "消息的key");
// 实体类
ExampleEntity exampleEntity = new ExampleEntity();
exampleEntity.setName("Tom");
rocketMQTemplate.convertAndSend("topicA:tagA", exampleEntity, map, new MessagePostProcessor() {@Override
public Message>postProcessMessage(Message>message) {MessageHeaders headers = message.getHeaders();
String keys = headers.get("KEYS", String.class);
System.out.println(keys);
return message;
}
});
}
}
在方法(21)的基础上增加了Map类型的参数,我们可以使用这个参数来设置消息的属性和key。
至此,使用RocketMQTemplate发送普通消息的方法就全部讲解完了,其实还有RocketMQTemplate.sendAndReceive()方法也可以发送普通消息,但是这个要配合消费者一起使用,我会另写一篇文章讲解这个方法。
二、使用DefaultMQProducer发送消息有些场景下我们不想使用springboot自动创建的RocketMQTemplate的Bean来发送消息,而是想自己创建生产者以使用不同的nameServer来发送消息到不同的集群或者想在main函数中创建生产者,可以使用DefaultMQProducer来发送消息。其实RocketMQTemplate底层也是使用DefaultMQProducer来发送消息的,只不过进行了包装让用户使用起来更方便。
2.1 DefaultMQProducer的创建DefaultMQProducer有多个构造函数,我们可以根据不同的场景使用不同的构造函数创建对象。
(1)DefaultMQProducer(String namespace, String producerGroup, RPCHook rpcHook);DefaultMQProducer producer = new DefaultMQProducer("命名空间", "生产者组", new AclClientRPCHook(new SessionCredentials("用户名","密码")));
此构造函数的第一个参数是命名空间,命名空间需要在服务端提前创建。第二个参数是生产者组,一个生产者组可以包含多个生产者,生产者组不需要提前创建,在创建DefaultMQProducer对象的时候赋值一个生产者组就可以。第三个参数是RPCHook对象用于权限认证,相当于你登陆一个网站需要输入用户名和密码。
(2)DefaultMQProducer(String producerGroup, RPCHook rpcHook);命名空间是RocketMQ中的一个资源管理概念。用户不同的业务场景一般都可以通过命名空间做隔离,并且针对不同的业务场景设置专门的配置,例如消息保留时间。不同命名空间之间的 Topic 相互隔离,订阅相互隔离,角色权限相互隔离。
DefaultMQProducer producer = new DefaultMQProducer("生产者组", new AclClientRPCHook(new SessionCredentials("用户名","密码")));
此构造函数底层还是调用了构造方法(1)
,只不过将namespace设为了null,在没有命名空间的时候可以使用此构造函数。
DefaultMQProducer producer = new DefaultMQProducer("命名空间", "生产者组");
此构造函数底层还是调用了构造方法(1)
,只不过将RPCHook 设为了null,在不需要acl认证的时候可以使用此构造函数。
DefaultMQProducer producer = new DefaultMQProducer("生产者组");
此构造函数底层还是调用了构造方法(1)
,只不过将namespace和RPCHook设为了null,在没有命名空间和不需要acl认证的时候可以使用此构造函数。
DefaultMQProducer producer = new DefaultMQProducer(new AclClientRPCHook(new SessionCredentials("用户名","密码")));
此构造函数底层还是调用了构造方法(1)
,只不过将namespace设为了null,由于prodcuerGroup不能为null,所以RocketMQ会使用默认的生产者组:DEFAULT_PRODUCER
。
DefaultMQProducer producer = new DefaultMQProducer();
此构造函数底层还是调用了构造方法(1)
,只不过将namespace和RPCHook设为了null,由于prodcuerGroup不能为null,所以RocketMQ会使用默认的生产者组:DEFAULT_PRODUCER
。
DefaultMQProducer producer = new DefaultMQProducer("命名空间", "生产者组", new AclClientRPCHook(new SessionCredentials("用户名","密码")), true, "traceTopic");
此构造函数的第一个参数是命名空间,命名空间需要在服务端提前创建。第二个参数是生产者组,一个生产者组可以包含多个生产者,生产者组不需要提前创建,在创建DefaultMQProducer对象的时候赋值一个生产者组就可以。第三个参数是RPCHook对象用于权限认证,相当于你登陆一个网站需要输入用户名和密码。第四个参数是布尔类型,表示是否开启消息追踪。第五个参数是消息跟踪的topic的名称,这个topic专门用来做消息追踪的,一般不会用这个topic生产和消费业务数据。开启追踪后,追踪topic内会记录生产者的一些信息,比如生产者IP、消息的MessageID等
。例如下面的代码就是开启追踪并设置trace-topic
为追踪topic,然后将消息发送到topicA中,于是topicA里面是业务数据,trace-topic里面是用于消息追踪的追踪数据。也就是发送一次消息会发送一份业务数据和一份追踪数据到业务topic和追踪topic
,
package com.sgm.esb.gateway.service;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
public class DefaultMQProducerTest {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("生产者组",
new AclClientRPCHook(new SessionCredentials("用户名","密码")),
true, "trace-topic");
producer.setNamesrvAddr("nameServer集群IP");
producer.start();
Message sendMessage = new Message("topicA", "tagA", "这里设置消息体".getBytes(StandardCharsets.UTF_8));
sendMessage.putUserProperty("消息的属性的键", "消息的属性的值");
sendMessage.setKeys("消息的key");
producer.send(sendMessage);
producer.shutdown();
}
}
如下是追踪topic中的消息内容:
DefaultMQProducer producer = new DefaultMQProducer("生产者组", true, "traceTopic");
此构造函数底层还是调用了构造方法(7)
,只不过将namespace和RPCHook设为了null,使用于没有命名空间和不需要acl认证的时候。
DefaultMQProducer producer = new DefaultMQProducer("生产者组", true);
此构造函数底层还是调用了构造方法(7)
,只不过将namespace、RPCHook和customizedTraceTopic设为了null。
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
public class DefaultMQProducerTest {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("生产者组",
new AclClientRPCHook(new SessionCredentials("用户名","密码")),
true, "trace-topic");
producer.setNamesrvAddr("nameServer集群IP");
producer.start();
// 这里的Message类型是org.apache.rocketmq.common.message.Message
Message sendMessage = new Message("topicA", "tagA", "这里设置消息体".getBytes(StandardCharsets.UTF_8));
sendMessage.putUserProperty("消息的属性的键", "消息的属性的值");
sendMessage.setKeys("消息的key");
SendResult sendResult = producer.send(sendMessage);
System.out.println(sendResult);
producer.shutdown();
}
}
我们根据需要使用上面提到的某一种构造方法创建生产者后必须调用setNamesrvAddr()方法来设置NameServer(要不然消息发到哪呢),然后调用start()开始发送消息,调用send()方法将创建的Message对象发送到某个topic下,最后调用shutdown()来释放资源。DefaultMQProducer.send(Message msg)方法只有一个Message类型的参数,这个Message的类型为org.apache.rocketmq.common.message.Message
,不同于之前RocketMQTemplate中的org.springframework.messaging.Message
。我们可以在此Message中设置topic和tag而不是在send()方法中设置topic和tag,通过putUserProperty()方法设置消息的属性,通过setKeys()方法设置消息的key等。在RocketMQTemplate中我们使用org.springframework.messaging.Message来创建消息,其实RocketMQ底层最终会将org.springframework.messaging.Message转化为org.apache.rocketmq.common.message.Message类型进行消息的发送
。
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
public class DefaultMQProducerTest {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("生产者组",
new AclClientRPCHook(new SessionCredentials("用户名","密码")),
true, "trace-topic");
producer.setNamesrvAddr("nameServer集群IP");
producer.start();
// 这里的Message类型是org.apache.rocketmq.common.message.Message
Message sendMessage = new Message("topicA", "tagA", "这里设置消息体".getBytes(StandardCharsets.UTF_8));
sendMessage.putUserProperty("消息的属性的键", "消息的属性的值");
sendMessage.setKeys("消息的key");
SendResult sendResult = producer.send(sendMessage, 3000L);
System.out.println(sendResult);
producer.shutdown();
}
}
在方法(1)的基础上增加timeout参数来设置超时时间。
(3)void send(Message msg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException;异步发送import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
public class DefaultMQProducerTest {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("生产者组",
new AclClientRPCHook(new SessionCredentials("用户名","密码")),
true, "trace-topic");
producer.setNamesrvAddr("nameServer集群IP");
producer.start();
Message sendMessage = new Message("topicA", "tagA", "这里设置消息体".getBytes(StandardCharsets.UTF_8));
sendMessage.putUserProperty("消息的属性的键", "消息的属性的值");
sendMessage.setKeys("消息的key");
producer.send(sendMessage, new SendCallback() {@Override
public void onSuccess(SendResult sendResult) {System.out.println("Send success");
}
@Override
public void onException(Throwable throwable) {System.out.println("Send fail");
}
});
producer.shutdown();
}
}
第二个参数设置异步回调,同RocketMQTemplate异步发送消息一样,不再赘述。
(4)void send(Message msg, SendCallback sendCallback,long timeout) throws MQClientException, RemotingException, InterruptedException;异步发送import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
public class DefaultMQProducerTest {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("生产者组",
new AclClientRPCHook(new SessionCredentials("用户名","密码")),
true, "trace-topic");
producer.setNamesrvAddr("nameServer集群IP");
producer.start();
Message sendMessage = new Message("topicA", "tagA", "这里设置消息体".getBytes(StandardCharsets.UTF_8));
sendMessage.putUserProperty("消息的属性的键", "消息的属性的值");
sendMessage.setKeys("消息的key");
producer.send(sendMessage, new SendCallback() {@Override
public void onSuccess(SendResult sendResult) {System.out.println("Send success");
}
@Override
public void onException(Throwable throwable) {System.out.println("Send fail");
}
});
producer.shutdown();
}
}
在方法(3)的基础上增加timeout参数来设置超时时间。
(5)SendResult send(Collection msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;同步发送import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
public class DefaultMQProducerTest {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("生产者组",
new AclClientRPCHook(new SessionCredentials("用户名","密码")),
true, "trace-topic");
producer.setNamesrvAddr("nameServer集群IP");
producer.start();
Listmessages = new ArrayList<>();
for (int i=0; i<3; i++) {Message sendMessage = new Message("topicA", "tagA", ("这里设置消息体" + i).getBytes(StandardCharsets.UTF_8));
sendMessage.putUserProperty("消息的属性的键", "消息的属性的值");
sendMessage.setKeys("消息的key");
messages.add(sendMessage);
}
SendResult sendResult = producer.send(messages);
System.out.println(sendResult);
producer.shutdown();
}
}
批量发送消息到某一个topic。
(6)SendResult send(Collection msgs, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;同步发送注意:这里List中Message的topic都必须是同一个,否则会报错。
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
public class DefaultMQProducerTest {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("生产者组",
new AclClientRPCHook(new SessionCredentials("用户名","密码")),
true, "trace-topic");
producer.setNamesrvAddr("nameServer集群IP");
producer.start();
Listmessages = new ArrayList<>();
for (int i=0; i<3; i++) {Message sendMessage = new Message("topicA", "tagA", ("这里设置消息体" + i).getBytes(StandardCharsets.UTF_8));
sendMessage.putUserProperty("消息的属性的键", "消息的属性的值");
sendMessage.setKeys("消息的key");
messages.add(sendMessage);
}
SendResult sendResult = producer.send(messages);
System.out.println(sendResult);
producer.shutdown();
}
}
批量发送消息到某一个topic,在方法(5)的基础上增加timeout参数来设置超时时间。
(7)void send(Collection msgs, SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;异步发送import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
public class DefaultMQProducerTest {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("生产者组",
new AclClientRPCHook(new SessionCredentials("用户名","密码")),
true, "trace-topic");
producer.setNamesrvAddr("nameServer集群IP");
producer.start();
Listmessages = new ArrayList<>();
for (int i=0; i<3; i++) {Message sendMessage = new Message("topicA", "tagA", ("这里设置消息体" + i).getBytes(StandardCharsets.UTF_8));
sendMessage.putUserProperty("消息的属性的键", "消息的属性的值");
sendMessage.setKeys("消息的key");
messages.add(sendMessage);
}
producer.send(messages, new SendCallback() {@Override
public void onSuccess(SendResult sendResult) {System.out.println("Send success");
}
@Override
public void onException(Throwable throwable) {System.out.println("Send fail");
}
});
producer.shutdown();
}
}
批量异步发送消息到某一个topic。
(8)void send(Collection msgs, SendCallback sendCallback, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;异步发送import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
public class DefaultMQProducerTest {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("生产者组",
new AclClientRPCHook(new SessionCredentials("用户名","密码")),
true, "trace-topic");
producer.setNamesrvAddr("nameServer集群IP");
producer.start();
Listmessages = new ArrayList<>();
for (int i=0; i<3; i++) {Message sendMessage = new Message("topicA", "tagA", ("这里设置消息体" + i).getBytes(StandardCharsets.UTF_8));
sendMessage.putUserProperty("消息的属性的键", "消息的属性的值");
sendMessage.setKeys("消息的key");
messages.add(sendMessage);
}
producer.send(messages, new SendCallback() {@Override
public void onSuccess(SendResult sendResult) {System.out.println("Send success");
}
@Override
public void onException(Throwable throwable) {System.out.println("Send fail");
}
}, 3000L);
producer.shutdown();
}
}
批量异步发送消息到某一个topic,在方法(7)的基础上增加timeout参数来设置超时时间。
(9)void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException;one-way模式,异步发送import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
public class DefaultMQProducerTest {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("生产者组",
new AclClientRPCHook(new SessionCredentials("用户名","密码")),
true, "trace-topic");
producer.setNamesrvAddr("nameServer集群IP");
producer.start();
Message sendMessage = new Message("topicA", "tagA", ("这里设置消息体" + i).getBytes(StandardCharsets.UTF_8));
sendMessage.putUserProperty("消息的属性的键", "消息的属性的值");
sendMessage.setKeys("消息的key");
producer.sendOneway(sendMessage);
producer.shutdown();
}
}
使用one-way模式异步发送消息。
总结与展望至此,使用RocketMQTemplate和DefaultMQProducer发送普通消息的全部方法就讲解完了,本文的主要目的是帮助读者快速学习使用RocketMQ发送普通消息,本文总结了所有的发送普通消息的方法以满足实际工作中不同的业务场景。RocketMQTemplate和DefaultMQProducer中还有一些发送消息的方法是用来发送顺序、定时/延时消息的(DefaultMQProducer不能用来发送事务消息),之后我会继续写文章来讲解这些方法以及所有的消费消息的方法。
这是我在写的第一篇原创文章,之后我还准备写一些使用RocketMQ时踩过的坑,最后可能会写一些RocketMQ更底层的东西,我还准备写一些webflux、springCloudGateway这些我比较感兴趣的东西,希望看到这篇文章的人能和我一起成长。
你是否还在寻找稳定的海外服务器提供商?创新互联www.cdcxhl.cn海外机房具备T级流量清洗系统配攻击溯源,准确流量调度确保服务器高可用性,企业级服务器适合批量采购,新人活动首月15元起,快前往官网查看详情吧
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流