RockerMQ分布式事务

RockerMQ分布式事务

rockectMQ分布式事务

流程图

文件结构

执行一个订单扣费发货流程

执行sql

CREATE TABLE `t_trade_log`  (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'ID',
  `goods_id` int(11) NOT NULL COMMENT '商品ID',
  `goods_name` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '商品名称',
  `status` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '状态',
  `create_time` datetime(0) NOT NULL COMMENT '创建时间',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

SET FOREIGN_KEY_CHECKS = 1;

CREATE TABLE `t_transaction_log`  (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'id',
  `transaction_Id` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '事务id',
  `remark` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '备注',
  PRIMARY KEY (`id`, `transaction_Id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

SET FOREIGN_KEY_CHECKS = 1;

创建实体

@Data
@TableName("t_transaction_log")
public class TransactionLog implements Serializable {

    private static final long serialVersionUID = 1268216478456291093L;

    @TableId(value = "ID", type = IdType.AUTO)
    private Long id;
    @TableField("TRANSACTION_ID")
    private String transactionId;
    @TableField("REMARK")
    private String remark;
}
@Data
@TableName("t_trade_log")
public class TradeLog implements Serializable {

    private static final long serialVersionUID = 3902838426348137002L;

    @TableId(value = "ID", type = IdType.AUTO)
    private Long id;

    @TableField("GOODS_ID")
    private String goodsId;
    @TableField("GOODS_NAME")
    private String goodsName;
    @TableField("STATUS")
    private String status;

    @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    @TableField("CREATE_TIME")
    private Date createTime;
}

1.生产端

service

public interface ITradeLogService extends IService<TradeLog> {
    void orderAndPay(TradeLog tradeLog);
    void pay(TradeLog tradeLog, String transactionId);
}
@Slf4j
@Service("tradeLogService")
@RequiredArgsConstructor(onConstructor = @_(@Autowired))
public class TradeLogServiceImpl extends ServiceImpl<TradeLogMapper, TradeLog> implements ITradeLogService {


    private final RocketMQTemplate rocketMQTemplate;

    private  final TransactionLogMapper transactionLogMapper;

    @Override
    public void orderAndPay(TradeLog tradeLog) {
        // 检测库存
        log.info("检测商品Id为{},名称为{}的商品库存,库存充足", tradeLog.getGoodsId(), tradeLog.getGoodsName());

        String transactionId = UUID.randomUUID().toString();
        // 往RocketMQ发送事务消息
        this.rocketMQTemplate.sendMessageInTransaction(
                "pay-success", // 事务消息topic
                MessageBuilder.withPayload(tradeLog)
                        .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
                        .build(), // 消息
                tradeLog // 额外参数,供后续回调使用
        );
    }

    @Override
    @Transactional
    public void pay(TradeLog tradeLog, String transactionId) {
        tradeLog.setCreateTime(new Date());
        tradeLog.setStatus("下单并支付成功");
        // 保存支付日志
        this.save(tradeLog);
        log.info("用户已经下单并支付成功商品ID为{},名称为{}的商品", tradeLog.getGoodsId(), tradeLog.getGoodsName());
        // 记录事务日志
        TransactionLog transactionLog = new TransactionLog();
        transactionLog.setTransactionId(transactionId);
        String remark = String.format("事务ID为%s的本地事务执行成功", transactionId);
        transactionLog.setRemark(remark);
        transactionLogMapper.insert(transactionLog);
        log.info("事务ID为{}的本地事务执行成功", transactionId);
    }

}

RocketMQ消息监听器

@Slf4j
@Component
@RocketMQTransactionListener
@RequiredArgsConstructor(onConstructor = @_(@Autowired))
public class MyRocketMQListener implements RocketMQLocalTransactionListener {


    private final ITradeLogService tradeLogService;

    private final TransactionLogMapper transactionLogMapper;

    /**
     * 执行本地事务
     *
     * @param message 消息
     * @param o       额外参数
     * @return RocketMQ事务状态
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        MessageHeaders headers = message.getHeaders();
        String transicationId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);

        try {
            TradeLog tradeLog = (TradeLog) o;
            this.tradeLogService.pay(tradeLog, transicationId); // 对应图中第3步,执行本地事务
            log.info("本地事务执行成功,往RocketMQ发送COMMIT");
            return RocketMQLocalTransactionState.COMMIT; // 对应图中第4步,COMMIT,半消息经过COMMIT后,消息消费端就可以消费这条消息了
        } catch (Exception e) {
            e.printStackTrace();
            log.info("本地事务回滚,往RocketMQ发送ROLLBACK", e);
            return RocketMQLocalTransactionState.ROLLBACK; // 对应途中第4步,ROLLBACK
        }
    }

    /**
     * RocketMQ回查本地事务状态,这个过程对应图中第5步
     *
     * @param message 消息
     * @return RocketMQ事务状态
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        MessageHeaders headers = message.getHeaders();
        String transicationId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
        log.info("RocketMQ事务状态回查");
        // 从数据库中根据事务Id查询对应的事务日志,对应图中第6步
        TransactionLog transactionLog = transactionLogMapper.selectOne(
                new LambdaQueryWrapper<TransactionLog>().eq(TransactionLog::getTransactionId, transicationId)
        );
        // 对应图中的第7步骤
        return transactionLog != null ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.ROLLBACK;
    }
}

controller

@RestController
@RequiredArgsConstructor(onConstructor = @_(@Autowired))
public class RocketMqTestController {

    private final ITradeLogService tradeLogService;

    @GetMapping("pay")
    public void orderAndPay(TradeLog tradeLog) {
        this.tradeLogService.orderAndPay(tradeLog);
    }
}

2.消费端

public interface ITradeLogService extends IService<TradeLog> {
    void packageAndSend(TradeLog tradeLog);
}
@Slf4j
@Service("tradeLogService")
public class TradeLogServiceImpl extends ServiceImpl<TradeLogMapper, TradeLog> implements ITradeLogService {

    @Override
    @Transactional
    public void packageAndSend(TradeLog tradeLog) {
        TradeLog tl = new TradeLog();
        tl.setGoodsId(tradeLog.getGoodsId());
        tl.setGoodsName(tradeLog.getGoodsName());
        tl.setStatus("打包完毕,开始物流配送!");
        tl.setCreateTime(new Date());

        this.save(tl);
        log.info("商品ID为{},名称为{}的商品打包完毕,开始物流配送", tradeLog.getGoodsId(), tradeLog.getGoodsName());
    }
}

监听器

@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "test-group", topic = "pay-success")
@RequiredArgsConstructor(onConstructor = @_(@Autowired))
public class MyRocketMQListener implements RocketMQListener<TradeLog> {

    private  final ITradeLogService tradeLogService;

    @Override
    public void onMessage(TradeLog tradeLog) {
        log.info("监听到用户已经下单并支付成功ID为{},名称为{}的商品", tradeLog.getGoodsId(), tradeLog.getGoodsName());
        this.tradeLogService.packageAndSend(tradeLog);
    }
}

3.开始测试

3.1正常测试

数据库正常显示

3.2 异常测试

在生产者的TradeLogServiceImpl的pay方法加入抛出异常代码

 @Override
    @Transactional
    public void pay(TradeLog tradeLog, String transactionId) {
        tradeLog.setCreateTime(new Date());
        tradeLog.setStatus("下单并支付成功");
        // 保存支付日志
        this.save(tradeLog);
        log.info("用户已经下单并支付成功商品ID为{},名称为{}的商品", tradeLog.getGoodsId(), tradeLog.getGoodsName());
        // 记录事务日志
        TransactionLog transactionLog = new TransactionLog();
        transactionLog.setTransactionId(transactionId);
        String remark = String.format("事务ID为%s的本地事务执行成功", transactionId);
        transactionLog.setRemark(remark);
        transactionLogMapper.insert(transactionLog);
        log.info("事务ID为{}的本地事务执行成功", transactionId);
       throw  new RuntimeException("抛出一个异常");
}

然后执行,查看

日志打印

2021-05-25 00:04:58.637  INFO 68453 --- [nio-6031-exec-2] c.yz.alibaba.listner.MyRocketMQListener  : 本地事务回滚,往RocketMQ发送ROLLBACK
3.3 异常回查

抛出异常的代码删除,并且打上断点

执行到断点,打印

2021-05-25 00:10:38.464  INFO 71122 --- [nio-6031-exec-1] c.y.a.service.impl.TradeLogServiceImpl   : 检测商品Id为123,名称为测试的商品库存,库存充足2021-05-25 00:10:38.782  INFO 71122 --- [nio-6031-exec-1] c.y.a.service.impl.TradeLogServiceImpl   : 用户已经下单并支付成功商品ID为123,名称为测试的商品2021-05-25 00:10:38.790  INFO 71122 --- [nio-6031-exec-1] c.y.a.service.impl.TradeLogServiceImpl   : 事务ID为f14ff797-26ee-4c3a-a46e-6209c347f3d6的本地事务执行成功

终端杀死生产者进程

➜  ~ lsof -i tcp:6031COMMAND   PID USER   FD   TYPE             DEVICE SIZE/OFF NODE NAMEPostman  2216   yz   81u  IPv6 0x3948355ab07483a9      0t0  TCP localhost:62594->localhost:6031 (ESTABLISHED)java    71122   yz  184u  IPv6 0x3948355aae8dc3a9      0t0  TCP *:6031 (LISTEN)java    71122   yz  209u  IPv6 0x3948355ab0748a09      0t0  TCP localhost:6031->localhost:62594 (ESTABLISHED)➜  ~ kill -9 71122

启动生产者服务,日志打印

2021-05-25 00:12:35.495  INFO 71943 --- [pool-1-thread-1] c.yz.alibaba.listner.MyRocketMQListener  : RocketMQ事务状态回查

数据库