Java8 使用调度器获得并发性和并行性详解

现代处理器有多个核,可以更快地同时处理许多耗时的操作。Java 并发 API(包括线程和更多)使这一点成为可能。

RxJava 的Observable链似乎与线程非常匹配。如果我们能够订阅我们的源代码,并在后台进行所有转换、组合和过滤,并且在完成所有操作后,将结果传递给主线程,那就太好了。是的,这听起来很棒,但是 RxJava 默认是单线程的。这意味着,在大多数情况下,当对一个Observable实例调用subscribe方法时,当前线程会阻塞,直到所有内容都发出。(例如,对于由intervaltimer工厂方法创建的Observable实例,情况并非如此。)。这是一件好事,因为使用线程并不容易。它们很强大,但它们需要相互同步;例如,当一方依赖另一方的结果时。

在多线程环境中最难管理的事情之一是线程之间的共享数据。一个线程可以从数据源读取数据,而另一个线程正在修改数据源,这会导致不同线程使用相同数据的不同版本。如果Observable链的构造方式正确,则不存在共享状态。这意味着同步没有那么复杂。

在本章中,我们将讨论并行执行,并了解并发的含义。此外,我们还将学习一些处理Observable实例发出过多项的技术(在多线程环境中这种情况并不罕见)。本章涵盖的主题如下:

调度程序是 RxJava 实现并发的方式。他们负责为我们创建和管理线程(内部依赖于 Java 的线程池设施)。我们不会讨论 Java 的并发 API 及其怪癖和复杂性。我们一直在使用调度器,隐式地使用计时器和间隔,但现在是掌握它们的时候了。

让我们回顾一下我们在第 3 章中介绍的Observable.interval工厂方法创建并连接观察者、观察者和受试者。如前所述,RxJava 在默认下是单线程,所以在大多数情况下,在Observable实例上调用subscribe方法会阻塞当前线程。但interval Observable案例并非如此。如果我们看一下Observable<Long> interval(long interval, TimeUnit unit)方法的 JavaDoc,我们会看到它说它创建的Observable实例在一个叫做“计算调度器的东西上运行。

为了检查interval方法的行为(以及本章中的其他内容),我们需要一个强大的调试工具。这就是为什么我们在本章中要做的第一件事就是实现它。

调试可观察对象及其调度器

在前面的章节中,我们已经介绍了doOnNext()操作符,它可以用于直接从Observable链中记录发出的项目。我们提到还有doOnError()doOnCompleted()运营商。但是有一个结合了这三者的doOnEach()操作符。我们可以从它记录所有内容,因为它接收所有发出的通知,而不管它们的类型如何。我们可以把它放在运营商链的中间,然后用它记录,比如说,那里的状态。它需要一个Notification -> void功能。

下面是高阶调试函数返回lambda结果的源,该函数能够使用传递的描述记录标记的Observable实例的发射:

<T> Action1<Notification<? super T>> debug(
  String description, String offset
) {
  AtomicReference<String> nextOffset = new AtomicReference<String>(">");
  return (Notification<? super T> notification) -> {
    switch (notification.getKind()) {
    case OnNext:
      System.out.println(
        Thread.currentThread().getName() +
        "|" + description + ": " + offset +
        nextOffset.get() + notification.getValue()
      );
      break;
    case OnError:
      System.err.println(
        Thread.currentThread().getName() +
        "|" + description + ": " + offset +
        nextOffset.get() + " X " + notification.getThrowable()
      );
      break;
    case OnCompleted:
      System.out.println(
        Thread.currentThread().getName() +
        "|" + description + ": " + offset +
        nextOffset.get() + "|"
      );
    default:
      break;
    }
    nextOffset.getAndUpdate(p -> "-" + p);
  };
}

根据通过的说明偏移量,返回的方法记录每个通知。然而,重要的是,它首先记录当前活动线程的名称。<value>标记OnNext notificationsX错误通知|未完成通知nextOffset变量用于及时显示值。

下面是一个使用这种新方法的示例:

Observable
  .range(5, 5)
  .doOnEach(debug("Test", ""))
  .subscribe();

本例将生成五个序列号,从数字 5 开始。我们将对debug(String, String)方法的调用传递给doOnEach()操作符,以记录range()方法调用后的所有内容。使用不带参数的 subscribe 调用,将触发此小链。结果如下:

main|Test: >5
main|Test: ->6
main|Test: -->7
main|Test: --->8
main|Test: ---->9
main|Test: ----->|

记录的第一件事是当前线程的名称(主线程),然后我们将Observable实例的描述传递给debug()方法,然后,一个冒号和破折号形成箭头,表示时间。最后,我们有通知类型的符号,值本身表示值,|表示完成。

让我们为debug()helper 方法定义一个重载,这样,如果不需要,我们就不需要向其传递带有额外偏移量的第二个参数:

<T> Action1<Notification<? super T>> debug(String description) {
  return debug(description, "");
}

上述方法的代码可在查看/下载 https://github.com/meddle0x53/learning-rxjava/blob/master/src/main/java/com/packtpub/reactive/common/Helpers.java

现在我们已经准备好调试由 interval 方法创建的Observable实例发生了什么!

可观察的间隔及其默认调度程序

让我们检查以下示例:

Observable
  .take(5)
  .interval(500L, TimeUnit.MILLISECONDS)
  .doOnEach(debug("Default interval"))
  .subscribe();

