Skip to content

Latest commit

 

History

History
69 lines (61 loc) · 4.27 KB

Spring Webflux使用subscribeOn和publishOn的最佳实践.md

File metadata and controls

69 lines (61 loc) · 4.27 KB

概述

在Spring Webflux中,有两个关键的调度器操作符:publishOnsubscribeOn。理解它们的用途和差异是使用WebFlux进行反应式编程的重要部分。

  • subscribeOn:这个操作符定义了产生数据的线程。无论在哪里放置subscribeOn,都只会影响源头的执行线程。如果在一个流的多个地方使用subscribeOn,只有第一个起作用。
  • publishOn:这个操作符可以影响其后续操作运行的线程。如果在流中的多个地方使用publishOn,每个都会起作用,并且会改变其后续操作的执行线程。

Scheduler调度器

Spring WebFlux中,Scheduler(调度器)是Project Reactor中的一个概念,它负责控制并发和线程切换。 主要有以下几种常见类型的Scheduler:

  • Schedulers.immediate():直接在当前线程执行任务,不进行线程切换。
  • Schedulers.single():创建一个只包含一个线程的调度器,所有任务都在这个单一线程上执行。如果这个线程出现故障,将创建一个新的线程替代它。
  • Schedulers.elastic():创建一个弹性的线程池,适用于IO阻塞操作比如数据库查询、文件读写等。该线程池会根据需要创建新的线程,如果线程闲置时间过长则会被销毁。弹性线程池可以无限大,所以要注意不要在大量计算密集型的任务中使用它,因为这可能导致创建大量线程,消耗过多系统资源。
  • Schedulers.parallel():创建一个固定大小的线程池,线程数量默认等于CPU核心数。适用于计算密集型任务,因为这类任务能够充分利用CPU资源。
  • Schedulers.boundedElastic():创建一个有界的弹性线程池,适合I/O任务。它与Schedulers.elastic()相似,但有最大线程数和队列大小的限制。

publishOnsubscribeOn使用场景

使用Scheduler的方法主要是通过publishOnsubscribeOn两个操作符。

  • 使用publishOn将数据流中后续的处理操作在parallel调度器对应的线程池中执行
flux.publishOn(Schedulers.parallel()).map(this::blockMethod)
  • 使用subscribeOn将源头的操作在boundedElastic调度器对应的线程池中执行
Mono.fromCallable(this::blockMethod).subscribeOn(Schedulers.boundedElastic())

两者的区别

  • publishOn常用于切换执行下游操作符的线程,影响范围和它的位置有关。
  • subscribeOn常用于切换源头数据生成的线程,影响范围则和位置无关。

使用示例

import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class PublishOnAndSubscribeOnTests {

    @Test
    public void testPublishOnAndSubscribeOn() {
        Flux.just("Hello").map(s -> {
            System.out.println("[1] Thread name: " + Thread.currentThread().getName());
            return s.concat(" World");
        }).publishOn(Schedulers.newParallel("thread-publishOn")).map(s -> {
            System.out.println("[2] Thread name: " + Thread.currentThread().getName());
            return s;
        }).subscribeOn(Schedulers.newSingle("thread-subscribeOn")).subscribe(s -> {
            System.out.println("[3] Thread name: " + Thread.currentThread().getName());
            System.out.println(s);
        });
    }
}
  • 输入结果如下:
[1] Thread name: thread-subscribeOn-1
[2] Thread name: thread-publishOn-1
[3] Thread name: thread-publishOn-1
Hello World
  • 从上面的例子可以看出,subscribeOn定义在publishOn之后,但是却从源头开始生效。
  • 而在publishOn执行之后,线程池变更为publishOn所定义的。

总结

  • 在实际使用中,我们可以根据需要选择使用publishOnsubscribeOn,它们可以帮助我们更好地控制程序的并发行为和系统资源的利用。
  • 然而,publishOnsubscribeOn也有一些局限性:
1.无法解决所有并发问题:虽然它们可以将任务移至其他线程执行,但如果你的程序设计本身就没有考虑到并发安全,那么使用这两者仍可能出现问题。
2.线程切换开销:每次通过publishOn或subscribeOn进行线程切换时,都会产生一定的系统开销。如果大量无谓的线程切换,可能会对性能产生负面影响。