Installing RocketMQ with Docker
Installing RocketMQ with docker-compose
1. Write docker-compose.yml
version: '3.5'
services:
  rmqnamesrv:
    image: foxiswho/rocketmq:server
    container_name: rmqnamesrv
    ports:
      - 9876:9876
    volumes:
      - /Users/docker/yz/rocketmq/rocketmq-server/logs:/opt/logs
      - /Users/docker/yz/rocketmq/rocketmq-server/store:/opt/store
    networks:
      rmq:
        aliases:
          - rmqnamesrv
  rmqbroker:
    image: foxiswho/rocketmq:broker
    container_name: rmqbroker
    ports:
      - 10909:10909
      - 10911:10911
    volumes:
      - /Users/docker/yz/rocketmq/rocketmq-broker/logs:/opt/logs
      - /Users/docker/yz/rocketmq/rocketmq-broker/store:/opt/store
      - /Users/docker/yz/rocketmq/rocketmq-broker/brokerconf/broker.conf:/etc/rocketmq/broker.conf
    environment:
      NAMESRV_ADDR: "rmqnamesrv:9876"
      JAVA_OPTS: " -Duser.home=/opt"
      JAVA_OPT_EXT: "-server -Xms128m -Xmx128m -Xmn128m"
    command: mqbroker -c /etc/rocketmq/broker.conf
    depends_on:
      - rmqnamesrv
    networks:
      rmq:
        aliases:
          - rmqbroker
  rmqconsole:
    image: styletang/rocketmq-console-ng
    container_name: rmqconsole
    ports:
      - 8092:8080
    environment:
      JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
    depends_on:
      - rmqnamesrv
    networks:
      rmq:
        aliases:
          - rmqconsole
networks:
  rmq:
    name: rmq
    driver: bridge
2. Create broker.conf
Place it in the /Users/docker/yz/rocketmq/rocketmq-broker/brokerconf directory.
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Name of the cluster this broker belongs to
brokerClusterName=DefaultCluster
# Broker name. It differs per config file: use broker-a in broker-a.properties
# and broker-b in broker-b.properties
brokerName=broker-a
# 0 means Master, > 0 means Slave
brokerId=0
# Name server addresses, separated by semicolons
# namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
# Startup IP. If Docker reports com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.0.120:10909> failed,
# fix 1: add producer.setVipChannelEnabled(false); fix 2: set brokerIP1 to the host IP, not the Docker-internal IP
brokerIP1=192.168.3.33
# Default number of queues for a topic that is auto-created when a message is sent to a topic the broker does not have
defaultTopicQueueNums=4
# Whether the broker may auto-create topics. Enable it in dev/test and disable it in production: production must use false (this demo uses true)
autoCreateTopicEnable=true
# Whether the broker may auto-create subscription groups. Enable it in dev/test and disable it in production
autoCreateSubscriptionGroup=true
# Port the broker listens on for client traffic
listenPort=10911
# Time of day at which expired files are deleted; the default is 4 a.m.
deleteWhen=04
# File retention time in hours (120 here; the original sample value is 48)
fileReservedTime=120
# Size of each commitLog file in bytes; the default is 1 GB
mapedFileSizeCommitLog=1073741824
# Each ConsumeQueue file holds 300,000 entries by default; adjust to your workload
mapedFileSizeConsumeQueue=300000
# destroyMapedFileIntervalForcibly=120000
# redeleteHangedFileInterval=120000
# Maximum disk usage (percent) allowed when checking the space used by physical files
diskMaxUsedSpaceRatio=88
# Storage root path
# storePathRootDir=/home/ztztdata/rocketmq-all-4.1.0-incubating/store
# commitLog storage path
# storePathCommitLog=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/commitlog
# Consume queue storage path
# storePathConsumeQueue=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/consumequeue
# Message index storage path
# storePathIndex=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/index
# checkpoint file storage path
# storeCheckpoint=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/checkpoint
# abort file storage path
# abortFile=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/abort
# Maximum message size in bytes
maxMessageSize=65536
# flushCommitLogLeastPages=4
# flushConsumeQueueLeastPages=2
# flushCommitLogThoroughInterval=10000
# flushConsumeQueueThoroughInterval=60000
# Broker role
# - ASYNC_MASTER: Master with asynchronous replication
# - SYNC_MASTER: Master with synchronous double write
# - SLAVE
brokerRole=ASYNC_MASTER
# Disk flush mode
# - ASYNC_FLUSH: asynchronous flush
# - SYNC_FLUSH: synchronous flush
flushDiskType=ASYNC_FLUSH
# Number of threads in the send-message thread pool
# sendMessageThreadPoolNums=128
# Number of threads in the pull-message thread pool
# pullMessageThreadPoolNums=128
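Before starting the broker it can help to sanity-check the file above. The following is only a sketch under assumptions: BrokerConfCheck and the two rules it applies are hypothetical helpers written for this post, not part of RocketMQ. It reads the key=value lines with java.util.Properties and warns when brokerIP1 is missing or loopback, and when brokerId and brokerRole contradict each other.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Hypothetical helper: sanity-checks broker.conf content before the broker starts. */
final class BrokerConfCheck {

    /** Parses broker.conf text; lines starting with # are treated as comments. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Returns human-readable warnings; an empty list means nothing suspicious was found. */
    static List<String> check(Properties props) {
        List<String> warnings = new ArrayList<>();
        String ip = props.getProperty("brokerIP1", "").trim();
        if (ip.isEmpty()) {
            warnings.add("brokerIP1 is not set; the broker may register a container-internal IP");
        } else if (ip.startsWith("127.") || ip.equals("localhost")) {
            warnings.add("brokerIP1 is a loopback address; other hosts cannot reach it");
        }
        // brokerId=0 means Master, so it must not be combined with brokerRole=SLAVE (and vice versa).
        boolean masterId = "0".equals(props.getProperty("brokerId", "0").trim());
        boolean slaveRole = "SLAVE".equals(props.getProperty("brokerRole", "ASYNC_MASTER").trim());
        if (masterId == slaveRole) {
            warnings.add("brokerId and brokerRole disagree: brokerId=0 is Master, >0 is Slave");
        }
        return warnings;
    }

    public static void main(String[] args) {
        Properties props = parse("brokerId=0\nbrokerRole=ASYNC_MASTER\nbrokerIP1=192.168.3.33\n");
        System.out.println("warnings: " + check(props));
    }
}
```

In practice you would pass the real file content (for example via Files.readString) instead of the inline string used in main.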
3. Modify the configuration
Set brokerIP1 to your LAN IP. I am on macOS, and without this change the following error appears:
RocketmqRemoting closeChannel: close the connection to remote address[] result: true
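If you are unsure which address to use for brokerIP1, a short program can list the candidates. This is a minimal sketch, not an official tool: LanAddresses is a hypothetical class name, and it simply prints every IPv4 address of interfaces that are up and not loopback. Virtual or bridge interfaces may show up too, so pick the address that belongs to your LAN.

```java
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;

/** Hypothetical helper: lists candidate LAN IPv4 addresses for brokerIP1. */
final class LanAddresses {

    /** Returns the IPv4 addresses of interfaces that are up and not loopback. */
    static List<String> candidates() {
        List<String> result = new ArrayList<>();
        try {
            Enumeration<NetworkInterface> nics = NetworkInterface.getNetworkInterfaces();
            if (nics == null) {
                return result; // no interfaces reported
            }
            for (NetworkInterface nic : Collections.list(nics)) {
                if (!nic.isUp() || nic.isLoopback()) {
                    continue;
                }
                for (InetAddress addr : Collections.list(nic.getInetAddresses())) {
                    if (addr instanceof Inet4Address && !addr.isLoopbackAddress()) {
                        result.add(addr.getHostAddress());
                    }
                }
            }
        } catch (SocketException e) {
            // Treat an unreadable interface list as "no further candidates" instead of failing.
            return result;
        }
        return result;
    }

    public static void main(String[] args) {
        // Print each candidate in the form it would take in broker.conf.
        candidates().forEach(ip -> System.out.println("brokerIP1=" + ip));
    }
}
```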
4. Start
Run docker-compose up -d
5. Open the console
Visit http://192.168.3.33:8092/
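If the console page does not load, it is worth checking which of the published ports actually accept connections. The sketch below assumes the port mappings from the compose file (9876 for the name server, 10909 and 10911 for the broker, 8092 for the console) and the host IP used in this post. PortCheck is a hypothetical helper that only attempts a TCP connect; it does not speak the RocketMQ protocol.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper: checks whether the published RocketMQ ports accept TCP connections. */
final class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within the timeout. */
    static boolean isOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Host defaults to the IP used in this post; pass your own as the first argument.
        String host = args.length > 0 ? args[0] : "192.168.3.33";
        int[] ports = {9876, 10909, 10911, 8092};
        String[] names = {"name server", "broker (VIP channel)", "broker", "console"};
        for (int i = 0; i < ports.length; i++) {
            String state = isOpen(host, ports[i], 1000) ? "open" : "closed";
            System.out.println(names[i] + " " + host + ":" + ports[i] + " is " + state);
        }
    }
}
```

A closed 10911 with an open 9876 usually points at the broker container rather than the name server, so docker logs rmqbroker is the next place to look.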
6. Create a demo to test
6.1 Create a parent project spring-cloud-alibaba-rocketmq
Add the dependencies
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-core</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-core</artifactId>
    <version>5.2.1.RELEASE</version>
</dependency>
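spring-integration-core is excluded and pinned because I use Spring Cloud Alibaba 2.1.1.RELEASE, and with version 2.1.1 the application fails at startup with this error: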
Description:
An attempt was made to call a method that does not exist. The attempt was made from the following location:
com.alibaba.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder.createProducerMessageHandler(RocketMQMessageChannelBinder.java:172)
The following method did not exist:
org.springframework.integration.channel.AbstractMessageChannel.getChannelInterceptors()Ljava/util/List;
The method's class, org.springframework.integration.channel.AbstractMessageChannel, is available from the following locations:
jar:file:/xzb/5_course/xzb-course.jar!/BOOT-INF/lib/spring-integration-core-5.3.2.RELEASE.jar!/org/springframework/integration/channel/AbstractMessageChannel.class
The class hierarchy was loaded from the following locations:
org.springframework.integration.channel.AbstractMessageChannel: jar:file:/xzb/5_course/xzb-course.jar!/BOOT-INF/lib/spring-integration-core-5.3.2.RELEASE.jar!/
org.springframework.integration.context.IntegrationObjectSupport: jar:file:/xzb/5_course/xzb-course.jar!/BOOT-INF/lib/spring-integration-core-5.3.2.RELEASE.jar!/
Action:
Correct the classpath of your application so that it contains a single, compatible version of org.springframework.integration.channel.AbstractMessageChannel
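The cause is that the getChannelInterceptors method the binder expects on AbstractMessageChannel does not exist in the spring-integration-core version on the classpath (5.3.2.RELEASE in the log), so the spring-integration-core version has to be changed.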
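The JVM links a method call by name, parameter types and return type, which is why the log prints the descriptor getChannelInterceptors()Ljava/util/List;. To confirm what is actually on your classpath, reflection can answer two questions: which jar a class was loaded from, and whether it has a public no-argument method with that exact return type. The sketch below uses only the JDK; ClasspathProbe is a hypothetical name, and the check is an illustration rather than a replacement for mvn dependency:tree.

```java
import java.lang.reflect.Method;
import java.security.CodeSource;
import java.util.List;

/** Hypothetical helper: inspects which version of a class is visible on the classpath. */
final class ClasspathProbe {

    /** True if the class has a public no-arg method with exactly this name and return type. */
    static boolean hasNoArgMethod(String className, String methodName, Class<?> returnType) {
        try {
            Class<?> type = Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            for (Method m : type.getMethods()) {
                if (m.getName().equals(methodName)
                        && m.getParameterCount() == 0
                        && m.getReturnType() == returnType) {
                    return true;
                }
            }
            return false;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    /** Jar or directory the class was loaded from; JDK classes report "bootstrap/unknown". */
    static String loadedFrom(String className) {
        try {
            Class<?> type = Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            CodeSource source = type.getProtectionDomain().getCodeSource();
            return source == null ? "bootstrap/unknown" : String.valueOf(source.getLocation());
        } catch (ClassNotFoundException | LinkageError e) {
            return "not on classpath";
        }
    }

    public static void main(String[] args) {
        String cls = "org.springframework.integration.channel.AbstractMessageChannel";
        System.out.println("loaded from: " + loadedFrom(cls));
        System.out.println("has List getChannelInterceptors(): "
                + hasNoArgMethod(cls, "getChannelInterceptors", List.class));
    }
}
```

Run it with the same classpath as the application; without Spring Integration on the classpath it simply reports "not on classpath".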
6.2 Create the producer spring-cloud-alibaba-rocketmq-produce
application.yml
spring:
  jackson:
    time-zone: GMT+8
    date-format: yyyy-MM-dd HH:mm:ss
  cloud:
    # RocketMQ settings
    stream:
      rocketmq:
        binder:
          # Address of the name server; use the host that runs rmqnamesrv
          name-server: 192.168.75.129:9876
      bindings:
        # Custom binding name. The destination test-group is the topic (top-level category)
        test-group: {destination: test-group, content-type: application/json}
Create MyProduce
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface MyProduce {
    // Output channel bound to the test-group binding in application.yml
    @Output("test-group")
    MessageChannel log();
}
Bind the MyProduce.java above in the application entry point
import com.lingkang.rocketmqdemo.mq.MyProduce;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;

@SpringBootApplication
@EnableBinding({MyProduce.class})
public class SpringCloudAlibabaRocketProducerApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringCloudAlibabaRocketProducerApplication.class, args);
    }
}
Inject it directly into the controller
import com.lingkang.rocketmqdemo.mq.MyProduce;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author linke
 * @date 2020-01-19 21:08
 * @description
 */
@RestController
public class MainController {
    @Autowired
    private MyProduce myProduce;

    @GetMapping("send")
    public Object send(String msg) {
        // Send the plain payload without extra headers
        myProduce.log().send(MessageBuilder.withPayload(msg).build());
        return "";
    }

    @GetMapping("send1")
    public Object send1(String msg) {
        // Send with a tag, a key and a delay header
        MessageBuilder<String> builder = MessageBuilder.withPayload(msg)
                .setHeader(RocketMQHeaders.TAGS, "log")
                .setHeader(RocketMQHeaders.KEYS, "my-key")
                .setHeader("DELAY", "1");
        Message<String> message = builder.build();
        myProduce.log().send(message);
        return message;
    }
}
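The "DELAY" header in send1 carries the value "1". Assuming the binder forwards this header as RocketMQ's delay time level, and assuming the broker keeps its default messageDelayLevel table (1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h), the level is an index into that table rather than a number of seconds. The sketch below illustrates the mapping; DelayLevels is a hypothetical helper, and a broker with a customized table will give different results.

```java
import java.time.Duration;

/** Hypothetical helper: maps a RocketMQ delay level to a duration using the default table. */
final class DelayLevels {

    // Default value of the broker setting messageDelayLevel; a customized broker may differ.
    private static final String[] DEFAULT_LEVELS =
            "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h".split(" ");

    /** Converts a 1-based delay level into its duration; level 0 means no delay. */
    static Duration forLevel(int level) {
        if (level == 0) {
            return Duration.ZERO;
        }
        if (level < 0 || level > DEFAULT_LEVELS.length) {
            throw new IllegalArgumentException("level must be between 0 and " + DEFAULT_LEVELS.length);
        }
        String spec = DEFAULT_LEVELS[level - 1];
        long amount = Long.parseLong(spec.substring(0, spec.length() - 1));
        switch (spec.charAt(spec.length() - 1)) {
            case 's':
                return Duration.ofSeconds(amount);
            case 'm':
                return Duration.ofMinutes(amount);
            case 'h':
                return Duration.ofHours(amount);
            default:
                throw new IllegalStateException("Unknown unit in " + spec);
        }
    }

    public static void main(String[] args) {
        // Level 1 is the value used by send1.
        System.out.println("level 1 delays by " + forLevel(1).getSeconds() + " second(s)");
        System.out.println("level 18 delays by " + forLevel(18).toHours() + " hour(s)");
    }
}
```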
6.3 Create the consumer spring-cloud-alibaba-rocketmq-consumer
application.yml
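spring:
  jackson:
    time-zone: GMT+8
    date-format: yyyy-MM-dd HH:mm:ss
  cloud:
    # RocketMQ settings
    stream:
      rocketmq:
        binder:
          # Address of the name server; use the host that runs rmqnamesrv
          name-server: 192.168.75.129:9876
      bindings:
        # Custom binding name. The destination test-group is the topic (top-level category)
        test-group: {destination: test-group, content-type: application/json}
        # consumer.maxAttempts
        # Maximum number of times a message may be delivered for consumption, including the first delivery.
        # 1 means no retry. The value must be greater than 0.
        input-consumer: {destination: test-group, content-type: text/plain,
                         group: test-group, consumer.maxAttempts: 1}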
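The counting rule behind consumer.maxAttempts is easy to get wrong by one, because the first delivery is included. The following sketch is a plain-Java simulation of that rule only; it is not how the binder is implemented, and MaxAttemptsDemo and deliver are hypothetical names. The handler returns true when consumption succeeds.

```java
import java.util.function.IntPredicate;

/** Hypothetical simulation of the maxAttempts counting rule (first delivery included). */
final class MaxAttemptsDemo {

    /**
     * Calls the handler until it succeeds or maxAttempts deliveries have been made.
     * The handler receives the 1-based attempt number. Returns the number of deliveries.
     */
    static int deliver(int maxAttempts, IntPredicate handler) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be greater than 0");
        }
        int attempts = 0;
        while (attempts < maxAttempts) {
            attempts++;
            if (handler.test(attempts)) {
                break; // consumed successfully, no further retries
            }
        }
        return attempts;
    }

    public static void main(String[] args) {
        // maxAttempts = 1: a failing handler is called once and never retried.
        System.out.println("maxAttempts=1, always failing: " + deliver(1, attempt -> false));
        // maxAttempts = 3: a handler that succeeds on the second try stops the retries.
        System.out.println("maxAttempts=3, succeeds on 2nd: " + deliver(3, attempt -> attempt == 2));
    }
}
```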
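Create MyConsumer
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface MyConsumer {
    // Input channel bound to the input-consumer binding in application.yml
    @Input("input-consumer")
    SubscribableChannel log();
}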
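Create the receiving listener MyConsumerReceive
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Component;

@Component
public class MyConsumerReceive {
    // Called for every message that arrives on the input-consumer binding
    @StreamListener("input-consumer")
    public void receiveConsumer(String msg) {
        System.out.println("Message consumed: " + msg);
    }
}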
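Bind the MyConsumer.java above in the application entry point
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;

@SpringBootApplication
@EnableBinding({MyConsumer.class})
public class SpringCloudAlibabaRocketConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringCloudAlibabaRocketConsumerApplication.class, args);
    }
}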
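With both applications running, the quickest end-to-end test is to call the producer's send and send1 endpoints and watch the consumer's console output. The sketch below does that with the JDK 11 HttpClient. The base URL http://localhost:8080 is an assumption (the post does not state the producer's port), and SendClient is a hypothetical class name.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

/** Hypothetical test client: calls the producer's /send and /send1 endpoints. */
final class SendClient {

    /** Builds the request URI with the msg query parameter URL-encoded. */
    static URI sendUri(String baseUrl, String path, String msg) {
        String encoded = URLEncoder.encode(msg, StandardCharsets.UTF_8);
        return URI.create(baseUrl + "/" + path + "?msg=" + encoded);
    }

    public static void main(String[] args) {
        // Assumed producer address; pass the real one as the first argument.
        String baseUrl = args.length > 0 ? args[0] : "http://localhost:8080";
        HttpClient client = HttpClient.newHttpClient();
        for (String path : new String[] {"send", "send1"}) {
            HttpRequest request = HttpRequest.newBuilder(sendUri(baseUrl, path, "hello rocketmq"))
                    .GET()
                    .build();
            try {
                HttpResponse<String> response =
                        client.send(request, HttpResponse.BodyHandlers.ofString());
                System.out.println(path + " -> HTTP " + response.statusCode());
            } catch (IOException e) {
                System.out.println(path + " -> producer not reachable: " + e);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
```

If the consumer prints the message for send but not for send1, keep in mind that send1 sets a delay header, so its message may arrive later.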