这将创建一个interval Observable实例,每半秒发射一次。我们使用take()方法仅获取前五个通知并完成。我们将使用我们的debug()助手方法记录由 interval 方法创建的Observable实例发出的值,并使用对subscribe()的调用来触发逻辑。输出应如下所示:

RxComputationThreadPool-1|Default interval: >0
RxComputationThreadPool-1|Default interval: ->1
RxComputationThreadPool-1|Default interval: -->2
RxComputationThreadPool-1|Default interval: --->3
RxComputationThreadPool-1|Default interval: ---->4

这里的一切都应该很熟悉,除了Observable实例在其上执行的线程!此线不是线。从名称(RxComputationThreadPool-1来看,它似乎是由 RxJava 管理的可重用Thread实例池创建的。

如果您还记得,Observable.interval工厂方法具有以下重载:

Observable<Long> interval(long, TimeUnit, Scheduler)

这意味着我们可以指定它将在其上运行的调度程序。前面提到过,只有两个参数的重载在计算调度器上运行。现在,让我们尝试传递另一个调度程序,看看会发生什么:

Observable
  .take(5)
  .interval(500L, TimeUnit.MILLISECONDS, Schedulers.immediate())
  .doOnEach(debug("Imediate interval"))
  .subscribe();

这是和以前一样,但有一点不同。我们传递一个名为立即的调度程序。的想法是在当前运行的线程上立即执行工作。结果如下:

main|Imediate interval: >0
main|Imediate interval: ->1
main|Imediate interval: -->2
main|Imediate interval: --->3
main|Imediate interval: ---->4

通过指定此调度器,我们使interval Observable实例在当前线程上运行。

上述示例的源代码可在中找到 https://github.com/meddle0x53/learning-rxjava/blob/master/src/main/java/com/packtpub/reactive/chapter06/IntervalAndSchedulers.java

在调度程序的帮助下,我们可以指示操作员在特定线程上运行或使用特定的线程池。

我们刚刚讨论的所有内容都会让我们得出这样的结论:调度器会生成新线程,或者重用已经生成的线程,而操作Observable实例链的一部分,在这些线程上执行。因此,我们可以通过只使用它们来实现并发性(操作员同时进行操作)。

为了拥有多线程逻辑,我们必须学习以下两件事:

  • 我们可以从中选择的调度器类型
  • 如何将这些调度器与任意Observable操作一起使用

调度器的类型

有几种类型的schedulers专用于某些类型的动作。为了了解更多关于它们的知识,让我们来看看 AuthT1 类。

事实证明,这个类非常简单。它只有两种方法,如下所示:

  • long now()
  • abstract Worker createWorker()

第一个以毫秒为单位返回当前时间,第二个创建一个Worker实例。这些Worker实例用于在单个线程或事件循环上执行操作(取决于实现)。使用工作人员的schedule*方法完成执行的调度操作。Worker类实现了Subscription接口,所以它有一个unsubscribe()方法。取消订阅Worker计划外所有未完成的工作,并允许资源清理。

我们可以使用 worker 在Observable上下文之外执行调度。对于每种Scheduler类型,我们可以执行以下操作:

scheduler.createWorker().schedule(Action0);

这将计划传递的操作并执行它。在大多数情况下,此方法不应直接用于调度工作,我们只需选择正确的调度器,并在其上调度操作即可。为了了解它们的作用,我们可以使用该方法检查可用的各种类型的调度器。

让我们定义一种测试方法:

void schedule(Scheduler scheduler, int numberOfSubTasks, boolean onTheSameWorker) {
  List<Integer> list = new ArrayList<>(0);
  AtomicInteger current = new AtomicInteger(0);
  Random random = new Random();
  Worker worker = scheduler.createWorker();
  Action0 addWork = () -> {
    synchronized (current) {
      System.out.println("  Add : " + Thread.currentThread().getName() + " " + current.get());
      list.add(random.nextInt(current.get()));
      System.out.println("  End add : " + Thread.currentThread().getName() + " " + current.get());
    }
  };
  Action0 removeWork = () -> {
    synchronized (current) {
      if (!list.isEmpty()) {
        System.out.println("  Remove : " + Thread.currentThread().getName());
        list.remove(0);
        System.out.println("  End remove : " + Thread.currentThread().getName());
      }
    }
  };
  Action0 work = () -> {
    System.out.println(Thread.currentThread().getName());
    for (int i = 1; i <= numberOfSubTasks; i++) {
      current.set(i);
      System.out.println("Begin add!");
      if (onTheSameWorker) {
        worker.schedule(addWork);
      }
      else {
 scheduler.createWorker().schedule(addWork);
      }
      System.out.println("End add!");
    }
    while (!list.isEmpty()) {
      System.out.println("Begin remove!");
    if (onTheSameWorker) {
 worker.schedule(removeWork);
    }
    else {
 scheduler.createWorker().schedule(removeWork);
    }
    System.out.println("End remove!");
  };
  worker.schedule(work);
}

该方法使用传递的Scheduler实例进行一些工作。有一个选项可以指定是为每个任务使用相同的Worker实例,还是为每个子任务生成一个新实例。基本上,虚拟工作包括用随机数填充列表,然后逐个删除这些数字。每个添加操作删除操作都是通过传递的Scheduler实例创建的 worker 作为子任务进行调度的。在每个子任务之前和之后,会记录当前线程和一些附加信息。

提示

在真实场景中,一旦完成所有工作,我们应该始终调用worker.unsubscribe()方法。

转到预定义的Scheduler实例。它们可以通过Schedulers类中包含的一组静态方法进行检索。我们将使用前面定义的调试方法来检查它们的行为,以了解它们的差异和有用性。

调度器。即时调度器

Schedulers.immediate调度器此时此地执行工作。当一个动作被传递给它的工作者的schedule(Action0)方法时,它就被调用了。假设我们使用它运行测试方法,如下所示:

schedule(Schedulers.immediate(), 2, false);
schedule(Schedulers.immediate(), 2, true);

在这两种情况下,结果如下所示:

main
Begin add!
 Add : main 1
 End add : main 1
End add!
Begin add!
 Add : main 2
 End add : main 2
End add!
Begin remove!
 Remove : main
 End remove : main
End remove!
Begin remove!
 Remove : main
 End remove : main
End remove!

换句话说,所有的都是在主调用线程上执行的,没有任何东西是并行的。

该调度器可用于在前台执行interval()timer()等方法。

调度员。蹦床调度员

调度程序,通过Schedulers.trampoline方法子任务排队到当前thread上。排队工作在当前正在进行的工作完成后执行。假设我们要运行这个:

schedule(Schedulers.trampoline(), 2, false);
schedule(Schedulers.trampoline(), 2, true);

在第一种情况下,结果将与即时调度器的结果相同,因为所有任务都在它们自己的Worker实例中执行,因此,每个 worker 中只有一个任务排队等待执行。但是,当我们使用相同的Worker实例来调度每个子任务时,我们得到如下结果:

main
Begin add!
End add!
Begin add!
End add!
 Add : main 2
 End add : main 2
 Add : main 2
 End add : main 2

换句话说,它将首先执行整个主操作,然后执行子任务;因此,List实例将被填充(子任务已排队),但不会被清空。这是因为在执行主任务时,List实例仍然是空的,while循环没有被触发。

trampoline调度器在递归运行多个任务时可用于避免StackOverflowError异常。例如,让我们假设一个任务完成了,然后调用它自己来执行一些新的工作。在单线程环境中,由于递归,这将导致堆栈溢出;但是,如果我们使用蹦床调度器,它将序列化所有计划的活动,堆栈深度将保持正常。然而,蹦床调度程序通常比即时调度程序慢。因此,使用正确的方法取决于用例。

Schedulers.newThread 调度程序

此计划为每个新Worker实例创建一个Thread实例(精确地说是一个单线程ScheduledThreadPoolExecutor实例)。此外,每个工作人员通过其schedule()方法将接收到的操作排队,这与 trampoline 调度器非常相似。让我们看看下面的代码:

schedule(Schedulers.newThread(), 2, true);

它将具有与蹦床相同的行为,但将在新的thread:中运行

RxNewThreadScheduler-1
Begin add!
End add!
Begin add!
End add!
  Add : RxNewThreadScheduler-1 2
  End add : RxNewThreadScheduler-1 2
  Add : RxNewThreadScheduler-1 2
  End add : RxNewThreadScheduler-1 2

相反,如果我们这样称呼测试方法:

schedule(Schedulers.newThread(), 2, false);

这将为每个子任务产生一个新的Thread实例,该实例将产生类似以下内容的输出:

RxNewThreadScheduler-1
Begin add!
End add!
Begin add!
  Add : RxNewThreadScheduler-2 1
  End add : RxNewThreadScheduler-2 2
End add!
Begin remove!
  Add : RxNewThreadScheduler-3 2
  End add : RxNewThreadScheduler-3 2
End remove!
Begin remove!
End remove!
Begin remove!
  Remove : RxNewThreadScheduler-5
  End remove : RxNewThreadScheduler-5
  Remove : RxNewThreadScheduler-4
  End remove : RxNewThreadScheduler-4
End remove!

通过使用新线程Scheduler实例,可以执行后台任务。

这里一个非常重要的要求是,它的工作人员需要被取消订阅,以避免线程和操作系统资源泄漏。请注意,每次创建新线程的成本很高,因此在大多数情况下,应该使用计算IOScheduler实例。

调度器。计算调度器

计算调度器与新线程非常相似,但它考虑了运行它的机器所拥有的处理器/内核的数量,并使用了一个线程池,可以重用有限数量的线程。每个新的Worker实例都会在其中一个Thread实例上安排顺序操作。如果线程在执行时未被使用,并且该线程处于活动状态,则它们将排队等待稍后在其上执行。

如果我们使用相同的Worker实例,我们只需将其线程上的所有操作排队,结果将与使用新线程Scheduler实例调度一个Worker实例相同。

我的机器有四个磁芯。假设我这样调用它的测试方法:

schedule(Schedulers.computation(), 5, false);

我会得到如下类似的输出:

RxComputationThreadPool-1
Begin add!
  Add : RxComputationThreadPool-2 1
  End add : RxComputationThreadPool-2 1
End add!
Begin add!
End add!
Begin add!
  Add : RxComputationThreadPool-3 3
  End add : RxComputationThreadPool-3 3
End add!
Begin add!
  Add : RxComputationThreadPool-4 4
End add!
Begin add!
  End add : RxComputationThreadPool-4 4
End add!
Begin remove!
End remove!
Begin remove!
  Add : RxComputationThreadPool-2 5
  End add : RxComputationThreadPool-2 5
End remove!
Begin remove!
End remove!
Begin remove!
End remove!
Begin remove!
End remove!
Begin remove!
End remove!
Begin remove!
End remove!
Begin remove!
End remove!
Begin remove!
  Remove : RxComputationThreadPool-3
End remove!
Begin remove!
  End remove : RxComputationThreadPool-3
  Remove : RxComputationThreadPool-2
End remove!
Begin remove!
  End remove : RxComputationThreadPool-2
End remove!
Begin remove!
  Remove : RxComputationThreadPool-2
End remove!
Begin remove!
End remove!
Begin remove!
End remove!
Begin remove!
End remove!
Begin remove!
  End remove : RxComputationThreadPool-2
End remove!
  Remove : RxComputationThreadPool-2
Begin remove!
  End remove : RxComputationThreadPool-2
End remove!
  Add : RxComputationThreadPool-1 5
  End add : RxComputationThreadPool-1 5
  Remove : RxComputationThreadPool-1
  End remove : RxComputationThreadPool-1

所有的都只使用池中的四个Thread实例执行(请注意,有一种方法可以将Thread实例的数量限制为小于可用处理器计数)。

计算Scheduler实例是您进行后台工作计算或处理的真正选择,因此它的名称。您可以将其用于应该在后台运行的所有操作,而不是与IO相关或阻塞操作。

Schedulers.io 调度器

输入输出(IO)调度器使用ScheduledExecutorService实例从线程池中为其工作线程检索线程。未使用的线程将被缓存并按需重用。如果有必要,它可以生成任意数量的线程。

同样,如果我们只使用一个Worker实例运行我们的示例,那么操作将在其线程上排队,并且它的行为将类似于计算新线程调度器。

假设我们使用多个Worker实例运行它,如下所示:

schedule(Schedulers.io(), 2, false);

它将根据需要从其中生成Thread实例。结果如下所示:

RxCachedThreadScheduler-1
Begin add!
End add!
Begin add!
 Add : RxCachedThreadScheduler-2 2
 End add : RxCachedThreadScheduler-2 2
End add!
Begin remove!
 Add : RxCachedThreadScheduler-3 2
 End add : RxCachedThreadScheduler-3 2
End remove!
Begin remove!
 Remove : RxCachedThreadScheduler-4
 End remove : RxCachedThreadScheduler-4
End remove!
Begin remove!
End remove!
Begin remove!
 Remove : RxCachedThreadScheduler-6
 End remove : RxCachedThreadScheduler-6
End remove!

IO调度器预留用于阻塞IO 操作。它用于请求服务器、读取文件和套接字以及其他类似的阻塞任务。注意,它的线程池是无限的;如果它的员工没有退订,人才库将无限期增长。

上述所有代码的源代码位于https://github.com/meddle0x53/learning-rxjava/blob/master/src/main/java/com/packtpub/reactive/chapter06/SchedulersTypes.java

调度器。来自(执行器)方法

此可用于创建自定义Scheduler实例。如果预定义的调度器都不适合您,请使用此方法,将其传递给java.util.concurrent.Executor实例,以实现所需的行为。

现在我们已经了解了如何以及何时使用预定义的Scheduler实例,现在是时候看看如何将它们与Observable序列集成。

将可观察对象和调度器相结合

为了在其他线程上执行我们的可观察逻辑,我们可以使用调度器。有两个特殊操作符,接收Scheduler作为参数,生成Observable实例,可以对Thread实例执行与当前实例不同的操作。

可观察的订阅(调度器)方法

subscribeOn()方法创建Observable实例,其subscribe方法导致订阅发生在从传递的调度程序检索到的线程上。例如,我们有:

Observable<Integer> range = Observable
  .range(20, 4)
  .doOnEach(debug("Source"));
range.subscribe();

System.out.println("Hey!");

我们将获得以下输出:

main|Source: >20
main|Source: ->21
main|Source: -->22
main|Source: --->23
main|Source: -------->|
Hey!

这是正常;调用subscribe方法在主线程上执行可观察的逻辑,所有这些都完成后,我们才看到'Hey!'

让我们修改代码如下所示:

CountDownLatch latch = new CountDownLatch(1);
Observable<Integer> range = Observable
  .range(20, 4)
  .doOnEach(debug("Source"))
  .subscribeOn(Schedulers.computation())
  .finallyDo(() -> latch.countDown());
range.subscribe();
System.out.println("Hey!");
latch.await();

输出更改为以下内容:

Hey!
RxComputationThreadPool-1|Source: >20
RxComputationThreadPool-1|Source: ->21
RxComputationThreadPool-1|Source: -->22
RxComputationThreadPool-1|Source: --->23
RxComputationThreadPool-1|Source:--------->|

这意味着调用方线程不会首先或在数字之间阻塞打印“Hey!',所有Observable实例可观察逻辑都在计算线程上执行。这样,您就可以使用您喜欢的每个调度器来决定在何处执行工作。

在这里,我们需要提到关于subscribeOn()方法的一些重要内容。如果您在整个链中多次这样称呼它:

CountDownLatch latch = new CountDownLatch(1);
Observable<Integer> range = Observable
  .range(20, 3)
  .doOnEach(debug("Source"))
  .subscribeOn(Schedulers.computation());
Observable<Character> chars = range
  .map(n -> n + 48)
  .map(n -> Character.toChars(n))
  .subscribeOn(Schedulers.io())
  .map(c -> c[0])
  .subscribeOn(Schedulers.newThread())
  .doOnEach(debug("Chars ", "    "))
  .finallyDo(() -> latch.countDown());
chars.subscribe();
latch.await();

对它的调用是到链的开始最近的很重要。这里我们先在计算调度器上订阅,然后在IO调度器上订阅,然后在新线程调度器上订阅,但是我们的代码将在计算调度器上执行,因为这是在链中首先指定的IO

RxComputationThreadPool-1|Source: >20
RxComputationThreadPool-1|Chars :     >D
RxComputationThreadPool-1|Source: ->21
RxComputationThreadPool-1|Chars :     ->E
RxComputationThreadPool-1|Source: -->22
RxComputationThreadPool-1|Chars :     -->F
RxComputationThreadPool-1|Source: --->|
RxComputationThreadPool-1|Chars :     --->|

总之,不要在生成Observable实例的方法中指定调度器;将此选项留给方法的调用方。或者,让您的方法接收一个Scheduler实例作为参数;比如说Observable.interval方法。

subscribeOn()运算符可用于Observable实例,这些实例在用户订阅调用线程时阻止调用线程。对这些源使用subscribeOn()方法可以让调用线程与Observable实例逻辑同时进行。

那么另一个操作符呢,它帮助我们在其他线程上工作?

可观测的可观测(调度)算子

observeOn()操作符与subscribeOn()操作符类似,但它不是在传递的Scheduler实例上执行整个链,而是从其所在位置开始执行链的一部分。最简单的理解方法是通过示例。在稍微修改后,让我们使用上一个:

CountDownLatch latch = new CountDownLatch(1);
Observable<Integer> range = Observable
  .range(20, 3)
  .doOnEach(debug("Source"));
Observable<Character> chars = range
  .map(n -> n + 48)
  .doOnEach(debug("+48 ", "    "))
  .map(n -> Character.toChars(n))
  .map(c -> c[0])
  .observeOn(Schedulers.computation())
  .doOnEach(debug("Chars ", "    "))
  .finallyDo(() -> latch.countDown());
chars.subscribe();
System.out.println("Hey!");
latch.await();

这里,我们告诉Observable链在订阅后在线程上执行,直到它到达observeOn()操作符。此时,它在计算调度程序上移动。其输出类似于以下内容:

main|Source: >20
main|+48 :     >68
main|Source: ->21
main|+48 :     ->69
main|Source: -->22
main|+48 :     -->70
RxComputationThreadPool-3|Chars :     >D
RxComputationThreadPool-3|Chars :     ->E
RxComputationThreadPool-3|Chars :     -->F
main|Source: --->|
main|+48 :    --->|
Hey!
RxComputationThreadPool-3|Chars :    --->|

如所示,呼叫操作员之前的链部分阻塞了线程,阻止打印Hey!。但是,在所有通知通过observeOn()操作符后,打印'Hey!'并在计算线程上继续执行。

如果我们将操作符向上移动Observable链,则大部分逻辑将使用计算调度器执行。

当然,observeOn()操作符可以与subscribeOn()操作符一起使用。这样,链的一部分可以在一个线程上执行,其余部分可以在另一个线程上执行(在大多数情况下)。这在编写客户端应用代码时特别有用,因为这些应用通常在一个事件排队线程上运行。您可以使用带有subscribeOn()/observeOn()操作符的IO调度程序读取文件/服务器,然后在事件线程上观察结果。

提示

这本书没有介绍 RxJava 的 Android 模块,但它得到了相当多的关注。您可以在这里阅读更多信息:https://github.com/ReactiveX/RxJava/wiki/The-RxJava-Android-Module

如果你是一个 Android 开发者,不要错过它!

中有SwingJavaFx与类似的模块。

让我们看一个同时使用subscribeOn()observeOn()运算符的示例:

CountDownLatch latch = new CountDownLatch(1);
Observable<Integer> range = Observable
  .range(20, 3)
  .subscribeOn(Schedulers.newThread())
  .doOnEach(debug("Source"));
Observable<Character> chars = range
  .observeOn(Schedulers.io())
  .map(n -> n + 48)
  .doOnEach(debug("+48 ", "    "))
  .observeOn(Schedulers.computation())
  .map(n -> Character.toChars(n))
  .map(c -> c[0])
  .doOnEach(debug("Chars ", "    "))
  .finallyDo(() -> latch.countDown());
chars.subscribe();
latch.await();

在这里,我们在链的开头使用对subsribeOn()操作符的一次调用(实际上,我们把它放在哪里并不重要,因为这是对该操作符的唯一调用)和两次对observeOn()操作符的调用。执行此代码的结果如下所示:

RxNewThreadScheduler-1|Source: >20
RxNewThreadScheduler-1|Source: ->21
RxNewThreadScheduler-1|Source: -->22
RxNewThreadScheduler-1|Source: --->|
RxCachedThreadScheduler-1|+48 :     >68
RxCachedThreadScheduler-1|+48 :     ->69
RxCachedThreadScheduler-1|+48 :     -->70
RxComputationThreadPool-3|Chars :     >D
RxCachedThreadScheduler-1|+48 :     --->|
RxComputationThreadPool-3|Chars :     ->E
RxComputationThreadPool-3|Chars :     -->F
RxComputationThreadPool-3|Chars :     --->|

我们可以看到链条穿过三条线。如果我们使用更多的元素来执行此操作,一些代码将以并行的方式执行。结论是,使用observeOn()操作符,可以多次更改线程;使用subscribeOn()操作符,我们可以在订阅时一次性完成此操作—

前面的示例中的observeOn()/subscribeOn()运算符的源可在中找到 https://github.com/meddle0x53/learning-rxjava/blob/master/src/main/java/com/packtpub/reactive/chapter06/SubscribeOnAndObserveOn.java

通过这两个操作符,我们可以让Observable实例和多线程一起工作。但是同时并不意味着我们可以并行。这意味着我们的程序有多个线程,独立地取得了一些进展。真正的并行性是当我们的程序使用它所运行的机器的 CPU(内核)达到最大值时,它的线程实际上同时运行。

到目前为止,我们所有的示例都只是将链逻辑转移到另一个线程上。虽然有些示例确实在并行中执行了部分操作,但真正的并行示例看起来有所不同。

平行性

只有使用我们已经知道的操作符,我们才能实现并行化。想想flatMap()操作符;它为源发出的每个项创建一个Observable实例。如果我们在这些Observable实例上调用具有Scheduler实例的subscribeOn()操作符,它们中的每一个都将在新的Worker实例上调度,并且它们将以并行方式工作(如果主机允许)。以下是一个例子:

Observable<Integer> range = Observable
  .range(20, 5)
  .flatMap(n -> Observable
    .range(n, 3)
    .subscribeOn(Schedulers.computation())
    .doOnEach(debug("Source"))
  );
range.subscribe();

此代码的输出如下所示:

RxComputationThreadPool-3|Source: >23
RxComputationThreadPool-4|Source: >20
RxComputationThreadPool-2|Source: >22
RxComputationThreadPool-3|Source: ->24
RxComputationThreadPool-1|Source: >21
RxComputationThreadPool-2|Source: ->23
RxComputationThreadPool-3|Source: -->25
RxComputationThreadPool-3|Source: --->|
RxComputationThreadPool-4|Source: ->21
RxComputationThreadPool-4|Source: -->22
RxComputationThreadPool-4|Source: --->|
RxComputationThreadPool-2|Source: -->24
RxComputationThreadPool-2|Source: --->|
RxComputationThreadPool-1|Source: ->22
RxComputationThreadPool-1|Source: -->23
RxComputationThreadPool-1|Source: --->|
RxComputationThreadPool-4|Source: >24
RxComputationThreadPool-4|Source: ->25
RxComputationThreadPool-4|Source: -->26
RxComputationThreadPool-4|Source: --->|

通过可以看到通过flatMap()操作符定义的Observable实例在并行中执行的线程名称。事实上,这四个线程正在使用我的处理器的四个核心。

我将提供另一个例子,这次是针对远程服务器的并行请求。我们将使用上一章中定义的requestJson()方法。这个想法是:

  1. 我们将检索有关 GitHub 用户的追随者的信息(在本例中,我们将使用我的帐户)。
  2. 对于每个关注者,我们将获得其个人资料的 URL。
  3. 我们将在并行中请求追随者的简介。
  4. 我们将打印追随者的数量及其追随者的数量。

让我们看看这是如何实现的:

Observable<Map> response = CreateObservable.requestJson(
  client,
  "https://api.github.com/users/meddle0x53/followers"
); // (1)
response
  .map(followerJson -> followerJson.get("url")) // (2)
  .cast(String.class)
  .flatMap(profileUrl -> CreateObservable
    .requestJson(client, profileUrl)
    .subscribeOn(Schedulers.io()) // (3)
    .filter(res -> res.containsKey("followers"))
    .map(json ->  // (4)
      json.get("login") +  " : " +
      json.get("followers"))
  )
  .doOnNext(follower -> System.out.println(follower)) // (5)
  .count() // (6)
  .subscribe(sum -> System.out.println("meddle0x53 : " + sum));

下面是前面代码中发生的事情:

  1. 首先,我们对我的用户的追随者数据执行请求。
  2. 请求以JSON字符串的形式返回跟随者,并将其转换为Map对象(参见requestJson方法的实现)。从每个JSON文件中,读取到它所表示的跟随者概要文件的 URL。
  3. 为每个 URL 执行一个新请求。请求在IO线程上以并行方式运行,因为我们使用了与上一个示例相同的技术。值得一提的是,flatMap()运算符有一个重载,它接受一个maxConcurrent整数参数。我们可以使用它限制并发请求。
  4. 获取追随者的用户数据后,将生成其追随者的信息。
  5. 此信息作为副作用打印。
  6. 我们使用count()操作符统计我的追随者(与scan(0.0, (sum, element) -> sum + 1).last()调用相同)。然后我们把它们打印出来。打印数据的顺序不能保证与遍历跟随器的顺序相同。

上例的源代码可以在找到 https://github.com/meddle0x53/learning-rxjava/blob/master/src/main/java/com/packtpub/reactive/chapter06/ParallelRequestsExample.java

这都是关于并发并行的。一切都很简单,但功能强大。有一些规则(例如使用Subscribers.io实例进行阻塞操作,使用计算实例进行后台任务,等等),即使使用多线程可观察的动作链,您也必须遵循这些规则,以确保不会出错。

很有可能使用这种并行技术向Observable实例链中注入数据,这是一个问题。这就是我们必须处理它的原因。通过本章的其余部分,我们将学习如何处理来自上游可观察的动作链的太多元素。

下面是一个有趣的例子:

Path path = Paths.get("src", "main", "resources");
Observable<String> data = CreateObservable
  .listFolder(path, "*")
  .flatMap(file -> {
    if (!Files.isDirectory(file)) {
      return CreateObservable
    .from(file)
    .subscribeOn(Schedulers.io());
  }
  return Observable.empty();
});
subscribePrint(data, "Too many lines");

这将遍历文件夹中的所有文件,如果它们不是文件夹本身,则并行读取所有文件。例如,当我运行它时,文件夹中有五个文本文件,其中一个相当大。在使用我们的subscribePrint()方法打印这些文件的内容时,我们得到如下结果:

Too many lines : Morbi nec nulla ipsum.
Too many lines : Proin eu tellus tortor.
Too many lines : Lorem ipsum dolor sit am
Error from Too many lines:
rx.exceptions.MissingBackpressureException
Too many lines : Vivamus non vulputate tellus, at faucibus nunc.
Too many lines : Ut tristique, orci eu
Too many lines : Aliquam egestas malesuada mi vitae semper.
Too many lines : Nam vitae consectetur risus, vitae congue risus.
Too many lines : Donec facilisis sollicitudin est non molestie.
 rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:349)
 rx.internal.operators.OperatorMerge$InnerSubscriber.enqueue(OperatorMerge.java:721)
 rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:698)
 rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:586)
 rx.internal.operators.OperatorSubscribeOn$1$1$1.onNext(OperatorSubscribeOn.java:76)

