老师,您好,之前提过这个问题。我按照你给的意见做了测试,关键代码如下:
@Configuration public class CustomerConfig { @Autowired EventStore eventStore; @Bean public AggregateFactory<Customer> customerFactory(){ SpringPrototypeAggregateFactory<Customer> aggregateFactory = new SpringPrototypeAggregateFactory<>(); aggregateFactory.setPrototypeBeanName("customer"); return aggregateFactory; } @Bean public Repository<Customer> customerRepository(){ return new EventSourcingRepository<>( customerFactory(), eventStore ); } }
@Component public class CustomerCommandHandler { @Autowired Repository<Customer> customerRepository; @Autowired CustomerEntityRepository customerEntityRepository; @CommandHandler public void handle(CustomerCreateCommand command) throws Exception { int count = customerEntityRepository.countByUsername(command.getName()); System.out.println("username count:" + count); Thread.sleep(10 * 1000); System.out.println("waiting......"); if (count > 0) { throw new RuntimeException("用户名已存在"); } else { customerRepository.newInstance(() -> new Customer(command)); } } }
经过测试,在并发请求时,不能有效保证username的唯一性。按理说,在axon-spring-boot-starter默认配置下,配置的是SimpleCommandBus,理论上来说对于同一个聚合上的command和其引发的saga以及event应该是线性执行的,但是,实测结果却与所期待的不同,推测是因为创建聚合这样的命令实际上在处理时,还没有聚合创建出来,所以无法保证command的顺序执行吗?
因为我之前对您的代码做过另外一个测试,在下面代码中我对CustomerDepositedEvent 的响应代码做了并发测试,也就是我同时发起两个请求来减少同一个账号的余额,测试显示。对于同一个聚合customer上的两次并发余额操作,axon在单体应用下可以保证对两次事件的顺序响应,不会出现错误。所以当您提示我的时候,我觉得在command的处理中应该也是这样,单结果却不是。不知道是我的代码有问题,还是axon就是这样处理的。另外,像下面这段代码,在分布式环境下如果query端部署了 多个节点,那多个节点之间会有并发冲突吗?比如:command端出发两次减余额操作,发出两个CustomerDepositedEvent 事件,query节点1响应一个,query节点2响应一个,那这两个节点用您例子中给出的这种更新视图的方式,会不会出现并发错误呢?这个我还没有测试过,不知道您有没有实测过。
@Service public class CustomerProjector { @Autowired private CustomerEntityRepository repository; @EventHandler public void on(CustomerCreatedEvent event) { CustomerEntity customer = new CustomerEntity(event.getCustomerId(), event.getName(), event.getPassword(), 0d); repository.save(customer); } @EventHandler public void on(CustomerDepositedEvent event) { String customerId = event.getCustomerId(); CustomerEntity accountView = repository.getOne(customerId); try { System.out.println(accountView.getDeposit()); System.out.println("waiting......"); Thread.sleep(30*1000); } catch (InterruptedException e) { e.printStackTrace(); } Double newDeposit = accountView.getDeposit() + event.getAmount(); accountView.setDeposit(newDeposit); repository.save(accountView); } @EventHandler public void on(CustomerChargedEvent event) { String customerId = event.getCustomerId(); CustomerEntity customer = repository.getOne(customerId); Double newDeposit = customer.getDeposit() - event.getAmount(); customer.setDeposit(newDeposit); repository.save(customer); } @EventHandler public void on(OrderPaidEvent event) { String customerId = event.getCustomerId(); CustomerEntity customer = repository.getOne(customerId); Double newDeposit = customer.getDeposit() - event.getAmount(); customer.setDeposit(newDeposit); repository.save(customer); } }
最后,您视频提到axon的文档和社区很活跃,可是我看官方文档很简单啊http://docs.axonframework.org/v/3.2/
还有社区地址又是多少呢?
最后,如果我所说的都成立,也就是说,我像上面这样写代码,不能保证并发冲突,或者说不能在分布式环境下保证并发冲突,还是需要再在countByUsername这样的地方加锁,减余额还是需要利用sql update set deposit=deposit-amount where id=?这样的形式,那我们使用axon或者事件来保证分布式一致性的意义又体现在哪里呢?