老师,我用 @RocketMQMessageListener 声明了一个消费者,但它时而能消费到消息,时而消费不到。
我在控制台查看了未被消费的消息,其 trackType 为 NOT_CONSUME_YET。
请问这是什么情况呀?
代码如下:
/**
 * Clustering-mode RocketMQ push consumer that applies incremental {@code MainInfo}
 * changes (tag {@code increment}) to tenant records through {@code TenantInfoService}.
 *
 * <p>{@link #onMessage(List)} is deliberately empty; the actual work is done by the
 * {@code MessageListenerConcurrently} that {@link #prepareStart(DefaultMQPushConsumer)}
 * registers on the underlying consumer.
 * NOTE(review): this presumably replaces the listener installed by the Spring
 * container before {@code prepareStart} is invoked — confirm against the
 * rocketmq-spring version in use.
 */
@Slf4j
@Component
@RocketMQMessageListener(
        consumerGroup = "${rocketmq.consumer.group.tenant-info-increment-sync}",
        topic = "${csp.product}_MAIN_INFO",
        selectorExpression = "increment",
        consumeThreadMax = 10,
        messageModel = MessageModel.CLUSTERING
)
public class TenantInfoIncrementSyncConsumer implements RocketMQListener<List<MainInfo>>, RocketMQPushConsumerLifecycleListener {

    /** Service used to check, insert and update tenant records. */
    @Autowired
    private TenantInfoService tenantInfoService;

    /**
     * No-op: messages are processed by the listener registered in
     * {@link #prepareStart(DefaultMQPushConsumer)}.
     *
     * @param mainInfos ignored
     */
    @Override
    public void onMessage(List<MainInfo> mainInfos) {
    }

    /**
     * Tunes the push consumer and registers the concurrent message listener.
     *
     * @param consumer the consumer about to be started
     */
    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        // Consumer settings: start from the latest offset, pull up to 10 messages per request,
        // and retry a failed message at most 3 times.
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        // RocketMQ builds the client id from the client IP and the instance name. A constant
        // instance name gives every process on the same IP (same host, or containers with
        // identical bridge addresses) the same client id; such consumers are all assigned the
        // same subset of queues, and messages in the remaining queues stay NOT_CONSUME_YET.
        // A random per-process suffix keeps the client ids distinct.
        consumer.setInstanceName("TenantInfoIncrementSyncConsumer@" + java.util.UUID.randomUUID());
        consumer.setPullBatchSize(10);
        consumer.setMaxReconsumeTimes(3);
        consumer.registerMessageListener(
                (MessageListenerConcurrently) (messages, consumeConcurrentlyContext) -> consumeBatch(messages));
    }

    /**
     * Applies every message of a delivered batch to the tenant store.
     *
     * @param messages messages handed over by the consumer; may be empty
     * @return {@code RECONSUME_LATER} if any message fails with an exception (the whole
     *         batch is then retried), otherwise {@code CONSUME_SUCCESS}
     */
    private ConsumeConcurrentlyStatus consumeBatch(List<MessageExt> messages) {
        if (CollectionUtils.isNotEmpty(messages)) {
            try {
                for (MessageExt message : messages) {
                    // Convert the pulled message into the expected format.
                    List<MainInfo> mainInfos = decodeBody(message);
                    if (CollectionUtils.isNotEmpty(mainInfos)) {
                        int size = mainInfos.size();
                        List<String> incrementMainIds = new ArrayList<>(size);
                        List<String> decrementMainIds = new ArrayList<>(size);
                        List<String> disabledMainIds = new ArrayList<>(size);
                        // Partition the ids by status: 1 -> increment, 0 -> decrement, anything else -> disabled.
                        for (MainInfo mainInfo : mainInfos) {
                            if (mainInfo.getStatus() == 1) {
                                incrementMainIds.add(mainInfo.getMainId());
                            } else if (mainInfo.getStatus() == 0) {
                                decrementMainIds.add(mainInfo.getMainId());
                            } else {
                                disabledMainIds.add(mainInfo.getMainId());
                            }
                        }
                        if (CollectionUtils.isNotEmpty(incrementMainIds)) {
                            // NOTE(review): presumably key "0" holds ids without a tenant record yet and
                            // key "1" holds ids that already have one — confirm against TenantInfoService.exist.
                            Map<String, List<String>> mainInfoMap = tenantInfoService.exist(incrementMainIds);
                            if (MapUtils.isNotEmpty(mainInfoMap)) {
                                List<String> idsToInsert = mainInfoMap.get("0");
                                if (CollectionUtils.isNotEmpty(idsToInsert)) {
                                    tenantInfoService.addBatch(newTenantsWithStatusOne(idsToInsert));
                                }
                                List<String> idsToUpdate = mainInfoMap.get("1");
                                if (CollectionUtils.isNotEmpty(idsToUpdate)) {
                                    tenantInfoService.updateStatusBatch(idsToUpdate, 1);
                                }
                            }
                        }
                        if (CollectionUtils.isNotEmpty(decrementMainIds)) {
                            tenantInfoService.updateStatusBatch(decrementMainIds, 0);
                        }
                        if (CollectionUtils.isNotEmpty(disabledMainIds)) {
                            tenantInfoService.updateStatusBatch(disabledMainIds, 10);
                        }
                    }
                }
            } catch (Exception e) {
                log.error("消费数据失败", e);
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    /**
     * Logs the message body and parses it as a JSON array of {@code MainInfo}.
     *
     * @param message the message whose body is decoded
     * @return the parsed entries
     */
    private List<MainInfo> decodeBody(MessageExt message) {
        // Decode once with an explicit charset so the result does not depend on the JVM default.
        // NOTE(review): assumes the producer encodes the JSON payload as UTF-8.
        String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        log.info("拉取到的消息为:" + body);
        return JSONUtil.toList(new JSONArray(body), MainInfo.class);
    }

    /**
     * Builds one {@code TenantInfo} with tenant status 1 for each given id.
     *
     * @param mainIds ids to create tenant entries for
     * @return the new entries, in the order of {@code mainIds}
     */
    private List<TenantInfo> newTenantsWithStatusOne(List<String> mainIds) {
        List<TenantInfo> tenantInfos = new ArrayList<>(mainIds.size());
        for (String mainId : mainIds) {
            TenantInfo tenantInfo = new TenantInfo();
            tenantInfo.setId(mainId);
            tenantInfo.setTenantStatus(1);
            tenantInfos.add(tenantInfo);
        }
        return tenantInfos;
    }
}
面向未来微服务:熟练掌握Spring Cloud Alibaba
了解课程