课程中的演示是添加任务的add()方法先抢到了minute锁,然后doMinutesTimeWheelExecute()才抢到锁,这样的话slot设置的就是正确的。
但是我自己尝试的时候,无一例外每次都是doMinutesTimeWheelExecute()先抢到锁,这个时候已经把current更新成1了,导致添加任务的时候,slot槽设置错误
我看了后面一节的代码,里面加入了一个eventbus,这个时候在doSecondsTimeWheelExecute()会多消耗一些时间,从而可以确保add方法每次都可以抢到minutesLock锁。
所以我在自己代码的doSecondsTimeWheelExecute()里加了10ms的延迟,这下可以让add方法先抢到锁,但是这个都不是自己主动控制的,怎么解决这个问题呢
补充:
public class TimeWheelModelManager {
private static final Logger LOGGER = LoggerFactory.getLogger(TimeWheelModelManager.class);
private TimeWheelModel secondTimeWheelModel;
private TimeWheelModel minuteTimeWheelModel;
private TimeWheelModel hourTimeWheelModel;
// 计算时间轮时间
private long executeTime = 0L;
// 锁
private final Object secondLock = new Object();
private final Object minuteLock = new Object();
private final Object hourLock = new Object();
public void init() {
……
}
public void addTask(DelayMessageDto dto) {
int delay = dto.getDelay();
int mins = delay / 60;
if (mins == 0) {
synchronized (secondLock) {
……
}
} else if (mins < 60) {
synchronized (minuteLock) {
int nextSlotIndex = minuteTimeWheelModel.countNextSlot(mins);
LOGGER.info("【minuteTimeWheelModel】:当前槽位:{}, 任务延迟:{}, 放置在slot:{}", minuteTimeWheelModel.getCurrent(), delay, nextSlotIndex);
TimeWheelSlotListModel timeWheelSlotListModel = minuteTimeWheelModel.getTimeWheelSlotListModel()[nextSlotIndex];
TimeWheelSlotModel slotModel = new TimeWheelSlotModel();
slotModel.setData(dto.getData());
slotModel.setStoreType(dto.getSlotStoreType().getClazz());
slotModel.setDelaySecond(delay);
timeWheelSlotListModel.getTimeWheelSlotModelList().add(slotModel);
}
} else {
// 小时
int hours = mins / 60;
synchronized (hourLock) {
……
}
}
}
/**
* 执行扫描任务
*/
public void doScanTask() {
Thread scanThread = new Thread(() -> {
LOGGER.info("时间轮启动");
while (true) {
try {
doSecondTimeWheel();
if (executeTime % 60 == 0) {
doMinuteTimeWheel();
}
if (executeTime % 3600 == 0) {
doHourTimeWheel();
}
// 休眠1秒
TimeUnit.SECONDS.sleep(1);
executeTime++;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
});
scanThread.setName("scan-slot-task");
scanThread.start();
}
private void doSecondTimeWheel() {
synchronized (secondLock) {
……
}
}
private void doMinuteTimeWheel() {
synchronized (minuteLock) {
// 获取当前槽位
int current = minuteTimeWheelModel.getCurrent();
LOGGER.info("current minuteTimeWheel slot : {}", current);
// 获取当前槽位中的数据
TimeWheelSlotListModel timeWheelSlotListModel = minuteTimeWheelModel.getTimeWheelSlotListModel()[current];
// 全部抛出
timeWheelSlotListModel.getTimeWheelSlotModelList().forEach(timeWheelSlotModel -> {
int remainSecond = timeWheelSlotModel.getDelaySecond() % 60;
if (remainSecond != 0) {
// 仍然需要扔回秒级时间轮
DelayMessageDto dto = new DelayMessageDto();
dto.setData(timeWheelSlotModel.getData());
dto.setDelay(remainSecond);
dto.setSlotStoreType(SlotStoreTypeEnum.of(timeWheelSlotModel.getStoreType()));
addTask(dto);
} else {
LOGGER.info("[minuteTimeWheelModel] 任务data:{}, 当前时间:{}", JSON.toJSONString(timeWheelSlotModel.getData()), System.currentTimeMillis());
}
});
// 清空槽位数据
timeWheelSlotListModel.getTimeWheelSlotModelList().clear();
// 设置槽位
if (current == minuteTimeWheelModel.getTimeWheelSlotListModel().length - 1) {
// 重置
current = 0;
} else {
current = current + 1;
}
minuteTimeWheelModel.setCurrent(current);
}
}
private void doHourTimeWheel() {
……
}
/**
* 构建时间轮槽列表模型
*
* @param slotCount 插槽数
* @return {@link TimeWheelSlotListModel[] }
*/
private TimeWheelSlotListModel[] buildTimeWheelSlotListModel(int slotCount) {
……
}
}