请稍等 ...
×

采纳答案成功!

向帮助你的同学说点啥吧!感谢那些助人为乐的人

消费者下线

还没有看到后面,在测试重平衡的时候,消费者下线后队列没有重新分配。是我代码写错了还是还没有实现到那一块?

正在回答 回答被采纳积分+3

3回答

Danny_Idea 2025-08-22 21:18:29
consumerreqid可以在生成的时候注入一些唯一标识来避免即可。这块我觉得可以注入一些如机器码和时间戳类型的值
1 回复 有任何疑惑可以回复我~
提问者 FlobbY 2025-07-31 17:09:52

看了源码,老师已经实现过了

  1. org.idea.eaglemq.broker.netty.broker.BrokerServerHandler中有剔除代码

1 回复 有任何疑惑可以回复我~
提问者 FlobbY 2025-07-31 18:00:44

我发现虽然有剔除代码,但是剔除后没有修改change的map,所以没有进行重平衡。我修改了remove方法,在移除实例之前,找到了这个实例,然后把他的consumeGroup加到了changeMap里,因为在重平衡的时候有空指针的校验,所以不用担心group不存在,也不用担心topic不存在。但是仍然有一个疑问,现在的消费者在我看来都是唯一的,所以直接用了findFirst,会不会出现不唯一的情况呢?

public void removeFromInstancePool(String reqId) {
    synchronized (this) {
        for (String topic : consumerInstanceMap.keySet()) {
            List<ConsumerInstance> consumerInstances = consumerInstanceMap.get(topic);
            // 找出要删除的实例
            Optional<ConsumerInstance> removeInstance = consumerInstances.stream()
                    .filter(ci -> ci.getConsumerReqId().equals(reqId))
                    .findFirst();
            // 如果没有找到,继续下一个 topic
            if (!removeInstance.isPresent()) {
                continue;
            }
            // 添加进change消费组
            Set<String> consumerGroupSet = reBalanceInfo.getChangeConsumerGroupMap().getOrDefault(topic, new HashSet<>());
            consumerGroupSet.add(removeInstance.get().getConsumeGroup());
            reBalanceInfo.getChangeConsumerGroupMap().put(topic, consumerGroupSet);
            // 移除实例
            List<ConsumerInstance> filteredInstances = consumerInstances.stream()
                    .filter(ci -> !ci.getConsumerReqId().equals(reqId))
                    .collect(Collectors.toList());
            consumerInstanceMap.put(topic, filteredInstances);
        }
        Map<String, Map<String, List<ConsumerInstance>>> consumeHoldMap = CommonCache.getConsumeHoldMap();
        for (String topic : consumeHoldMap.keySet()) {
            Map<String,List<ConsumerInstance>> consumeGroupInstanceMap = consumeHoldMap.get(topic);
            for (String consumeGroup : consumeGroupInstanceMap.keySet()) {
                List<ConsumerInstance> consumerInstances = consumeGroupInstanceMap.get(consumeGroup);
                List<ConsumerInstance> filterInstances = consumerInstances.stream()
                        .filter(item -> !item.getConsumerReqId().equals(reqId))
                        .collect(Collectors.toList());
                consumeGroupInstanceMap.put(consumeGroup,filterInstances);
            }
        }
    }
}


0 回复 有任何疑惑可以回复我~
问题已解决,确定采纳
还有疑问,暂不采纳
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号