请稍等 ...
×

采纳答案成功!

向帮助你的同学说点啥吧!感谢那些助人为乐的人

使用@RocketMQMessageListener拿不到消息

老师,我是用@RocketMQMessageListener来标明消费者,但是这个消费者一会能消费到消息,一会不能消费到消息。

然后我去控制台看了不能被消费的消息状态,trackType是NOT_CONSUME_YET。

请问这是什么情况呀?

代码如下:

@Slf4j
@Component
@RocketMQMessageListener(
        consumerGroup = "${rocketmq.consumer.group.tenant-info-increment-sync}",
        topic = "${csp.product}_MAIN_INFO",
        selectorExpression = "increment",
        consumeThreadMax = 10,
        messageModel = MessageModel.CLUSTERING
)
public class TenantInfoIncrementSyncConsumer implements RocketMQListener<List<MainInfo>>, RocketMQPushConsumerLifecycleListener {

    /**
     * tenantInfoService
     */
    @Autowired
    private TenantInfoService tenantInfoService;

    @Override
    public void onMessage(List<MainInfo> mainInfos) {
    }

    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {

        // 消费者参数设置:从最尾消费,每次拉取一条数据
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.setInstanceName("TenantInfoIncrementSyncConsumer");
        consumer.setPullBatchSize(10);
        consumer.setMaxReconsumeTimes(3);

        consumer.registerMessageListener((MessageListenerConcurrently) (messages, consumeConcurrentlyContext) -> {

            if (CollectionUtils.isNotEmpty(messages)) {

                try {

                    for (MessageExt message : messages) {

                        // 将拉取到的消息转化成指定格式
                        log.info("拉取到的消息为:" + new String(message.getBody()));

                        List<MainInfo> mainInfos = JSONUtil.toList(new JSONArray(new String(message.getBody())), MainInfo.class);

                        if (CollectionUtils.isNotEmpty(mainInfos)) {

                            int size = mainInfos.size();

                            List<String> incrementMainIds = new ArrayList<>(size);
                            List<String> decrementMainIds = new ArrayList<>(size);
                            List<String> disabledMainIds = new ArrayList<>(size);

                            mainInfos.forEach(mainInfo -> {
                                if (mainInfo.getStatus() == 1) {
                                    incrementMainIds.add(mainInfo.getMainId());
                                } else if (mainInfo.getStatus() == 0) {
                                    decrementMainIds.add(mainInfo.getMainId());
                                } else {
                                    disabledMainIds.add(mainInfo.getMainId());
                                }
                            });

                            if (CollectionUtils.isNotEmpty(incrementMainIds)) {

                                Map<String, List<String>> mainInfoMap = tenantInfoService.exist(incrementMainIds);

                                if (MapUtils.isNotEmpty(mainInfoMap)) {
                                    if (CollectionUtils.isNotEmpty(mainInfoMap.get("0"))) {
                                        List<TenantInfo> tenantInfos = new ArrayList<>();
                                        for (String mainId : mainInfoMap.get("0")) {
                                            TenantInfo tenantInfo = new TenantInfo();
                                            tenantInfo.setId(mainId);
                                            tenantInfo.setTenantStatus(1);
                                            tenantInfos.add(tenantInfo);
                                        }
                                        tenantInfoService.addBatch(tenantInfos);
                                    }
                                    if (CollectionUtils.isNotEmpty(mainInfoMap.get("1"))) {
                                        tenantInfoService.updateStatusBatch(mainInfoMap.get("1"), 1);
                                    }
                                }
                            }

                            if (CollectionUtils.isNotEmpty(decrementMainIds)) {
                                tenantInfoService.updateStatusBatch(decrementMainIds, 0);
                            }

                            if (CollectionUtils.isNotEmpty(disabledMainIds)) {
                                tenantInfoService.updateStatusBatch(disabledMainIds, 10);
                            }
                        }
                    }
                } catch (Exception e) {
                    log.error("消费数据失败", e);
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
    }
}

正在回答 回答被采纳积分+3

1回答

大目 2021-08-13 17:31:40

您好,看您的描述,有点像是命中了负载均衡机制。

建议检查是否有多个消费者实例。
当有多个消费者时,会rocketmq server采用负载均衡机制投递到其中一个实例去消费。

0 回复 有任何疑惑可以回复我~
  • 提问者 朱小悬 #1
    就一个。
    假如是其他的消费者消费了,那么数据库应该有对应的数据的,而事实上是没有的
    回复 有任何疑惑可以回复我~ 2021-08-26 10:12:58
  • 大目 回复 提问者 朱小悬 #2
    目测你的代码没有问题,请问能提供完整代码吗?我来看看。
    回复 有任何疑惑可以回复我~ 2021-08-30 22:18:59
问题已解决,确定采纳
还有疑问,暂不采纳
意见反馈 帮助中心 APP下载
官方微信