请稍等 ...
×

采纳答案成功!

向帮助你的同学说点啥吧!感谢那些助人为乐的人

还是event sourcing唯一性验证问题

老师,您好,之前提过这个问题。我按照你给的意见做了测试,关键代码如下:

@Configuration
public class CustomerConfig {

    @Autowired
    EventStore eventStore;

    @Bean
    public AggregateFactory<Customer> customerFactory(){
        SpringPrototypeAggregateFactory<Customer> aggregateFactory = new SpringPrototypeAggregateFactory<>();
        aggregateFactory.setPrototypeBeanName("customer");
        return aggregateFactory;
    }

    @Bean
    public Repository<Customer> customerRepository(){
        return new EventSourcingRepository<>(
                customerFactory(),
                eventStore
        );
    }
}
@Component
public class CustomerCommandHandler {

    @Autowired
    Repository<Customer> customerRepository;

    @Autowired
    CustomerEntityRepository customerEntityRepository;

    @CommandHandler
    public void handle(CustomerCreateCommand command) throws Exception {
        int count = customerEntityRepository.countByUsername(command.getName());
        System.out.println("username count:" + count);
        Thread.sleep(10 * 1000);
        System.out.println("waiting......");
        if (count > 0) {
            throw new RuntimeException("用户名已存在");
        } else {
            customerRepository.newInstance(() -> new Customer(command));
        }
    }
}

经过测试,在并发请求时,不能有效保证username的唯一性。按理说,在axon-spring-boot-starter默认配置下,配置的是SimpleCommandBus,理论上来说对于同一个聚合上的command和其引发的saga以及event应该是线性执行的,但是,实测结果却与所期待的不同,推测是因为创建聚合这样的命令实际上在处理时,还没有聚合创建出来,所以无法保证command的顺序执行吗?

因为我之前对您的代码做过另外一个测试,在下面代码中我对CustomerDepositedEvent 的响应代码做了并发测试,也就是我同时发起两个请求来减少同一个账号的余额,测试显示。对于同一个聚合customer上的两次并发余额操作,axon在单体应用下可以保证对两次事件的顺序响应,不会出现错误。所以当您提示我的时候,我觉得在command的处理中应该也是这样,单结果却不是。不知道是我的代码有问题,还是axon就是这样处理的。另外,像下面这段代码,在分布式环境下如果query端部署了 多个节点,那多个节点之间会有并发冲突吗?比如:command端出发两次减余额操作,发出两个CustomerDepositedEvent 事件,query节点1响应一个,query节点2响应一个,那这两个节点用您例子中给出的这种更新视图的方式,会不会出现并发错误呢?这个我还没有测试过,不知道您有没有实测过。

@Service
public class CustomerProjector {

    @Autowired
    private CustomerEntityRepository repository;

    @EventHandler
    public void on(CustomerCreatedEvent event) {
        CustomerEntity customer = new CustomerEntity(event.getCustomerId(), event.getName(), event.getPassword(), 0d);
        repository.save(customer);
    }