输出被裁剪,但重要的是我们得到了这个MissingBackpressureException异常。

读取每个文件的线程正试图将其数据推送到merge()操作符中(将flatMap()操作符实现为merge(map(func))。操作员正在处理大量数据,因此将尝试通知生产过剩的Observable实例减速(这种通知上游无法处理大量数据的能力称为背压。问题是他们没有实现这样的机制(背压,所以遇到了MissingBackpressureException异常。

通过使用一种特殊的onBackpressure*方法将背压应用到上游可观测物中,或者通过将大量进入的项目打包到一组较小的排放物中,试图避免这种情况。此包装通过缓冲滴下部分来料、节流(使用时间间隔或事件进行缓冲)和去抖动(使用物料排放间隔进行缓冲)完成。

让我们看看其中的一些。

节流

利用这种机制,我们可以调节Observable实例的排放率。我们可以指定时间间隔或另一个流控制Observable实例来实现这一点。

使用sample()操作符,我们可以使用另一个实例或时间间隔来控制Observable实例的排放。

data = data
  .sample(
 Observable
 .interval(100L, TimeUnit.MILLISECONDS)
 .take(10)
 .concatWith(
 Observable
 .interval(200L, TimeUnit.MILLISECONDS)
 )
 );
subscribePrint(data, "Too many lines");

采样Observable实例在前两秒内每 100 毫秒发射一次,然后开始每 200 毫秒发射一次。数据Observable实例丢弃其所有项,直到采样发出。发生这种情况时,数据Observable实例发出的最后一项被传递。所以我们有很大的数据丢失,但是遇到MissingBackpressureException异常更难(尽管有可能得到它)。

sample()操作符有两个额外的重载,您可以将时间间隔传递给它们,TimeUnit度量和Scheduler实例(可选):

data = data.sample(
 100L,
 TimeUnit.MILLISECONDS
);

Observable实例中使用sample()操作符可以让我们更详细地控制数据流。throttleLast()操作符只是接收时间间隔的sample()操作符的不同版本的别名。throttleFirst()操作符与throttleLast()操作符相同,但Observable实例将在间隔开始时发射它发射的第一项,而不是最后一项。默认情况下,这些操作符在计算调度程序上运行。

当您有多个类似的事件时,这些技术非常有用(以及本节中的大多数其他技术)。例如,如果要捕获并响应鼠标移动事件,则不需要所有事件,包含所有像素位置;你只需要其中的一些。

去抖动

在我们前面的示例中,去抖动不起作用。它的想法是只发射在给定时间间隔内不跟随其他项目的项目。因此,为了传播某些东西,排放之间必须经过一段时间。因为我们的数据Observable实例中的所有项看起来都是同时发出的,所以它们之间没有使用的间隔。所以我们需要稍微改变一下这个例子来演示这一点。

Observable<Object> sampler = Observable.create(subscriber -> {
  try {
    subscriber.onNext(0);
    Thread.sleep(100L);
    subscriber.onNext(10);
    Thread.sleep(200L);
    subscriber.onNext(20);
    Thread.sleep(150L);
    subscriber.onCompleted();
  }
  catch (Exception e) {
    subscriber.onError(e);
  }
}).repeat()
  .subscribeOn(Schedulers.computation());
data = data
  .sample(sampler)
  .debounce(150L, TimeUnit.MILLISECONDS);

在这里,我们使用带有特殊采样Observable实例的sample()操作符,以减少 100、200 和 150 毫秒的排放。通过使用repeat()操作符,我们创建一个无限Observable实例,重复源代码,并将其设置为在计算调度程序上执行。现在,我们可以使用debounce()操作符仅发射这组发射时间间隔为 150 毫秒或更长的项目。

去抖动节流一样,可用于过滤过度生产源中的类似事件。自动完成搜索就是一个很好的例子。我们不希望触发对用户输入的每一封信件的搜索;我们需要等待他/她停止键入,然后触发搜索。我们可以使用debounce()操作符,并设置合理的时间间隔debounce()运算符有一个重载,该重载将Scheduler实例作为其第三个参数。此外,还有一个重载,选择器返回一个Observable实例,以便对数据流进行更细粒度的控制。

缓冲器和窗口操作器

这两套算子是变换算子,很像map()flatMap()算子。它们转换集合中的系列元素—这些元素的序列将作为一个整体发射。

本书不会详细介绍这些操作员,但值得一提的是,buffer()操作员具有能够根据时间间隔选择器和其他Observable实例收集排放的过载。也可以将其配置为跳过项目。下面是一个使用buffer(int count, int skip)方法的示例,buffer()操作符的一个版本,它收集计数项并跳过跳过项:

data = data.buffer(2, 3000);
Helpers.subscribePrint(data, "Too many lines");

这将输出类似于以下内容的内容:

Too many lines : ["Lorem ipsum dolor sit amet, consectetur adipiscing elit.", "Donec facilisis sollicitudin est non molestie."]
Too many lines : ["Integer nec magna ac ex rhoncus imperdiet.", "Nullam pharetra iaculis sem."]
Too many lines : ["Integer nec magna ac ex rhoncus imperdiet.", "Nullam pharetra iaculis sem."]
Too many lines : ["Nam vitae consectetur risus, vitae congue risus.", "Donec facilisis sollicitudin est non molestie."]
Too many lines : ["Sed mollis facilisis rutrum.", "Proin enim risus, congue id eros at, pharetra consectetur ex."]
Too many lines ended!

window()运算符与buffer()运算符具有完全相同的重载集。不同之处在于,window()操作符创建的Observable实例发出Observable实例,而不是缓冲元素的数组,发出收集到的元素。

为了演示不同的重载,我们将使用window(long timespan, long timeshift, TimeUnit units)方法给出一个示例。该操作员收集在时间间隔内发出的元素,并跳过在时间间隔内发出的所有元素。重复此操作,直到源Observable实例完成。

data = data
  .window(3L, 200L, TimeUnit.MILLISECONDS)
  .flatMap(o -> o);
subscribePrint(data, "Too many lines");

我们使用flatMap()操作符展平Observable实例。结果包括在订阅的前三毫秒内发射的所有项,加上 200 毫秒间隔后三毫秒内发射的项,并且在源发射时重复此操作。

上一节中介绍的所有示例可在中找到 https://github.com/meddle0x53/learning-rxjava/blob/master/src/main/java/com/packtpub/reactive/chapter06/BackpressureExamples.java

背压操作人员

防止MissingBackpressureException异常的最后一组操作员实际上会在出现生产过剩的Observable实例时自动激活。

onBackpressureBuffer()操作员缓冲由其Observer实例的Observable发出的项目。然后以订阅者可以处理的方式发出缓冲项。例如:

Helpers.subscribePrint(
  data.onBackpressureBuffer(10000),
  "onBackpressureBuffer(int)"
);

这里我们使用了大容量的缓冲区,因为有大量的元素,但是请注意,溢出此缓冲区将返回MissingBackpressureException异常。

onBackpressureDrop()操作员丢弃Observable实例中所有用户无法处理的传入项目。

有一种方法可以通过实现智能观察者或订阅者来建立背压,但是这个主题超出了本书的范围。有一篇关于背压的优秀文章,可以在 RxJava 维基页面上看到 https://github.com/ReactiveX/RxJava/wiki/Backpressure 。本节中提到的许多操作符在这里有详细的描述,并且有大理石图可帮助您理解更复杂的操作符。

在本章中,我们学习了如何在与线程不同的其他线程上执行我们的可观察逻辑。有一些简单的规则和技巧可以做到这一点,如果每件事都得到相应的遵守,就不会有危险。使用这些技术,我们能够编写并发程序。我们还学习了如何使用调度器和flatMap()操作符实现并行执行,我们看到了一个实际的例子。

我们研究过的另一件有用的事情是如何处理生产过剩的数据源。有很多运营商可以通过不同的方式实现这一点,我们介绍了其中一些运营商,并讨论了它们的有用性。

有了这些,我们就有了编写任意 RxJava 程序的知识,能够处理来自不同来源的数据。我们知道如何使用多个线程来实现这一点。使用 RxJava 及其运算符和结构几乎就像使用新语言进行编码一样。它有自己的规则和流量控制方法。

为了编写稳定的应用,我们必须学习如何单元测试它们。测试异步代码并非易事。好消息是,RxJava 提供的一些操作符和类将帮助我们做到这一点。你可以在下一章中阅读更多关于它们的内容。

教程来源于Github,感谢apachecn大佬的无私奉献,致敬!

技术教程推荐

白话法律42讲 -〔周甲徳〕

许式伟的架构课 -〔许式伟〕

性能工程高手课 -〔庄振运〕

Serverless入门课 -〔蒲松洋(秦粤)〕

软件设计之美 -〔郑晔〕

数据分析思维课 -〔郭炜〕

零基础实战机器学习 -〔黄佳〕

超级访谈:对话张雪峰 -〔张雪峰〕

AI大模型企业应用实战 -〔蔡超〕