RocketMQ发送消息
1.使用rocketmq-spring-boot-starter 发送消息
1.1消息生产者spring-cloud-alibaba-rocketmq-producer
pom.xml
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.0</version>
</dependency>
application.yml
# Producer application settings.
# The nesting is restored here: name-server and producer.group must sit under
# the "rocketmq" key, and application.name under "spring".
spring:
  application:
    name: spring-cloud-alibaba-rocketmq-producer
server:
  port: 6031
rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    # Required
    group: test-group
RocketMqTestController
@RestController
@RequiredArgsConstructor(onConstructor = @_(@Autowired))
public class RocketMqTestController {
private final RocketMQTemplate rocketMQTemplate;
@GetMapping("senStr")
public String senStr(String str) {
rocketMQTemplate.convertAndSend("add-bonus", str);
return str;
}
@GetMapping("senObject")
public ShareDTO senObject(String str) {
ShareDTO shareDTO = ShareDTO.builder().id(1).name(str).thisDate(new Date()).build();
rocketMQTemplate.convertAndSend("add-bonus",shareDTO);
return shareDTO;
}
}
ShareDTO
/**
 * Message payload carrying an id, a name and a timestamp.
 *
 * <p>{@code @Data} already generates {@code toString()}, so no separate {@code @ToString} is
 * declared. The builder and both constructors are generated by Lombok.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ShareDTO {

    /** Identifier of the payload. */
    private Integer id;

    /** Free-form name. */
    private String name;

    /** Timestamp, formatted by Jackson as {@code yyyy-MM-dd HH:mm:ss}. */
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private Date thisDate;
}
1.2消息消费者spring-cloud-alibaba-rocketmq-consumer
pom.xml
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.0</version>
</dependency>
application.yml
# Consumer application settings.
# The nesting is restored here, and the values copied from the producer module are
# corrected: the name now identifies the consumer, and the port no longer collides
# with the producer's 6031 when both run on the same host.
spring:
  application:
    name: spring-cloud-alibaba-rocketmq-consumer
server:
  port: 6032
rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    # Required
    group: test-group
BonusListener
/**
 * Listener registered for topic {@code add-bonus} in consumer group {@code test-group};
 * it logs every message it is handed.
 */
@Slf4j
@Component
@RocketMQMessageListener(
        topic = "add-bonus",
        consumerGroup = "test-group")
public class BonusListener implements RocketMQListener<Object> {

    /**
     * Logs the received message.
     *
     * @param o the message passed in by the listener container
     */
    @Override
    public void onMessage(Object o) {
        log.info("接收参数:{}", o);
    }
}
2.使用spring-cloud-starter-stream-rocketmq 发送消息
2.1 spring-cloud-alibaba-stream-rocketmq-producer 生产者
pom.xml
<!-- RocketMQ 依赖 -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
<version>5.2.1.RELEASE</version>
</dependency>
application.yml
# Stream producer settings.
# Spring Cloud Stream reads its properties from spring.cloud.stream.*, so the
# "stream" section is nested under "cloud".
spring:
  application:
    name: spring-cloud-alibaba-rocketmq-producer
  cloud:
    # RocketMQ-related configuration
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
      bindings:
        output1: {destination: test-topic1, content-type: application/json}
        output2: {destination: test-topic2, content-type: application/json}
server:
  port: 6033
MainController
/** * @author linke * @date 2020-01-19 下午 21:08 * @description */@RestController@RequiredArgsConstructor(onConstructor = @_(@Autowired))public class MainController { private final ProduceService produceService; @RequestMapping(value = "/send", method = RequestMethod.GET) public String send(String msg) { produceService.send(msg); return "字符串消息发送成功!"; } @RequestMapping(value = "/sendWithTags", method = RequestMethod.GET) public String sendWithTags(String msg) { produceService.sendWithTags(msg, "tagStr"); return "带tag字符串消息发送成功!"; } @RequestMapping(value = "/sendObject", method = RequestMethod.GET) public String sendObject(int index) { produceService.sendObject(new ShareDTO(index, "foo",new Date()), "tagObj"); return "Object对象消息发送成功!"; }}
MySource
/**
 * Binding interface declaring two output channels, named {@code output1} and {@code output2}.
 */
public interface MySource {

    /** @return the channel bound under the name {@code output1} */
    @Output("output1")
    MessageChannel output1();

