RockeMQ发送消息

RockeMQ发送消息

1.使用spring-boot-rocketmq 发送消息

1.1消息生产者spring-cloud-alibaba-rocketmq-producer

pom.xml

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.0</version>
</dependency>

application.yml

spring:
  application:
    name: spring-cloud-alibaba-rocketmq-producer
server:
  port: 6031
rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    #必填
    group: test-group

RocketMqTestController

@RestController
@RequiredArgsConstructor(onConstructor = @_(@Autowired))
public class RocketMqTestController {
    private final RocketMQTemplate rocketMQTemplate;

    @GetMapping("senStr")
    public String senStr(String str) {
        rocketMQTemplate.convertAndSend("add-bonus", str);
        return str;
    }

    @GetMapping("senObject")
    public ShareDTO senObject(String str) {
      ShareDTO shareDTO =  ShareDTO.builder().id(1).name(str).thisDate(new Date()).build();
        rocketMQTemplate.convertAndSend("add-bonus",shareDTO);
        return shareDTO;
    }
}

ShareDTO

@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
@ToString
public class ShareDTO {
    private Integer id;
    private String name;
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private Date thisDate;
}

1.2消息生产者spring-cloud-alibaba-rocketmq-consumer

pom.xml

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.0</version>
</dependency>

application.yml

spring:
  application:
    name: spring-cloud-alibaba-rocketmq-producer
server:
  port: 6031
rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    #必填
    group: test-group

BonusListener

@Component
@Slf4j
@RocketMQMessageListener(consumerGroup = "test-group",topic = "add-bonus")
public class BonusListener implements RocketMQListener<Object> {
    @Override
    public void onMessage(Object o) {
        log.info("接收参数:{}",o);
    }
}

2.使用spring-cloud-starter-stream-rocketmq 发送消息

2.1 spring-cloud-alibaba-stream-rocketmq-producer 生产者

pom.xml

<!-- RocketMQ 依赖 -->
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-core</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-core</artifactId>
    <version>5.2.1.RELEASE</version>
</dependency>

application.yml

spring:  application:    name: spring-cloud-alibaba-rocketmq-producer    # RocketMQ 相关配置    stream:      rocketmq:        binder:          name-server: 127.0.0.1:9876      bindings:        output1: {destination: test-topic1, content-type: application/json}        output2: {destination: test-topic2, content-type: application/json}server:  port: 6033

MainController

/** * @author linke * @date 2020-01-19 下午 21:08 * @description */@RestController@RequiredArgsConstructor(onConstructor = @_(@Autowired))public class MainController {    private final ProduceService produceService;    @RequestMapping(value = "/send", method = RequestMethod.GET)    public String send(String msg) {        produceService.send(msg);        return "字符串消息发送成功!";    }    @RequestMapping(value = "/sendWithTags", method = RequestMethod.GET)    public String sendWithTags(String msg) {        produceService.sendWithTags(msg, "tagStr");        return "带tag字符串消息发送成功!";    }    @RequestMapping(value = "/sendObject", method = RequestMethod.GET)    public String sendObject(int index) {        produceService.sendObject(new ShareDTO(index, "foo",new Date()), "tagObj");        return "Object对象消息发送成功!";    }}

MySource

public interface MySource {    @Output("output1")    MessageChannel output1();    @Output("output2")    MessageChannel output2();}

ProduceService

@Slf4j@Service@RequiredArgsConstructor(onConstructor = @_(@Autowired))public class ProduceService {    private final MySource source;    /**     * 发送字符串     *     * @param msg     */    public void send(String msg) {        Message message = MessageBuilder.withPayload(msg)                .build();        source.output1().send(message);    }    /**     * 发送带tag的字符串     *     * @param msg     * @param tag     */    public void sendWithTags(String msg, String tag) {        Message message = MessageBuilder.withPayload(msg)                .setHeader(RocketMQHeaders.TAGS, tag)                .build();        source.output1().send(message);    }    /**     * 发送对象     *     * @param msg     * @param tag     * @param <T>     */    public <T> void sendObject(T msg, String tag) {        Message message = MessageBuilder.withPayload(msg)                .setHeader(RocketMQHeaders.TAGS, tag)                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)                .build();        source.output2().send(message);    }}

SrpingCloudAlibabaStreamRocketProducerApplication

@SpringBootApplication@EnableBinding({MySource.class})public class SrpingCloudAlibabaStreamRocketProducerApplication {    public static void main(String[] args) {        SpringApplication.run(SrpingCloudAlibabaStreamRocketProducerApplication.class, args);    }}

2.2 spring-cloud-alibaba-stream-rocketmq-consumer消费者

pom.xml

