请稍等 ...
×

采纳答案成功!

向帮助你的同学说点啥吧!感谢那些助人为乐的人

并发生产消费问题

有个问题啊,比如在BatchEventProcessor.run() ,同时有两个消费者进来c1,c2, 假设此时ringBuffer.sequence=7 , c1, c2 都 waitFor(8),假设c1先waitFor(8) (虽然并发同时进来,但是CPU调度的时候也有可能先让c1 waitFor),那么在BlockingWaitStrategy的waitFor中,c1 还在计算,c2后进来,但是提前出来,返回了availableSequence=8, 那么 有两个问题:
(1) c2 后进来,把 c1 正在处理的消息,处理掉了,这会不会有什么问题,往多了说,会造成一些消费者饥饿,一些消费者一直忙,或者说Disruptor不care这种消费者之间消费多少的问题
(2) 上面的步骤中,c1也 availableSequence=8, 那么在下面这段代码中,c1中的availableSequence=8, nextSequence=8 (因为nextSequence不是Volatile的,所以c1看不到)
这个while(nextSequence <= availableSequence) 也会走下去,但是dateProvider.get(8) 这个位置的消息被C2 已经消费了,c1再次消息,同一个位置,get方法我看没做幂等啊,

不知道有没说错?

try
        {
            while (true)
            {
                try
                {
                    final long availableSequence = sequenceBarrier.waitFor(nextSequence);
                    while (nextSequence <= availableSequence)
                    {
                        event = dataProvider.get(nextSequence);
                        eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                        //假设c2先从BlockingWaitStrategy中出来,处理后nextSequence=9
                        nextSequence++;
                    }
                    //c2设置availableSequence=8,导致c1的sequance.value=8
                    sequence.set(availableSequence);
                }
                catch (final TimeoutException e)
                {
                    notifyTimeout(sequence.get());
                }
                catch (final AlertException ex)
                {
                    if (!running.get())
                    {
                        break;
                    }
                }
                catch (final Throwable ex)
                {
                    exceptionHandler.handleEventException(ex, nextSequence, event);
                    sequence.set(nextSequence);
                    nextSequence++;
                }
            }
        }
        finally
        {
            notifyShutdown();
            running.set(false);
        }
    }

正在回答

4回答

加一下群 ,我在群里更详细的解释了细节问题!~

0 回复 有任何疑惑可以回复我~
  • 提问者 慕粉4184357 #1
    非常感谢!
    回复 有任何疑惑可以回复我~ 2018-09-18 13:23:47
阿神 2018-09-18 11:32:36

1 disruptor  首先  大家姑且认为有3个sequence  : 生产者、 消费者、以及ringbuffer本身 (不区分单个、多个 生产消费模式 暂时)

阿神 2018/9/18 10:50:41

2 这里我们讨论 单生产者 单消费者模型 (即时是单生产者模型,我们也可以构建多个handle 去串并行操作。那么每次多一个handler 相当于 多一个线程,底层代码就是 new BatchEventProcess 这个类 是处理 单生产者(这里多线程指的是handler多个,而不是多生产者) 单消费者) 因此不会出现任何并发问题

阿神 2018/9/18 10:52:25

3 然后 ,我们讨论 多生产者,多消费者模型  对于多生产者, 由于ringbuffer 本身的 cursor 是具有 volatile 特性的 所以 并发的时候cas操作 ,多生产者在并发投递数据到ringbuffer的时候,不会出现重复的位置情况


0 回复 有任何疑惑可以回复我~
阿神 2018-09-18 10:56:04

disruptor 也正是考虑到小伙伴的这个问题, 所以 我们的EventProcessor  有两种实现, 专门区分与 单消费者 和多消费者 ,单消费者的时候就是BatchEventProcessor,多消费者的时候 则是WorkProcessor  也就是说小伙伴分析的问题是存在的,,但是分析的是单消费者代码,, 如果要多消费者进行分析,要看WorkProcessor  实现 ~


0 回复 有任何疑惑可以回复我~
阿神 2018-09-18 03:43:36

消费者不会同时等待一个相同的sequence,这个是共享的操作

0 回复 有任何疑惑可以回复我~
  • 提问者 慕粉4184357 #1
    刚调试了下,在单生产者,多消费者的情况下,用的BatchEventProcessor,是会出现多个handler等待同一个sequence,如果这里的handler不等于消费者的话
    --->表示nextSequence
    <---表示availableSequence
    
    ---->0
    ---->0
    ---->0
    <----4
    <----4
    <----4
    ---->5
    ---->5
    <----19
    ---->5
    回复 有任何疑惑可以回复我~ 2018-09-18 14:54:43
问题已解决,确定采纳
还有疑问,暂不采纳
意见反馈 帮助中心 APP下载
官方微信