Docker安装RockerMq

Docker安装RockerMq

docker-compose 安装RockerMq

1.编写docker-compose.yml

version: '3.5'
services:
  rmqnamesrv:
    image: foxiswho/rocketmq:server
    container_name: rmqnamesrv
    ports:
      - 9876:9876
    volumes:
      - /Users/docker/yz/rocketmq/rocketmq-server/logs:/opt/logs
      - /Users/docker/yz/rocketmq/rocketmq-server/store:/opt/store
    networks:
        rmq:
          aliases:
            - rmqnamesrv

  rmqbroker:
    image: foxiswho/rocketmq:broker
    container_name: rmqbroker
    ports:
      - 10909:10909
      - 10911:10911
    volumes:
      - /Users/docker/yz/rocketmq/rocketmq-broker/logs:/opt/logs
      - /Users/docker/yz/rocketmq/rocketmq-broker/store:/opt/store
      - /Users/docker/yz/rocketmq/rocketmq-broker/brokerconf/broker.conf:/etc/rocketmq/broker.conf
    environment:
        NAMESRV_ADDR: "rmqnamesrv:9876"
        JAVA_OPTS: " -Duser.home=/opt"
        JAVA_OPT_EXT: "-server -Xms128m -Xmx128m -Xmn128m"
    command: mqbroker -c /etc/rocketmq/broker.conf
    depends_on:
      - rmqnamesrv
    networks:
      rmq:
        aliases:
          - rmqbroker

  rmqconsole:
    image: styletang/rocketmq-console-ng
    container_name: rmqconsole
    ports:
      - 8092:8080
    environment:
        JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
    depends_on:
      - rmqnamesrv
    networks:
      rmq:
        aliases:
          - rmqconsole

networks:
  rmq:
    name: rmq
    driver: bridge

2.创建broker.conf

放置/Users/docker/yz/rocketmq/rocketmq-broker/brokerconf 目录下

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.


# 所属集群名字
brokerClusterName=DefaultCluster

# broker 名字,注意此处不同的配置文件填写的不一样,如果在 broker-a.properties 使用: broker-a,
# 在 broker-b.properties 使用: broker-b
brokerName=broker-a

# 0 表示 Master,> 0 表示 Slave
brokerId=0

# nameServer地址,分号分割
# namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876

# 启动IP,如果 docker 报 com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.0.120:10909> failed
# 解决方式1 加上一句 producer.setVipChannelEnabled(false);,解决方式2 brokerIP1 设置宿主机IP,不要使用docker 内部IP
brokerIP1=192.168.3.33

# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4

# 是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭 !!!这里仔细看是 false,false,false
autoCreateTopicEnable=true

# 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true

# Broker 对外服务的监听端口
listenPort=10911

# 删除文件时间点,默认凌晨4点
deleteWhen=04

# 文件保留时间,默认48小时
fileReservedTime=120

# commitLog 每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824

# ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整
mapedFileSizeConsumeQueue=300000

# destroyMapedFileIntervalForcibly=120000
# redeleteHangedFileInterval=120000
# 检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
# 存储路径
# storePathRootDir=/home/ztztdata/rocketmq-all-4.1.0-incubating/store
# commitLog 存储路径
# storePathCommitLog=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/commitlog
# 消费队列存储
# storePathConsumeQueue=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/consumequeue
# 消息索引存储路径
# storePathIndex=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/index
# checkpoint 文件存储路径
# storeCheckpoint=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/checkpoint
# abort 文件存储路径
# abortFile=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/abort
# 限制的消息大小
maxMessageSize=65536

# flushCommitLogLeastPages=4
# flushConsumeQueueLeastPages=2
# flushCommitLogThoroughInterval=10000
# flushConsumeQueueThoroughInterval=60000

# Broker 的角色
# - ASYNC_MASTER 异步复制Master
# - SYNC_MASTER 同步双写Master
# - SLAVE
brokerRole=ASYNC_MASTER

# 刷盘方式
# - ASYNC_FLUSH 异步刷盘
# - SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH

# 发消息线程池数量
# sendMessageThreadPoolNums=128
# 拉消息线程池数量
# pullMessageThreadPoolNums=128

3.修改配置

修改 brokerIP1配置,使用内网IP. 我的是Mac系统,不修改这个配置会出现以下问题

RocketmqRemoting closeChannel: close the connection to remote address[\] result: true

4.启动

输入docker-composer up -d

5.进入控制台查看

查看 http://192.168.3.33:8092/

6.创建Dome测试

6.1创建一个父项目spring-cloud-alibaba-rocketmq

导包
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-core</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-core</artifactId>
    <version>5.2.1.RELEASE</version>