    /** @return the channel bound under the name {@code output2} */
    @Output("output2")
    MessageChannel output2();
}
ProduceService
@Slf4j@Service@RequiredArgsConstructor(onConstructor = @_(@Autowired))public class ProduceService { private final MySource source; /** * 发送字符串 * * @param msg */ public void send(String msg) { Message message = MessageBuilder.withPayload(msg) .build(); source.output1().send(message); } /** * 发送带tag的字符串 * * @param msg * @param tag */ public void sendWithTags(String msg, String tag) { Message message = MessageBuilder.withPayload(msg) .setHeader(RocketMQHeaders.TAGS, tag) .build(); source.output1().send(message); } /** * 发送对象 * * @param msg * @param tag * @param <T> */ public <T> void sendObject(T msg, String tag) { Message message = MessageBuilder.withPayload(msg) .setHeader(RocketMQHeaders.TAGS, tag) .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON) .build(); source.output2().send(message); }}
SrpingCloudAlibabaStreamRocketProducerApplication
/**
 * Boot entry point of the stream producer; enables the bindings declared by {@link MySource}.
 */
@SpringBootApplication
@EnableBinding(MySource.class)
public class SrpingCloudAlibabaStreamRocketProducerApplication {

    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments passed through to Spring
     */
    public static void main(String[] args) {
        SpringApplication.run(SrpingCloudAlibabaStreamRocketProducerApplication.class, args);
    }
}
2.2 spring-cloud-alibaba-stream-rocketmq-consumer消费者
pom.xml
<!-- RocketMQ 依赖 --><dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-stream-rocketmq</artifactId> <exclusions> <exclusion> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-core</artifactId> </exclusion> </exclusions></dependency><dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-core</artifactId> <version>5.2.1.RELEASE</version></dependency>
application.yml
# Stream consumer settings.
# Spring Cloud Stream reads its properties from spring.cloud.stream.*, so the
# "stream" section is nested under "cloud". The application name identifies the
# consumer module instead of reusing the producer's name.
spring:
  application:
    name: spring-cloud-alibaba-stream-rocketmq-consumer
  cloud:
    # RocketMQ-related configuration
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
        bindings:
          input1: {consumer.orderly: true}   # whether to consume in order
          input2: {consumer.tags: tagStr}    # subscribe to string messages tagged tagStr
          input3: {consumer.tags: tagObj}    # subscribe to object messages tagged tagObj
      bindings:
        # "application/plain" is not a registered media type: string inputs use
        # text/plain, and the object input uses application/json.
        input1: {destination: test-topic1, content-type: text/plain, group: test-group1, consumer.maxAttempts: 1}
        input2: {destination: test-topic1, content-type: text/plain, group: test-group2, consumer.maxAttempts: 1}
        input3: {destination: test-topic2, content-type: application/json, group: test-group3, consumer.maxAttempts: 1}
server:
  port: 6030
ReceiveService
/**
 * Stream listeners for the channels {@code input1} to {@code input4}; each one logs the
 * message it receives.
 *
 * <p>Logging uses SLF4J placeholders instead of string concatenation and an explicit
 * {@code toString()}, so a {@code null} payload is logged as {@code null} rather than
 * raising a {@link NullPointerException}.
 */
@Service
@Slf4j
public class ReceiveService {

    /**
     * Logs a string message received on {@code input1}.
     *
     * @param receiveMsg the received payload
     */
    @StreamListener("input1")
    public void receiveInput1(String receiveMsg) {
        log.info("input1 接收到了消息:{}", receiveMsg);
    }

    /**
     * Logs a string message received on {@code input2}.
     *
     * @param receiveMsg the received payload
     */
    @StreamListener("input2")
    public void receiveInput2(String receiveMsg) {
        log.info("input2 接收到了消息:{}", receiveMsg);
    }

    /**
     * Logs a {@link ShareDTO} received on {@code input3}.
     *
     * @param foo the received payload
     */
    @StreamListener("input3")
    public void receiveInput3(@Payload ShareDTO foo) {
        log.info("input3 接收到了消息:{}", foo);
    }

