RocketMQ分布式事务
RocketMQ分布式事务
流程图
文件结构
执行一个订单扣费发货流程
执行sql
-- Trade log table; mapped by the TradeLog entity via @TableName("t_trade_log").
-- Both the producer (payment record) and the consumer (shipping record) save rows here.
CREATE TABLE `t_trade_log` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'ID',
-- NOTE(review): goods_id is int(11) here, but the TradeLog entity declares goodsId as String - confirm which type is intended.
`goods_id` int(11) NOT NULL COMMENT '商品ID',
`goods_name` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '商品名称',
`status` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '状态',
`create_time` datetime(0) NOT NULL COMMENT '创建时间',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
-- Turns foreign key checks on; the statement that turned them off is not part of this excerpt.
SET FOREIGN_KEY_CHECKS = 1;
-- Local transaction log; mapped by the TransactionLog entity via @TableName("t_transaction_log").
-- A row is inserted inside the same local transaction as the business write, so its
-- presence is what the RocketMQ transaction check-back uses to decide COMMIT vs ROLLBACK.
CREATE TABLE `t_transaction_log` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'id',
`transaction_Id` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '事务id',
-- The remark written by pay() is a fixed 14-character text plus the transaction id.
`remark` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '备注',
PRIMARY KEY (`id`, `transaction_Id`) USING BTREE,
-- The check-back query filters on transaction_Id alone and expects at most one row
-- (selectOne). The composite primary key leads with id, so it neither serves that
-- lookup nor prevents duplicate transaction ids; this unique index does both.
UNIQUE KEY `uk_transaction_id` (`transaction_Id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
-- Turns foreign key checks on; the statement that turned them off is not part of this excerpt.
SET FOREIGN_KEY_CHECKS = 1;
创建实体
/**
 * Entity for table {@code t_transaction_log}: one row per local transaction
 * that has been recorded, keyed by the transaction id carried in the message header.
 */
@TableName("t_transaction_log")
@Data
public class TransactionLog implements Serializable {

    private static final long serialVersionUID = 1268216478456291093L;

    /** Primary key, mapped to column {@code ID} with {@code IdType.AUTO}. */
    @TableId(type = IdType.AUTO, value = "ID")
    private Long id;

    /** Transaction id, mapped to column {@code TRANSACTION_ID}. */
    @TableField("TRANSACTION_ID")
    private String transactionId;

    /** Free-text remark, mapped to column {@code REMARK}. */
    @TableField("REMARK")
    private String remark;
}
/**
 * Entity for table {@code t_trade_log}. It is also used as the message payload
 * and as the HTTP request binding object elsewhere in this example.
 */
@TableName("t_trade_log")
@Data
public class TradeLog implements Serializable {

    private static final long serialVersionUID = 3902838426348137002L;

    /** Primary key, mapped to column {@code ID} with {@code IdType.AUTO}. */
    @TableId(type = IdType.AUTO, value = "ID")
    private Long id;

    /**
     * Goods id, mapped to column {@code GOODS_ID}.
     * NOTE(review): the DDL declares goods_id as int(11) while this field is a String - confirm.
     */
    @TableField("GOODS_ID")
    private String goodsId;

    /** Goods name, mapped to column {@code GOODS_NAME}. */
    @TableField("GOODS_NAME")
    private String goodsName;

    /** Status text, mapped to column {@code STATUS}. */
    @TableField("STATUS")
    private String status;

    /** Creation time, mapped to column {@code CREATE_TIME}; formatted as {@code yyyy-MM-dd HH:mm:ss}. */
    @TableField("CREATE_TIME")
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private Date createTime;
}
1.生产端
service
/**
 * Producer-side trade service: sends the transactional message and runs the
 * local transaction that the message depends on.
 */
public interface ITradeLogService extends IService<TradeLog> {

    /**
     * Starts the order-and-pay flow for the given trade.
     *
     * @param tradeLog trade to order and pay for
     */
    void orderAndPay(TradeLog tradeLog);

    /**
     * Performs the local payment work for the given trade under the given transaction id.
     *
     * @param tradeLog      trade being paid
     * @param transactionId id identifying this local transaction
     */
    void pay(TradeLog tradeLog, String transactionId);
}
@Slf4j
@Service("tradeLogService")
@RequiredArgsConstructor(onConstructor = @_(@Autowired))
public class TradeLogServiceImpl extends ServiceImpl<TradeLogMapper, TradeLog> implements ITradeLogService {
private final RocketMQTemplate rocketMQTemplate;
private final TransactionLogMapper transactionLogMapper;
@Override
public void orderAndPay(TradeLog tradeLog) {
// 检测库存
log.info("检测商品Id为{},名称为{}的商品库存,库存充足", tradeLog.getGoodsId(), tradeLog.getGoodsName());
String transactionId = UUID.randomUUID().toString();
// 往RocketMQ发送事务消息
this.rocketMQTemplate.sendMessageInTransaction(
"pay-success", // 事务消息topic
MessageBuilder.withPayload(tradeLog)
.setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
.build(), // 消息
tradeLog // 额外参数,供后续回调使用
);
}
@Override
@Transactional
public void pay(TradeLog tradeLog, String transactionId) {
tradeLog.setCreateTime(new Date());
tradeLog.setStatus("下单并支付成功");
// 保存支付日志
this.save(tradeLog);
log.info("用户已经下单并支付成功商品ID为{},名称为{}的商品", tradeLog.getGoodsId(), tradeLog.getGoodsName());
// 记录事务日志
TransactionLog transactionLog = new TransactionLog();
transactionLog.setTransactionId(transactionId);
String remark = String.format("事务ID为%s的本地事务执行成功", transactionId);
transactionLog.setRemark(remark);
transactionLogMapper.insert(transactionLog);
log.info("事务ID为{}的本地事务执行成功", transactionId);
}
}
RocketMQ消息监听器
@Slf4j
@Component
@RocketMQTransactionListener
@RequiredArgsConstructor(onConstructor = @_(@Autowired))
public class MyRocketMQListener implements RocketMQLocalTransactionListener {
private final ITradeLogService tradeLogService;
private final TransactionLogMapper transactionLogMapper;
/**
* 执行本地事务
*
* @param message 消息
* @param o 额外参数
* @return RocketMQ事务状态
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
MessageHeaders headers = message.getHeaders();
String transicationId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
try {
TradeLog tradeLog = (TradeLog) o;
this.tradeLogService.pay(tradeLog, transicationId); // 对应图中第3步,执行本地事务
log.info("本地事务执行成功,往RocketMQ发送COMMIT");
return RocketMQLocalTransactionState.COMMIT; // 对应图中第4步,COMMIT,半消息经过COMMIT后,消息消费端就可以消费这条消息了
} catch (Exception e) {
e.printStackTrace();
log.info("本地事务回滚,往RocketMQ发送ROLLBACK", e);
return RocketMQLocalTransactionState.ROLLBACK; // 对应途中第4步,ROLLBACK
}
}
/**
* RocketMQ回查本地事务状态,这个过程对应图中第5步
*
* @param message 消息
* @return RocketMQ事务状态
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
MessageHeaders headers = message.getHeaders();
String transicationId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
log.info("RocketMQ事务状态回查");
// 从数据库中根据事务Id查询对应的事务日志,对应图中第6步
TransactionLog transactionLog = transactionLogMapper.selectOne(
new LambdaQueryWrapper<TransactionLog>().eq(TransactionLog::getTransactionId, transicationId)
);
// 对应图中的第7步骤
return transactionLog != null ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.ROLLBACK;
}
}
controller
@RestController
@RequiredArgsConstructor(onConstructor = @_(@Autowired))
public class RocketMqTestController {
private final ITradeLogService tradeLogService;
@GetMapping("pay")
public void orderAndPay(TradeLog tradeLog) {
this.tradeLogService.orderAndPay(tradeLog);
}
}
2.消费端
/**
 * Consumer-side trade service: handles a trade after the payment message has been received.
 */
public interface ITradeLogService extends IService<TradeLog> {

    /**
     * Packages and ships the goods of the given trade.
     *
     * @param tradeLog trade whose goods are to be packaged and sent
     */
    void packageAndSend(TradeLog tradeLog);
}
/**
 * Consumer-side implementation of {@link ITradeLogService}.
 */
@Slf4j
@Service("tradeLogService")
public class TradeLogServiceImpl extends ServiceImpl<TradeLogMapper, TradeLog> implements ITradeLogService {

    /**
     * Saves a new trade log row describing the shipment of the given trade's goods.
     * A fresh {@link TradeLog} is built from the goods id and name of the input;
     * the input object itself is not saved.
     *
     * @param tradeLog trade received from the message
     */
    @Override
    @Transactional
    public void packageAndSend(TradeLog tradeLog) {
        final String goodsId = tradeLog.getGoodsId();
        final String goodsName = tradeLog.getGoodsName();

        final TradeLog shipment = new TradeLog();
        shipment.setGoodsId(goodsId);
        shipment.setGoodsName(goodsName);
        shipment.setStatus("打包完毕,开始物流配送!");
        shipment.setCreateTime(new Date());
        save(shipment);

        log.info("商品ID为{},名称为{}的商品打包完毕,开始物流配送", goodsId, goodsName);
    }
}
监听器
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "test-group", topic = "pay-success")
@RequiredArgsConstructor(onConstructor = @_(@Autowired))
public class MyRocketMQListener implements RocketMQListener<TradeLog> {
private final ITradeLogService tradeLogService;
@Override
public void onMessage(TradeLog tradeLog) {
log.info("监听到用户已经下单并支付成功ID为{},名称为{}的商品", tradeLog.getGoodsId(), tradeLog.getGoodsName());
this.tradeLogService.packageAndSend(tradeLog);
}
}
3.开始测试
3.1正常测试
数据库正常显示
3.2 异常测试
在生产者的TradeLogServiceImpl的pay方法加入抛出异常代码
/**
 * Failure-test variant of the local transaction: performs both writes and then
 * always throws, so the method never completes normally.
 *
 * @param tradeLog      trade being paid; its create time and status are overwritten
 * @param transactionId id stored in the transaction log
 * @throws RuntimeException always, after both inserts have been issued
 */
@Override
@Transactional
public void pay(TradeLog tradeLog, String transactionId) {
    // Persist the payment record.
    tradeLog.setCreateTime(new Date());
    tradeLog.setStatus("下单并支付成功");
    save(tradeLog);
    log.info("用户已经下单并支付成功商品ID为{},名称为{}的商品", tradeLog.getGoodsId(), tradeLog.getGoodsName());

    // Record the transaction log.
    final TransactionLog txLog = new TransactionLog();
    txLog.setTransactionId(transactionId);
    txLog.setRemark(String.format("事务ID为%s的本地事务执行成功", transactionId));
    this.transactionLogMapper.insert(txLog);
    log.info("事务ID为{}的本地事务执行成功", transactionId);

    // Deliberate failure for the rollback test.
    throw new RuntimeException("抛出一个异常");
}
然后执行,查看
日志打印
2021-05-25 00:04:58.637 INFO 68453 --- [nio-6031-exec-2] c.yz.alibaba.listner.MyRocketMQListener : 本地事务回滚,往RocketMQ发送ROLLBACK
3.3 异常回查
抛出异常的代码删除,并且打上断点
执行到断点,打印
2021-05-25 00:10:38.464 INFO 71122 --- [nio-6031-exec-1] c.y.a.service.impl.TradeLogServiceImpl : 检测商品Id为123,名称为测试的商品库存,库存充足
2021-05-25 00:10:38.782 INFO 71122 --- [nio-6031-exec-1] c.y.a.service.impl.TradeLogServiceImpl : 用户已经下单并支付成功商品ID为123,名称为测试的商品
2021-05-25 00:10:38.790 INFO 71122 --- [nio-6031-exec-1] c.y.a.service.impl.TradeLogServiceImpl : 事务ID为f14ff797-26ee-4c3a-a46e-6209c347f3d6的本地事务执行成功
终端杀死生产者进程
➜ ~ lsof -i tcp:6031
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
Postman 2216 yz 81u IPv6 0x3948355ab07483a9 0t0 TCP localhost:62594->localhost:6031 (ESTABLISHED)
java 71122 yz 184u IPv6 0x3948355aae8dc3a9 0t0 TCP *:6031 (LISTEN)
java 71122 yz 209u IPv6 0x3948355ab0748a09 0t0 TCP localhost:6031->localhost:62594 (ESTABLISHED)
➜ ~ kill -9 71122
启动生产者服务,日志打印
2021-05-25 00:12:35.495 INFO 71943 --- [pool-1-thread-1] c.yz.alibaba.listner.MyRocketMQListener : RocketMQ事务状态回查
数据库