debug 断点跟 大目老师的一样
kill 掉 content-center 后重启,多次点击 F9,checkLocalTransaction 函数会被执行多次,这正常吗?
“加积分” 也执行了好几次。
我的 @RocketMQTransactionListener 注解没有 txProducerGroup 属性。
我的代码
/**
 * Audits the share identified by {@code id}.
 *
 * <p>The share must exist and its audit status must equal {@code "NOT_YET"}.
 * When the requested status is {@code PASS}, a transactional message carrying a
 * 50-point bonus for the share's owner is sent to the {@code "add-bonus"}
 * destination, with {@code shareAuditDTO} passed along as the transaction
 * argument. For any other requested status no message is sent and
 * {@code auditByIdInDB} is called directly.
 *
 * @param id            primary key of the share to audit
 * @param shareAuditDTO the audit decision
 * @return the share as loaded at the start of this method
 * @throws IllegalArgumentException if the share does not exist or its audit
 *                                  status is not {@code "NOT_YET"}
 */
public Share auditById(Integer id, ShareAuditDTO shareAuditDTO) {
    // 1. Look the share up and validate that it can still be audited.
    Share target = this.shareMapper.selectByPrimaryKey(id);
    if (target == null) {
        throw new IllegalArgumentException("分享不存在!");
    }
    boolean notYetAudited = Objects.equals("NOT_YET", target.getAuditStatus());
    if (!notYetAudited) {
        throw new IllegalArgumentException("参数非法!已经审核或者审核未通过");
    }

    // 2. Anything other than PASS: no message is sent, update directly.
    if (!AuditStatusEnum.PASS.equals(shareAuditDTO.getAuditStatusEnum())) {
        this.auditByIdInDB(id, shareAuditDTO);
        return target;
    }

    // 3. PASS: send the transactional message. The transaction id and the share
    //    id travel as headers; the DTO is handed over as the transaction argument.
    String txId = UUID.randomUUID().toString();
    UserAddBonusMsgDTO bonusPayload = UserAddBonusMsgDTO.builder()
            .userId(target.getUserId())
            .bonus(50)
            .build();
    this.rocketMQTemplate.sendMessageInTransaction(
            "add-bonus",
            MessageBuilder.withPayload(bonusPayload)
                    .setHeader(RocketMQHeaders.TRANSACTION_ID, txId)
                    .setHeaderIfAbsent("shared_id", id)
                    .build(),
            shareAuditDTO
    );
    return target;
}
/**
 * Local-transaction listener for transactional messages.
 *
 * <p>{@link #executeLocalTransaction} runs the local transaction through
 * {@code ShareService.auditByIdWithRocketMqLog}; {@link #checkLocalTransaction}
 * answers a later status check by looking the transaction id up through
 * {@code RocketmqTransactionLogMapper}.
 */
@RocketMQTransactionListener()
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class AddBonusTransactionListener implements RocketMQLocalTransactionListener {
    private final ShareService shareService;
    private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;

    /**
     * Executes the local transaction for a transactional message.
     *
     * <p>{@code message} and {@code arg} correspond to the arguments given to
     * {@code sendMessageInTransaction(String destination, Message<?> message, Object arg)}.
     *
     * @param message the message; its headers must carry the transaction id and
     *                a {@code "shared_id"} header
     * @param arg     the transaction argument, expected to be a {@code ShareAuditDTO}
     * @return {@code COMMIT} when the local transaction completed without an
     *         exception, {@code ROLLBACK} otherwise
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
        try {
            // Header extraction happens inside the try block so that a missing or
            // malformed header results in ROLLBACK, like every other failure here,
            // instead of an exception escaping from the callback.
            MessageHeaders headers = message.getHeaders();
            String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
            // String.valueOf accepts the header both as a String and as the
            // Integer the sender originally set.
            Integer sharedId = Integer.valueOf(String.valueOf(headers.get("shared_id")));
            this.shareService.auditByIdWithRocketMqLog(sharedId, (ShareAuditDTO) arg, transactionId);
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            // Any failure means the local transaction did not complete.
            // NOTE(review): the exception is not logged; consider logging it once a
            // logger is available in this class.
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    /**
     * Reports the state of a local transaction when its status is checked.
     *
     * <p>Only the transaction id header is read, so the answer does not depend on
     * any other header being present or well-formed.
     * NOTE(review): this callback may be invoked more than once for the same
     * message (for example while the application is paused on a breakpoint);
     * presumably the lookup is read-only, so repeated calls give the same
     * answer - confirm against the mapper.
     *
     * @param message the message whose transaction state is being checked
     * @return {@code COMMIT} if a transaction log entry exists for the
     *         transaction id, {@code ROLLBACK} otherwise
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        MessageHeaders headers = message.getHeaders();
        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
        RocketmqTransactionLog transactionLog = this.rocketmqTransactionLogMapper.selectOne(
                RocketmqTransactionLog.builder()
                        .transactionId(transactionId)
                        .build()
        );
        if (transactionLog != null) {
            return RocketMQLocalTransactionState.COMMIT;
        }
        return RocketMQLocalTransactionState.ROLLBACK;
    }
}
登录后可查看更多问答,登录/注册
面向未来微服务:熟练掌握Spring Cloud Alibaba
了解课程