    @EventHandler
    public void on(CustomerDepositedEvent event) {
        String customerId = event.getCustomerId();
        CustomerEntity accountView = repository.getOne(customerId);
        try {
            System.out.println(accountView.getDeposit());
            System.out.println("waiting......");
            Thread.sleep(30*1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        Double newDeposit = accountView.getDeposit() + event.getAmount();
        accountView.setDeposit(newDeposit);
        repository.save(accountView);
    }

    @EventHandler
    public void on(CustomerChargedEvent event) {
        String customerId = event.getCustomerId();
        CustomerEntity customer = repository.getOne(customerId);

        Double newDeposit = customer.getDeposit() - event.getAmount();
        customer.setDeposit(newDeposit);
        repository.save(customer);
    }

    @EventHandler
    public void on(OrderPaidEvent event) {
        String customerId = event.getCustomerId();
        CustomerEntity customer = repository.getOne(customerId);

        Double newDeposit = customer.getDeposit() - event.getAmount();
        customer.setDeposit(newDeposit);
        repository.save(customer);
    }
}

最后,您视频提到axon的文档和社区很活跃,可是我看官方文档很简单啊http://docs.axonframework.org/v/3.2/

还有社区地址又是多少呢?

最后,如果我所说的都成立,也就是说,我像上面这样写代码,不能保证并发冲突,或者说不能在分布式环境下保证并发冲突,还是需要再在countByUsername这样的地方加锁,减余额还是需要利用sql   update set deposit=deposit-amount where id=?这样的形式,那我们使用axon或者事件来保证分布式一致性的意义又体现在哪里呢?


正在回答

2回答

再回答第二个问题:CustomerProjector并发处理event的问题。

你说的这种并发问题还真的存在,如果是一个query服务,我们可以通过读消息处理消息的并发数来控制。但是,在有多个query服务的时候,他们都从队列上读取消息并处理,应该是会有问题的。那这种情况下,还是要通过合理的方法设计来避免。

比如在更新余额的时候,不要get - set,而是 update,例如使用spring data的repository上的自定义query,写一个update的query。

当然,我们也可以通过数据库的事务隔离机制来避免,比如这种情况下就使用“序列化”的隔离方式。


最后,axon的社区是在google groups上,然后,如果有bug,在github上的确认和更新也是很快的。


0 回复 有任何疑惑可以回复我~
  • 提问者 慕粉1750262393 #1
    嗯,对于这个问题,虽然没有实测,我也觉得会存在并发冲突。还有一个问题,想请教一下,您在视频中也提到过,axon可以保证“关联同一个聚合id的command会被发到同一个实例处理“。这一点,在单实例下很好理解,但是在同一个command代码被多节点部署的时候也能保证这一点吗?比如:我customer的command端代码被部署在A、B两个node上,然后客户端连续发出两个充值请求,一个command被发到A上,一个被发到B上。会出现这样的情况吗?如果不会又是怎么保证的呢?我一直很好奇这一点。但是看官方文档好像没有介绍过。如果会出现我说的情况,那理论上也有可能出现并发冲突导致其中一个充值请求失败吧?就像您演示的买票压测时,出现sequence冲突的情况。
    
    最后再多问一个,哈哈。看一些axon的例子,在涉及到需要saga需要查询的时候,如createOrder需要查询ticket详情的话,都建议使用queryRepository,但是我测试过,axon对queryhandler的调用即使在单机情况下虽然是同步调用,但是如果query保存异常不会回滚command的操作和event,当然本身cqrs就是这样设计的。但是这也就意味着query端查到的数据可能不准确。这样做会有什么不妥之处吗?当然在分布式环境下这个问题就更有可能了,因为event通过mq发布出去,command端只是处理command成功存储event就返回成功了,query什么时候更新,是未知的。也就是说我下一个command执行的时候通过query查到的数据无法保证是最新的,不知道这一点我说的对吗?如果对的话,我们什么时候该容忍,什么场景下不能容忍呢,这一点也很疑惑。
    回复 有任何疑惑可以回复我~ 2018-08-10 17:09:54
  • 提问者 慕粉1750262393 #2
    非常感谢!
    回复 有任何疑惑可以回复我~ 2018-08-10 17:12:20
  • 提问者 慕粉1750262393 #3
    哈哈。看来上社区还得科学上网~
    回复 有任何疑惑可以回复我~ 2018-08-10 17:13:28
大漠风 2018-08-10 15:49:31

非常高质量的问题,回答之前先手动点个赞。

先说第一个问题,command并发处理的问题。在使用聚合对象上的CommandHandler处理方法时,我们的command上面关联id,也就是聚合对象Id,axon框架在处理该command的时候,根据Id找到(或创建)聚合对象,并对同样Id的command依次处理。这在我们的实例中也演示过,也就是在并发测试(性能测试)的时候。

然后,在默认情况下,axon处理command,再处理event,是同步调用执行,也就是说,command handler处理方法在执行的时候,最终会调用event处理方法,在event处理完以后,这个command的处理方法才执行完成。

所以,在你测试event处理的并发性,也就是:“因为我之前对您的代码做过另外一个测试,在下面代码中我对CustomerDepositedEvent 的响应代码做了并发测试,也就是我同时发起两个请求来减少同一个账号的余额,测试显示。对于同一个聚合customer上的两次并发余额操作,axon在单体应用下可以保证对两次事件的顺序响应,不会出现错误。”

这时候,你的2个command依次执行,只有在第一个执行完以后,第二个才会执行。所以并非不会有问题。

然后,再说使用CustomerCommandHandler类的并发问题,就不一样了。这个处理类,在处理一个command的时候,并没有关联聚合对象,而是我们自己在代码里从 聚合资源库中取出聚合对象,再验证、处理、再触发相应的event。所以这时候的并发就不一样了,就不能在开始处理的时候就依次处理。

我反复想了一下,在你使用的测试情况下,即使是在单服务中,也无法简单的保证并发时不出错,因为判断总会有先后,如果在用户名字段上有唯一索引,后执行的方法总会报错。我们只要能保证在出错时数据的一致性,对于这种小概率事件,还是应该允许。如果为了这个弄得过于复杂就得不偿失了。

但是呢,借助于axon的这种机制:“同一个聚合对象的处理方法依次处理同一个聚合对象上的事件”,我们可以创建一个UserNameAggregate,对他来说,username就是聚合id,当创建用户时,由UserNameAggregate上的方法处理UserNameCreateCommnad,这样同样的username始终依次处理;然后生成UserNameCreatedEvent,它再产生一个UserCreateCommnad,去出发原先的用户创建流程。这有点像saga流程一样,但是又没那么复杂,因为就2步,而且第一步只是为了验证。

0 回复 有任何疑惑可以回复我~
  • 提问者 慕粉1750262393 #1
    嗯,确实这种情况很少见。我只是钻牛角尖的考虑一下。哈哈。
    对于您说的UserNameAggregate的方式,感觉有点儿类似链式事务,由于没有saga来处理错误补偿,极端情况下感觉会出现UserNameAggregate创建成功,但是userAggregate创建失败的情况。这样,明明账号没有被创建,但是我下次用同一个账号名注册会被提示 用户名已存在。不知道我考虑的对不对。当然就算是有这种情况也是更极端了。
    我还考虑过一种方式是,提供一个领域服务专门来维护user的索引,在commandHandler中,先调用领域服务来验证用户名的唯一性,如mysql的insert ignore into username_index values('account1');返回值为0则表示插入失败,返回1则成功,然后再根据返回值触发创建user聚合的事件或者抛出异常,当然这个commandHandler也最好整个在事务前提下,要不然有可能也会出现username_index 插入成功但是CreateUserCommand失败的情况。不知道这样做是否严谨些。
    回复 有任何疑惑可以回复我~ 2018-08-10 16:46:27
  • 用userNameAggregate,在一个业务请求里面处理两个command,他们在一个事务里,即使第二个失败了,第一个事件也回滚了。所以不会有你说的情况。
    反而是用saga的话,他会保存一个未完成的saga流程,第一步的事件也保存了。
    回复 有任何疑惑可以回复我~ 2018-08-11 11:42:04
问题已解决,确定采纳
还有疑问,暂不采纳
意见反馈 帮助中心 APP下载
官方微信