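Hello, teacher:
In a reactive stream, if the filter operator contains a call such as Thread.sleep() that puts the thread into the TIMED_WAITING state, the subscription does not carry on executing. Why is that?
The code is as follows: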
@Test
public void subscribeOnExecuter() throws InterruptedException {
    AtomicInteger k = new AtomicInteger();
    ExecutorService executorService = Executors.newFixedThreadPool(3, (runnable) -> {
        Thread thread = new Thread(runnable);
        thread.setName("subscribeOn-demo-elastic" + k);
        k.getAndIncrement();
        return thread;
    });
    Scheduler s = Schedulers.fromExecutor(executorService);
    Flux<Integer> flux = Flux.range(1, 4)
            .filter(i -> {
                log.info("filter in thread {}", Thread.currentThread().getName());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return i % 2 == 0;
            })
            .subscribeOn(s)
            .map(i -> {
                log.info("map in thread {}", Thread.currentThread().getName());
                return i + 2;
            });
    Thread t = new Thread(() -> {
        log.info("start current thread");
        flux.log().subscribe(i -> log.info(String.valueOf(i)));
        flux.log().subscribe(i -> log.info(String.valueOf(i)));
        log.info("end current thread");
    });
    t.start();
    t.join();
}
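
One possible explanation, offered as a guess rather than a confirmed diagnosis: with subscribeOn, subscribe() hands the work to the executor and returns immediately, so thread t finishes, t.join() returns, and the test method ends while the pool threads are still sleeping inside filter. Whether the pipeline then gets cut off depends on how the test runner shuts down the JVM. The sketch below reproduces that effect using only the JDK, without Reactor. The class name AwaitAsyncWork, the run method, and the 100 ms delay are hypothetical and chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the test above; it does not use Reactor.
public class AwaitAsyncWork {

    // Runs a slow "filter" over 1..4 on a pool thread and returns the values
    // that are visible to the caller at the moment it stops waiting.
    static List<Integer> run(boolean waitForCompletion) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        List<Integer> results = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        // Like subscribe() on a subscribeOn pipeline, submit() returns at once.
        pool.submit(() -> {
            try {
                for (int i = 1; i <= 4; i++) {
                    Thread.sleep(100);       // stands in for the sleep inside filter
                    if (i % 2 == 0) {
                        results.add(i + 2);  // stands in for map(i -> i + 2)
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                done.countDown();            // signal that the work has ended
            }
        });

        if (waitForCompletion) {
            // Block the caller until the pool thread has finished (or 5 s pass).
            done.await(5, TimeUnit.SECONDS);
        }
        List<Integer> snapshot = new ArrayList<>(results);
        pool.shutdownNow();                  // models the test run ending
        return snapshot;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("without waiting: " + run(false));
        System.out.println("with waiting:    " + run(true));
    }
}
```

If this is indeed the cause, the analogous change in the Reactor test would be to keep the test method alive until both subscriptions have completed, for example by counting down a CountDownLatch in the completion callback passed to subscribe and awaiting it after t.join().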