    /**
     * Logs a {@link ShareDTO} received on {@code input4}.
     *
     * @param foo the received payload
     */
    // NOTE(review): confirm that binding properties for input4 are configured.
    @StreamListener("input4")
    public void receiveInput4(@Payload ShareDTO foo) {
        log.info("input4 接收到了消息:{}", foo);
    }
}
MyConsumer
/**
 * Binding interface declaring four input channels, named {@code input1} to {@code input4}.
 */
public interface MyConsumer {

    /** @return the channel bound under the name {@code input1} */
    @Input("input1")
    SubscribableChannel input1();

    /** @return the channel bound under the name {@code input2} */
    @Input("input2")
    SubscribableChannel input2();

    /** @return the channel bound under the name {@code input3} */
    @Input("input3")
    SubscribableChannel input3();

    /** @return the channel bound under the name {@code input4} */
    @Input("input4")
    SubscribableChannel input4();
}
SrpingCloudAlibabaStreamRocketConsumerApplication
/**
 * Boot entry point of the stream consumer; enables the bindings declared by {@link MyConsumer}.
 */
@SpringBootApplication
@EnableBinding(MyConsumer.class)
public class SrpingCloudAlibabaStreamRocketConsumerApplication {

    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments passed through to Spring
     */
    public static void main(String[] args) {
        SpringApplication.run(SrpingCloudAlibabaStreamRocketConsumerApplication.class, args);
    }
}
3.spring-cloud-starter-stream-rocketmq 消息过滤
生产者
生产者设置一下header,比如my-header,值根据你的需要填写:
/** Binding whose output channel the message is sent on. */
@Autowired
private Source source;

/**
 * Sends a message with a fixed payload and the custom header {@code my-header}.
 *
 * @return the literal {@code "success"}
 */
public String testStream() {
    source.output().send(
            MessageBuilder.withPayload("消息体")
                    .setHeader("my-header", "你的header")
                    .build());
    return "success";
}
消费者
/**
 * Stream listener on {@code Sink.INPUT} whose condition matches only messages with the
 * {@code my-header} header equal to the configured value.
 */
@Slf4j
@Service
public class TestStreamConsumer {

    /**
     * Logs the body of a message that satisfied the listener condition.
     *
     * @param messageBody the received payload
     */
    @StreamListener(
            value = Sink.INPUT,
            condition = "headers['my-header']=='你的header'")
    public void receive(String messageBody) {
        log.info("通过stream收到了消息:messageBody ={}", messageBody);
    }
}
4.异常处理
4.1全局异常
@Service@Slf4jpublic class ReceiveService { @StreamListener("input1") public void receiveInput1(String receiveMsg) { log.info("input1 接收到了消息:" + receiveMsg); } @StreamListener("input2") public void receiveInput2(String receiveMsg) { log.info("input2 接收到了消息:" + receiveMsg); } @StreamListener("input3") public void receiveInput3(@Payload ShareDTO foo) { log.info("input3 接收到了消息:" + foo.toString()); } @StreamListener("input4") public void receiveInput4(@Payload ShareDTO foo) { log.info("input4 接收到了消息:" + foo.toString()); } /** * 全局异常 * @param message */ @StreamListener("errorChannel") public void error(Message<?> message) { ErrorMessage errorMessage = (ErrorMessage) message; System.out.println("Handling ERROR: " + errorMessage); }}
4.2 局部异常
配置
# Bindings for the local error-handling example: the input and output bindings
# use the same destination, and the input binding declares a consumer group.
spring:
  cloud:
    stream:
      bindings:
        input:
          destination: my-destination
          group: my-group
        output:
          destination: my-destination
代码
@Slf4j@SpringBootApplication@EnableBinding({Processor.class})@EnableSchedulingpublic class ConsumerApplication { public static void main(String[] args) { SpringApplication.run(ConsumerApplication.class, args); } @StreamListener(value = Processor.INPUT) public void handle(String body) { throw new RuntimeException("x"); } @ServiceActivator(inputChannel = "my-destination.my-group.errors") public void handleError(ErrorMessage message) { Throwable throwable = message.getPayload(); log.error("截获异常", throwable); Message<?> originalMessage = message.getOriginalMessage(); assert originalMessage != null; log.info("原始消息体 = {}", new String((byte[]) originalMessage.getPayload())); } @Bean @InboundChannelAdapter(value = Processor.OUTPUT, poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1")) public MessageSource<String> test() { return () -> new GenericMessage<>("adfdfdsafdsfa"); }}