采纳答案成功!
向帮助你的同学说点啥吧!感谢那些助人为乐的人
还没有看到后面,在测试重平衡的时候,消费者下线后队列没有重新分配。是我代码写错了还是还没有实现到那一块?
看了源码,老师已经实现过了
org.idea.eaglemq.broker.netty.broker.BrokerServerHandler中有剔除代码
我发现虽然有剔除代码,但是剔除后没有修改change的map,所以没有进行重平衡。我修改了remove方法,在移除实例之前,找到了这个实例,然后把他的consumeGroup加到了changeMap里,因为在重平衡的时候有空指针的校验,所以不用担心group不存在,也不用担心topic不存在。但是仍然有一个疑问,现在的消费者在我看来都是唯一的,所以直接用了findFirst,会不会出现不唯一的情况呢?
public void removeFromInstancePool(String reqId) { synchronized (this) { for (String topic : consumerInstanceMap.keySet()) { List<ConsumerInstance> consumerInstances = consumerInstanceMap.get(topic); // 找出要删除的实例 Optional<ConsumerInstance> removeInstance = consumerInstances.stream() .filter(ci -> ci.getConsumerReqId().equals(reqId)) .findFirst(); // 如果没有找到,继续下一个 topic if (!removeInstance.isPresent()) { continue; } // 添加进change消费组 Set<String> consumerGroupSet = reBalanceInfo.getChangeConsumerGroupMap().getOrDefault(topic, new HashSet<>()); consumerGroupSet.add(removeInstance.get().getConsumeGroup()); reBalanceInfo.getChangeConsumerGroupMap().put(topic, consumerGroupSet); // 移除实例 List<ConsumerInstance> filteredInstances = consumerInstances.stream() .filter(ci -> !ci.getConsumerReqId().equals(reqId)) .collect(Collectors.toList()); consumerInstanceMap.put(topic, filteredInstances); } Map<String, Map<String, List<ConsumerInstance>>> consumeHoldMap = CommonCache.getConsumeHoldMap(); for (String topic : consumeHoldMap.keySet()) { Map<String,List<ConsumerInstance>> consumeGroupInstanceMap = consumeHoldMap.get(topic); for (String consumeGroup : consumeGroupInstanceMap.keySet()) { List<ConsumerInstance> consumerInstances = consumeGroupInstanceMap.get(consumeGroup); List<ConsumerInstance> filterInstances = consumerInstances.stream() .filter(item -> !item.getConsumerReqId().equals(reqId)) .collect(Collectors.toList()); consumeGroupInstanceMap.put(consumeGroup,filterInstances); } } } }
登录后可查看更多问答,登录/注册
MQ大牛成长课--从0到1手写分布式消息队列中间件
96 5
329 4
353 3
127 3
256 3
购课补贴联系客服咨询优惠详情
慕课网APP您的移动学习伙伴
扫描二维码关注慕课网微信公众号