</dependency>

因为我使用的alibabacloud 2.1.1.RELEASE版本,使用2.1.1版本会出现一个启动bug

Description:

An attempt was made to call a method that does not exist. The attempt was made from the following location:

    com.alibaba.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder.createProducerMessageHandler(RocketMQMessageChannelBinder.java:172)

The following method did not exist:

    org.springframework.integration.channel.AbstractMessageChannel.getChannelInterceptors()Ljava/util/List;

The method's class, org.springframework.integration.channel.AbstractMessageChannel, is available from the following locations:

    jar:file:/xzb/5_course/xzb-course.jar!/BOOT-INF/lib/spring-integration-core-5.3.2.RELEASE.jar!/org/springframework/integration/channel/AbstractMessageChannel.class

The class hierarchy was loaded from the following locations:

    org.springframework.integration.channel.AbstractMessageChannel: jar:file:/xzb/5_course/xzb-course.jar!/BOOT-INF/lib/spring-integration-core-5.3.2.RELEASE.jar!/
    org.springframework.integration.context.IntegrationObjectSupport: jar:file:/xzb/5_course/xzb-course.jar!/BOOT-INF/lib/spring-integration-core-5.3.2.RELEASE.jar!/


Action:

Correct the classpath of your application so that it contains a single, compatible version of org.springframework.integration.channel.AbstractMessageChannel

原因是 AbstractMessageChannel的getChannelInterceptors方法不存在,需要更换spring-integration-core版本

6.2 创建生产者spring-cloud-alibaba-rocketmq-produce

application.yml
spring:
  jackson:
    time-zone: GMT+8
    date-format: yyyy-MM-dd HH:mm:ss
  cloud:
    # RocketMQ 相关配置
    stream:
      rocketmq:
        binder:
          name-server: 192.168.75.129:9876
      bindings:
        #自定义的名称 # test-group(一级分类)
        test-group: {destination: test-group,content-type: application/json}
创建MyProduce
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface MyProduce {

    @Output("test-group")
    MessageChannel log();
}
在应用启动入口绑定上面的MyProduce.java
import com.lingkang.rocketmqdemo.mq.MyProduce;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;

@SpringBootApplication
@EnableBinding({MyProduce.class})
public class SrpingCloudAlibabaRocketProducerApplication {
    public static void main(String[] args) {
        SpringApplication.run(SrpingCloudAlibabaRocketProducerApplication.class, args);
    }
}
controller中直接装配即可
import com.lingkang.rocketmqdemo.mq.MyProduce;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author linke
 * @date 2020-01-19 下午 21:08
 * @description
 */
@RestController
public class MainController {

    @Autowired
    private MyProduce myProduce;

    @GetMapping("send")
    public Object send(String msg) throws Exception {
        myProduce.log().send(MessageBuilder.withPayload(msg).build());
        return "";
    }

    @GetMapping("send1")
    public Object send1(String msg) {
        //带上标签发送
        MessageBuilder builder = MessageBuilder.withPayload(msg)
                .setHeader(RocketMQHeaders.TAGS, "log")
                .setHeader(RocketMQHeaders.KEYS, "my-key")
                .setHeader("DELAY", "1");
        Message message = builder.build();
        myProduce.log().send(message);
        return message;
    }
}

6.3 创建消费者spring-cloud-alibaba-rocketmq-consumer

application.yml
spring:
  jackson:
    time-zone: GMT+8
    date-format: yyyy-MM-dd HH:mm:ss
  cloud:
    # RocketMQ 相关配置
    stream:
      rocketmq:
        binder:
          name-server: 192.168.75.129:9876
      bindings:
        #自定义的名称 # test-group(一级分类)
        test-group: {destination: test-group,content-type: application/json}
        # consumer.maxAttempts
        # 消息最大可以被尝试消费的次数,包含第一次投递
        # 设为 1,表示不重试,注意该值必须大于 0
        input-consumer: {destination: test-group, content-type: text/plain,
                         group: test-group, consumer.maxAttempts: 1}
创建MyConsumer
public interface MyConsumer {
    @Input("input-consumer")
    MessageChannel log();
}
创建接收监听MyConsumerReceive
@Component
public class MyConsumerReceive {

    @StreamListener("input-consumer")
    public void receiveConsumer(Object msg) {
        System.out.println("消息消费:" + msg);
    }
}
在应用启动入口绑定上面的MyProduce.java
@SpringBootApplication
@EnableBinding({MyConsumer.class})
public class SrpingCloudAlibabaRocketConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(SrpingCloudAlibabaRocketConsumerApplication.class, args);
    }
}
项目结构