有个问题啊,比如在BatchEventProcessor.run() ,同时有两个消费者进来c1,c2, 假设此时ringBuffer.sequence=7 , c1, c2 都 waitFor(8),假设c1先waitFor(8) (虽然并发同时进来,但是CPU调度的时候也有可能先让c1 waitFor),那么在BlockingWaitStrategy的waitFor中,c1 还在计算,c2后进来,但是提前出来,返回了availableSequence=8, 那么 有两个问题:
(1) c2 后进来,把 c1 正在处理的消息,处理掉了,这会不会有什么问题,往多了说,会造成一些消费者饥饿,一些消费者一直忙,或者说Disruptor不care这种消费者之间消费多少的问题
(2) 上面的步骤中,c1也 availableSequence=8, 那么在下面这段代码中,c1中的availableSequence=8, nextSequence=8 (因为nextSequence不是Volatile的,所以c1看不到)
这个while(nextSequence <= availableSequence) 也会走下去,但是dateProvider.get(8) 这个位置的消息被C2 已经消费了,c1再次消息,同一个位置,get方法我看没做幂等啊,
不知道有没说错?
try
{
while (true)
{
try
{
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
while (nextSequence <= availableSequence)
{
event = dataProvider.get(nextSequence);
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
//假设c2先从BlockingWaitStrategy中出来,处理后nextSequence=9
nextSequence++;
}
//c2设置availableSequence=8,导致c1的sequance.value=8
sequence.set(availableSequence);
}
catch (final TimeoutException e)
{
notifyTimeout(sequence.get());
}
catch (final AlertException ex)
{
if (!running.get())
{
break;
}
}
catch (final Throwable ex)
{
exceptionHandler.handleEventException(ex, nextSequence, event);
sequence.set(nextSequence);
nextSequence++;
}
}
}
finally
{
notifyShutdown();
running.set(false);
}
}