<!-- RocketMQ 依赖 --><dependency>    <groupId>com.alibaba.cloud</groupId>    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>    <exclusions>        <exclusion>            <groupId>org.springframework.integration</groupId>            <artifactId>spring-integration-core</artifactId>        </exclusion>    </exclusions></dependency><dependency>    <groupId>org.springframework.integration</groupId>    <artifactId>spring-integration-core</artifactId>    <version>5.2.1.RELEASE</version></dependency>

application.yml

spring:  application:    name: spring-cloud-alibaba-rocketmq-producer    # RocketMQ 相关配置    stream:      rocketmq:        binder:          name-server: 127.0.0.1:9876        bindings:          input1: {consumer.orderly: true}  #是否排序          input2: {consumer.tags: tagStr}   #订阅 带tag值为tagStr的字符串          input3: {consumer.tags: tagObj}   #订阅 带tag值为tabObj的字符串      bindings:        input1: {destination: test-topic1, content-type: text/plain, group: test-group1, consumer.maxAttempts: 1}        input2: {destination: test-topic1, content-type: application/plain, group: test-group2, consumer.maxAttempts: 1}        input3: {destination: test-topic2, content-type: application/plain, group: test-group3, consumer.maxAttempts: 1}server:  port: 6030

ReceiveService

@Service@Slf4jpublic class ReceiveService {    @StreamListener("input1")    public void receiveInput1(String receiveMsg) {        log.info("input1 接收到了消息:" + receiveMsg);    }    @StreamListener("input2")    public void receiveInput2(String receiveMsg) {        log.info("input2 接收到了消息:" + receiveMsg);    }    @StreamListener("input3")    public void receiveInput3(@Payload ShareDTO foo) {        log.info("input3 接收到了消息:" + foo.toString());    }    @StreamListener("input4")    public void receiveInput4(@Payload ShareDTO foo) {        log.info("input4 接收到了消息:" + foo.toString());    }}

MyConsumer

public interface MyConsumer {    @Input("input1")    SubscribableChannel input1();    @Input("input2")    SubscribableChannel input2();    @Input("input3")    SubscribableChannel input3();    @Input("input4")    SubscribableChannel input4();}

SrpingCloudAlibabaStreamRocketConsumerApplication

@SpringBootApplication@EnableBinding({MyConsumer.class})public class SrpingCloudAlibabaStreamRocketConsumerApplication {    public static void main(String[] args) {        SpringApplication.run(SrpingCloudAlibabaStreamRocketConsumerApplication.class, args);    }}

3.spring-cloud-starter-stream-rocketmq 消息过滤

生产者

生产者设置一下header,比如my-header,值根据你的需要填写:
@Autowiredprivate Source source;public String testStream() {  this.source.output()    .send(    MessageBuilder    .withPayload("消息体")    .setHeader("my-header","你的header")    .build()  );  return "success";}

消费者

@Service@Slf4jpublic class TestStreamConsumer {    @StreamListener(value = Sink.INPUT,condition = "headers['my-header']=='你的header'")    public void receive(String messageBody) {        log.info("通过stream收到了消息:messageBody ={}", messageBody);    }}

4.异常处理

4.1全局异常
@Service@Slf4jpublic class ReceiveService {    @StreamListener("input1")    public void receiveInput1(String receiveMsg) {        log.info("input1 接收到了消息:" + receiveMsg);    }    @StreamListener("input2")    public void receiveInput2(String receiveMsg) {        log.info("input2 接收到了消息:" + receiveMsg);    }    @StreamListener("input3")    public void receiveInput3(@Payload ShareDTO foo) {        log.info("input3 接收到了消息:" + foo.toString());    }    @StreamListener("input4")    public void receiveInput4(@Payload ShareDTO foo) {        log.info("input4 接收到了消息:" + foo.toString());    }    /**     * 全局异常     * @param message     */    @StreamListener("errorChannel")    public void error(Message<?> message) {        ErrorMessage errorMessage = (ErrorMessage) message;        System.out.println("Handling ERROR: " + errorMessage);    }}
4.2 局部异常

配置

spring:  cloud:    stream:      bindings:        input:          destination: my-destination          group: my-group        output:          destination: my-destination

代码

@Slf4j@SpringBootApplication@EnableBinding({Processor.class})@EnableSchedulingpublic class ConsumerApplication {    public static void main(String[] args) {        SpringApplication.run(ConsumerApplication.class, args);    }    @StreamListener(value = Processor.INPUT)    public void handle(String body) {        throw new RuntimeException("x");    }    @ServiceActivator(inputChannel = "my-destination.my-group.errors")    public void handleError(ErrorMessage message) {        Throwable throwable = message.getPayload();        log.error("截获异常", throwable);        Message<?> originalMessage = message.getOriginalMessage();        assert originalMessage != null;        log.info("原始消息体 = {}", new String((byte[]) originalMessage.getPayload()));    }    @Bean    @InboundChannelAdapter(value = Processor.OUTPUT,            poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))    public MessageSource<String> test() {        return () -> new GenericMessage<>("adfdfdsafdsfa");    }}