您好,您代码的:
com.sqlpay.contentcenter.rocketmq.AddBonusTransactionListener#executeLocalTransaction
MessageHeaders headers = message.getHeaders();
String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
Integer shareId = (Integer) headers.get("share_id");里面:
Integer shareId = (Integer) headers.get("share_id");这一行应该是有问题的。RocketMQ比较奇葩,不管你设置的是啥,它都会强制转换成字符串。所以是没办法直接用Integer接收的。在这一行直接就会抛异常了。。。。。
所以得改成:
Integer shareId = Integer.valueOf((String) headers.get("share_id"));这是第一个问题。
第二,你的Service代码:
public Share auditById(Integer id, ShareAuditDTO auditDTO) {
//1.查询share时候存在,或者当前审核状态不为待审核,抛出异常
Share existsShare = shareMapper.selectByPrimaryKey(id);
if (existsShare == null) {
throw new IllegalArgumentException("参数非法!该分享不存在");
}
if (!Objects.equals(AuditStatusEnum.NOT_YET.name(), existsShare.getAuditStatus())) {
throw new IllegalArgumentException("参数非法!该分享已审核通过或不通过");
}
//2.审核资源,设置状态
auditByIdInDb(id, auditDTO);
//3.如果通过,发送消息给rocketMq,让用户中心去消费为发布人添加积分
if (AuditStatusEnum.PASS.equals(auditDTO.getAuditStatusEnum())) {
// 发送半消息
String transactionId = UUID.randomUUID().toString();
rocketMQTemplate.sendMessageInTransaction(
"tx-add-bonus-group",
"add-bonus",
MessageBuilder
.withPayload(
UserAddBonusMsgDTO.builder()
.userId(existsShare.getUserId())
.bounds(50)
.build()
)
.setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
.setHeader("share_id", id)
.build(),
auditDTO
);
} else {
this.auditByIdInDb(id, auditDTO);
}
return existsShare;
}
你都先auditByIdInDb完了,才发消息,那么,只要:
auditByIdInDb不出异常
rocketMQTemplate.sendMessageInTransaction不报异常,数据库里数据的状态就已经变掉了。
你在RocketMQ那个Listener里记录的日志什么的,都没用了。这个流程是有问题的。
可以:
再次复习下rocketmq章节事务的流程
参考下我的代码:https://git.imooc.com/coding-358/content-center/src/master/src/main/java/com/itmuch/contentcenter/service/content/ShareService.java 以及 https://git.imooc.com/coding-358/content-center/src/master/src/main/java/com/itmuch/contentcenter/rocketmq/AddBonusTransactionListener.java