请稍等 ...
×

采纳答案成功!

向帮助你的同学说点啥吧!感谢那些助人为乐的人

回查事务函数checkLocalTransaction 会执行多次,正常吗???

debug 断点跟 大目老师的一样

kill 掉 content-center 后 重启,多次点击 F9,会多次执行 checkLocalTransaction 函数,正常吗?

“加积分” 也执行了好几次。

https://img1.sycdn.imooc.com//szimg/5f8ff8b90970453114920250.jpg

我的 @RocketMQTransactionListener注解 没有txProducerGroup 属性

我的代码

public Share auditById(Integer id, ShareAuditDTO shareAuditDTO) {
   //1.查询 share 是否存在
   Share share = this.shareMapper.selectByPrimaryKey(id);
   if (share == null) {
       throw new IllegalArgumentException("分享不存在!");
   }
   if (!Objects.equals("NOT_YET", share.getAuditStatus())) {
       throw new IllegalArgumentException("参数非法!已经审核或者审核未通过");
   }

   //2.发送事务消息,如果是 PASS
   if (AuditStatusEnum.PASS.equals(shareAuditDTO.getAuditStatusEnum())) {
       String transactionId = UUID.randomUUID().toString();
       this.rocketMQTemplate.sendMessageInTransaction(
               "add-bonus",
               MessageBuilder
                       .withPayload(
                               UserAddBonusMsgDTO.builder()
                                       .userId(share.getUserId())
                                       .bonus(50)
                                       .build()
                       )
                       .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
                       .setHeaderIfAbsent("shared_id", id)
                       .build(),
               shareAuditDTO
       );
   }else {  //reject 状态,就不发送消息
       this.auditByIdInDB(id, shareAuditDTO);
   }
   return share;
}




@RocketMQTransactionListener()
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class AddBonusTransactionListener implements RocketMQLocalTransactionListener {
   private final ShareService shareService;
   private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;
   /**
    * 发送事务消息函数 sendMessageInTransaction(String destination, Message<?> message, Object arg)
    * 参数对应起来的
    */
   @Override
   public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
       MessageHeaders headers= message.getHeaders();

       String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
       Integer sharedId = Integer.valueOf((String) headers.get("shared_id"));

       try {
           this.shareService.auditByIdWithRocketMqLog(sharedId, (ShareAuditDTO) arg,transactionId);
           return RocketMQLocalTransactionState.COMMIT;
       }catch (Exception e){
           return RocketMQLocalTransactionState.ROLLBACK;
       }
   }

   @Override
   public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
       MessageHeaders  headers= message.getHeaders();

       String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
       Integer sharedId = Integer.valueOf((String) headers.get("shared_id"));

       RocketmqTransactionLog rocketmqTransactionLog = this.rocketmqTransactionLogMapper.selectOne(
               RocketmqTransactionLog.builder()
                       .transactionId(transactionId).
                       build()
       );
       if(rocketmqTransactionLog != null){
           return RocketMQLocalTransactionState.COMMIT;
       }

       return RocketMQLocalTransactionState.ROLLBACK;
   }
}

正在回答 回答被采纳积分+3

1回答

大目 2020-10-22 08:23:35

您好,正常的,rocketmq会定时扫描半消息,并检查本地事务有没有提交。

0 回复 有任何疑惑可以回复我~
  • 提问者 hthonor #1
    我以为半消息被消费了一次,就不会再消费了呢
    回复 有任何疑惑可以回复我~ 2020-10-22 11:15:33
  • 大目 回复 提问者 hthonor #2
    rocketmq的事务消息有重试机制
    回复 有任何疑惑可以回复我~ 2020-10-24 20:19:15
问题已解决,确定采纳
还有疑问,暂不采纳
意见反馈 帮助中心 APP下载
官方微信