Java8 创建和连接可观察对象、观察者和主体详解

RxJava 的Observable实例是反应式应用的构建块,RxJava 的这一优势是有益的。如果我们有一个源Observable实例,我们可以将逻辑链接到它并订阅以获得结果。我们只需要这个初始Observable实例。

在浏览器或桌面应用中,用户输入已经由我们可以通过Observable实例处理和转发的事件表示。但是,将我们所有的数据更改或操作转换为Observable实例,而不仅仅是用户输入,这将是非常棒的。例如,当我们从一个文件中读取数据时,将读取的每一行或每一个字节序列看作是一条可以通过Observable实例发出的消息,这将是一件好事。

我们将详细介绍如何将不同的数据源转换为Observable实例;不管它们是外部(文件或用户输入)还是内部(集合或标量)。此外,我们还将了解各种类型的Observable实例,具体取决于它们的行为。我们将学习的另一个重要内容是如何以及何时取消订阅Observable实例,以及如何使用订阅和Observer实例。此外,我们还将介绍主题类型及其用法。

在本章中,我们将了解:

有很多方法可以从不同的来源创建Observable实例。原则上,可以使用Observable.create(OnSubscribe<T>)方法创建Observable实例,但是有很多简单的方法,它们的实现都是为了让我们的生活更美好。让我们看看其中的一些。

Observable.from方法可以从不同的 Java 结构创建Observable实例。对于示例:

List<String> list = Arrays.asList(
  "blue", "red", "green", "yellow", "orange", "cyan", "purple"
);
Observable<String> listObservable = Observable.from(list);
listObservable.subscribe(System.out::println);

这段代码从List实例创建Observable实例。在Observable实例上调用subscribe方法时,源列表中包含的所有元素都被发送到订阅方法。对于对subscribe()方法的每次调用,整个集合从一开始就逐个元素发出:

listObservable.subscribe(
  color -> System.out.print(color + "|"),
  System.out::println,
  System.out::println
);
listObservable.subscribe(color -> System.out.print(color + "/"));

这将使用不同的格式打印两次颜色。

此版本的from方法的真实签名为final static <T> Observable<T> from(Iterable<? extends T> iterable)。这意味着可以将实现Iterable接口的任何类的实例传递给此方法。其中包括任何 Java 集合,例如:

Path resources = Paths.get("src", "main", "resources");
try (DirectoryStream<Path> dStream =Files.newDirectoryStream(resources)) {
  Observable<Path> dirObservable = Observable.from(dStream);
  dirObservable.subscribe(System.out::println);
}
catch (IOException e) {
  e.printStackTrace();
}

这会将文件夹的内容转换为我们可以订阅的事件。这是可能的,因为DirectoryStream参数是Iterable实例。请注意,每次调用此Observable实例的subscribe方法时,都会调用其Iterable源的iterator()方法,以获取一个新的Iterator实例,用于从头遍历数据。在本例中,第二次调用subscribe()方法时会抛出java.lang.IllegalStateException异常,因为DirectoryStream参数的iterator()方法只能调用一次。

用于从数组创建Observable实例的from方法的另一个重载是public final static <T> Observable<T> from(T[] array),使用Observable实例的示例如下:

Observable<Integer> arrayObservable = Observable.from(new Integer[] {3, 5, 8});
  arrayObservable.subscribe(System.out::println);

Observable.from()方法对于从集合或数组创建Observable实例非常有用。但有时我们需要从单个对象创建Observable实例;对于这些,可以使用Observable.just()方法。

使用Observable.from()方法示例的源代码可在查看和下载 https://github.com/meddle0x53/learning-rxjava/blob/master/src/main/java/com/packtpub/reactive/chapter03/CreatingObservablesWithFrom.java

just()方法以OnNext通知的形式发出其参数,然后发出OnCompleted通知。

例如,一个字母:

Observable.just('S').subscribe(System.out::println);

或一系列字母:

Observable
  .just('R', 'x', 'J', 'a', 'v', 'a')
  .subscribe(
    System.out::print,
    System.err::println,
    System.out::println
  );

第一段代码打印S和一个新行,第二段代码在一行上打印字母,并在完成时添加一个新行。该方法允许通过反应方式观察多达九个任意值(相同类型的对象)。例如,假设我们有一个简单的User类:

public static class User {
  private final String forename;
  private final String lastname;
  public User(String forename, String lastname) {
    this.forename = forename;
    this.lastname = lastname;
  }
  public String getForename() {
    return this.forename;
  }
  public String getLastname() {
    return this.lastname;
  }
}

我们可以这样打印User实例的全名:

Observable
  .just(new User("Dali", "Bali"))
  .map(u -> u.getForename() + " " + u.getLastname())
  .subscribe(System.out::println);

这不是很实用,但展示了将数据放在Observable实例上下文中,并利用map()方法的优势。一切都可以成为一件大事。

还有一些更方便的工厂方法,可用于各种情况。让我们在下一节中看看它们。

Observable.just()方法示例的源代码可在查看/下载 https://github.com/meddle0x53/learning-rxjava/blob/master/src/main/java/com/packtpub/reactive/chapter03/CreatingObservablesUsingJust.java

在这里,我们将检查一些可以与转换运算符(如 flatMap)或组合运算符(如.zip文件)结合使用的方法(在下一章中详细介绍)。

为了检查他们的结果,我们将使用以下方法创建订阅:

void subscribePrint(Observable<T> observable, String name) {
  observable.subscribe(
    (v) -> System.out.println(name + " : " + v),
    (e) -> {
      System.err.println("Error from " + name + ":");
      System.err.println(e.getMessage());
    },
    () -> System.out.println(name + " ended!")
  );
}

上述方法的思想是订阅一个Observable实例,并用名称标记它。在OnNext上打印以名称为前缀的值;在OnError上,将错误与名称一起打印;而在完成的上,它会打印带有名称前缀的'ended!'。这有助于我们调试结果。

上述方法的源代码见https://github.com/meddle0x53/learning-rxjava/blob/4a2598aa0835235e6ef3bc3371a3c19896161628/src/main/java/com/packtpub/reactive/common/Helpers.java#L25

以下是介绍新工厂方法的代码:

subscribePrint(
  Observable.interval(500L, TimeUnit.MILLISECONDS),
  "Interval Observable"
);
subscribePrint(
  Observable.timer(0L, 1L, TimeUnit.SECONDS),
  "Timed Interval Observable"
);
subscribePrint(
  Observable.timer(1L, TimeUnit.SECONDS),
  "Timer Observable"
);

subscribePrint(
  Observable.error(new Exception("Test Error!")),
  "Error Observable"
);
subscribePrint(Observable.empty(), "Empty Observable");
subscribePrint(Observable.never(), "Never Observable");
subscribePrint(Observable.range(1, 3), "Range Observable");
Thread.sleep(2000L);

下面是代码中发生的情况:

  • Observable<Long> Observable.interval(long, TimeUnit, [Scheduler]):此方法创建一个Observable实例,该实例将以给定的间隔发出顺序号。它可以用来实现定期轮询或连续状态记录,只需忽略发出的数量并发出有用的消息。此方法的特殊之处在于,它默认在计算线程上运行。我们可以通过将第三个参数传递给方法 aScheduler实例来改变这一点(更多关于第 6 章中的Scheduler实例,使用与调度器的并发性和并行性)。
  • Observable<Long> Observable.timer(long, long, TimeUnit, [Scheduler])interval()方法在等待指定的时间间隔过去后才开始发送数字。如果我们想告诉它什么时候开始工作呢?我们可以使用这个timer()方法来实现这一点。它的第一个参数是开始时间,第二个和第三个参数用于间隔设置。同样,默认情况下在计算线程上执行,同样,这是可配置的。
  • Observable<Long> Observable.timer(long, TimeUnit, [Scheduler]):这只在计算线程(默认)上经过一定时间后才发出输出'0'。之后,发出已完成通知。
  • <T> Observable<T> Observable.error(Throwable):这只会发出作为错误通知传递给它的错误。这类似于经典的命令式 Java 世界中的“throw”关键字。
  • <T> Observable<T> Observable.empty():此条不会发出物品,但会立即发出一条OnCompleted通知。
  • <T> Observable<T> Observable.never():这没有任何作用。不向其Observer实例发送通知,甚至OnCompleted通知也不发送。
  • Observable<Integer>``Observable.range(int, int, [Scheduler]):此方法发送从传递的第一个参数开始的序列号。第二个参数是排放的数量。

此程序将打印以下输出:

Timed Interval Observable : 0
Error from Error Observable:
Test Error!
Range Observable : 1
Range Observable : 2
Range Observable : 3
Range Observable ended!
Empty Observable ended!
Interval Observable : 0
Interval Observable : 1
Timed Interval Observable : 1
Timer Observable : 0
Timer Observable ended!
Interval Observable : 2
Interval Observable : 3
Timed Interval Observable : 2

如您所见,interval Observable实例不发送未完成通知。程序在两秒后结束,interval Observable实例在 500 毫秒后开始发射,每 500 毫秒一次;因此,它会发出三个OnNext通知。timed interval Observable实例创建后立即开始发射,每秒发射一次;因此,我们收到了来自它的两个通知。

上例的源代码可在查看/下载 https://github.com/meddle0x53/learning-rxjava/blob/master/src/main/java/com/packtpub/reactive/chapter03/CreatingObservablesUsingVariousFactoryMethods.java

所有这些方法都是使用Observable.create()方法实现的。

让我们先看看方法的签名:

public final static <T> Observable<T> create(OnSubscribe<T>)

它接受一个类型为OnSubscribe的参数。该接口扩展了Action1<Subscriber<? super T>>接口;换句话说,这个类型只有一个方法,只接受一个类型为Subscriber<T>的参数,不返回任何内容。每次调用Observable.subscribe()方法时都会调用此函数。它的参数是Subscriber类的一个实例,实际上是订阅Observable实例的观察者(这里Subscriber类和观察者接口具有相同的角色)。我们将在本章后面讨论它们)。我们可以在上面调用onNext()onError()onCompleted()方法,实现我们自己的定制行为。

用一个例子更容易理解这一点。让我们实现一个简单版本的Observable.from(Iterabale<T>)方法:

<T> Observable<T> fromIterable(final Iterable<T> iterable) {
  return Observable.create(new OnSubscribe<T>() {
    @Override
    public void call(Subscriber<? super T> subscriber) {
      try {
        Iterator<T> iterator = iterable.iterator(); // (1)
        while (iterator.hasNext()) { // (2)
          subscriber.onNext(iterator.next());
        }
        subscriber.onCompleted(); // (3)
      }
      catch (Exception e) {
        subscriber.onError(e); // (4)
      }
    }
  });
}

该方法将一个Iterable<T>参数作为参数,并返回一个Observable<T>参数。行为如下:

  1. Observer/Subscriber实例订阅生成的Observable实例时,从Iterable源中检索Iterator实例。Subscriber类实际上实现了Observer接口。它是一个抽象类,on*方法不是由它实现的。
  2. 虽然存在元素,但它们作为OnNext通知发出。
  3. 当所有元素都发出时,会发出一个OnCompleted通知。
  4. 如果在任何时候发生错误,将发送带有错误的OnError通知。

这是Observable.from(Iterable<T>)方法行为的一个非常简单和幼稚的实现。第一章和第二章中描述的无功和是Observable.create方法(由CreateObservable.from()使用)功率的另一个示例。

但正如我们所看到的,传递给create()方法的逻辑是在Observable实例上调用Observable.subscribe()方法时触发的。到目前为止,我们一直在用这种方法创建Observable实例并订阅。现在是详细研究它的时候了。

Observable.subscribe()方法有很多重载,如下所示:

  • subscribe():此忽略Observable实例的所有排放,如果有OnError通知,则抛出OnErrorNotImplementedException异常。这只能用于触发OnSubscribe.call行为。

  • subscribe(Action1<? super T>):只订阅onNext()方法触发更新。它忽略OnCompleted通知,如果有OnError通知,则抛出OnErrorNotImplementedException异常。对于真正的生产代码来说,这不是一个好的选择,因为很难保证不会抛出错误。

  • subscribe(Action1<? super T>, Action1<Throwable>):与前一个相同,但是如果有OnError通知,则调用第二个参数。

  • subscribe(Action1<? super T>,Action1<Throwable>, Action0):与前一个相同,但第三个参数是在OnCompleted通知时调用的。

  • subscribe(Observer<? super T>):使用Observer参数的onNext/onError/onCompleted方法观察Observable实例的通知。在第一章中,我们在实现“无功和”时使用了这一点。

  • subscribe(Subscriber<? super T>): This is the same as the preceding one, but the Subscriber implementation of the Observer interface is used to observe notifications. The Subscriber class provides advanced functionality, such as unsubscription (cancellation) and backpressure (flow control). Actually, all the preceding methods call this one; that's why we will be referring to it when talking about Observable.subscribe from now on. The method ensures that the Subscriber instance passed sees an Observable instance, complying with the following Rx contract:

    “发送到 Observer 接口实例的消息遵循以下语法:

    onNext(未完成?错误)?*

    此语法允许可观察序列向订阅者发送任意数量(0 或更多)的OnNext()方法消息,然后可选地发送一条成功(onCompleted或失败(onError消息。指示可观测序列已完成的单个消息确保可观测序列的使用者可以确定地确定执行清理操作是安全的。单个故障进一步确保了在多个可观察序列上工作的操作员可以维护中止语义。“

    –RxJava 的 JavaDoc 的一部分。

    这是通过在传递的Subscriber实例-SafeSubscriber周围使用包装器在内部完成的。

  • unsafeSubscribe(Subscriber<? super T>):与前一条相同,但没有Rx 合同保护。它旨在帮助实现自定义操作符(参见第 8 章资源管理和扩展 RxJava),而无需subscribe()方法保护的额外开销;不鼓励使用此方法观察通用代码中的Observable实例。

所有这些方法都返回类型为Subscription的结果,可用于Observable实例发出的通知中取消订阅。取消订阅通常会清理与订阅相关的内部资源;例如,如果我们使用Observable.create()方法实现一个 HTTP 请求,并希望在特定时间取消该请求,或者我们有一个Observable实例无限次地发出一系列数字/单词/任意数据,并希望停止该请求。

Subscription接口有两种方式:

  • void unsubscribe():用于退订
  • boolean isUnsubscribed():用于检查Subscription实例是否已经退订

Subscriber类的实例传递给Observable.create()方法的OnSubscribe()方法,实现Subscription接口。因此,在对Observable实例的行为进行编码时,可以取消订阅并检查Subscriber是否已订阅。让我们更新我们的Observable<T> fromIterable(Iterable<T>)方法实现,以对取消订阅做出反应:

<T> Observable<T> fromIterable(final Iterable<T> iterable) {
  return Observable.create(new OnSubscribe<T>() {
    @Override
    public void call(Subscriber<? super T> subscriber) {
      try {
        Iterator<T> iterator = iterable.iterator();
        while (iterator.hasNext()) {
          if (subscriber.isUnsubscribed()) {
 return;
 }
          subscriber.onNext(iterator.next());
        }
        if (!subscriber.isUnsubscribed()) {
 subscriber.onCompleted();
 }
 }
 catch (Exception e) {
 if (!subscriber.isUnsubscribed()) {
 subscriber.onError(e);
 }
 }
    }
  });
}

这里的新事物是使用Subscription.isUnsubscribed()方法来确定数据发送是否应该终止。我们检查Subscriber在每次迭代中是否已经退订,因为它可以随时退订,之后我们不需要发出任何信息。发出所有信息后,如果用户已经取消订阅,则跳过onCompleted()方法。如果存在异常,则仅当Subscriber实例仍然订阅时,才会作为OnError通知发出。

让我们看看退订是如何运作的:

Path path = Paths.get("src", "main", "resources", "lorem_big.txt"); // (1)
List<String> data = Files.readAllLines(path);
Observable<String> observable = fromIterable(data).subscribeOn(Schedulers.computation()); // (2)
Subscription subscription = subscribePrint(observable, "File");// (3)
System.out.println("Before unsubscribe!");
System.out.println("-------------------");
subscription.unsubscribe(); // (4)
System.out.println("-------------------");
System.out.println("After unsubscribe!");

下面是本例中发生的情况:

  1. 数据源是一个巨大的文件,因为我们需要花一些时间来迭代。
  2. Observable实例的所有订阅都将在另一个线程上进行,因为我们希望在主线程上取消订阅
  3. 使用本章定义的subscribePrint()方法,但修改为返回Subscription
  4. 订阅用于从Observable实例退订,所以不会打印整个文件,并且在退订执行时会有标记显示。

输出如下所示:

File : Donec facilisis sollicitudin est non molestie.
File : Integer nec magna ac ex rhoncus imperdiet.
Before unsubscribe!
-------------------
File : Nullam pharetra iaculis sem.
-------------------
After unsubscribe!

所以文件的大部分内容都被跳过了。请注意,在取消订阅之后,可能会立即发出的内容;例如,如果Subscriber实例在检查取消订阅之后立即取消订阅,并且程序已经在执行if语句的主体,则检查用户是否取消订阅。

上例的源代码可在下载/查看 https://github.com/meddle0x53/learning-rxjava/blob/master/src/main/java/com/packtpub/reactive/chapter03/ObservableCreateExample.java

另一件需要注意的事情是Subscriber实例有一个void add(Subscription s)方法。当Subscriber取消订阅时,每一个传递给它的订阅都会自动取消订阅。这样,我们就可以在Subscriber实例中添加额外的动作;例如,应该在取消订阅时执行的操作(类似于 Java 中的 try finally 构造)。这就是取消订阅的工作原理。在第 8 章资源管理和扩展 RxJava中,我们将讨论资源管理。我们将学习如何通过Subscription包装器将Observable实例附加到Subscriber实例,以及调用取消订阅将如何释放任何分配的资源。

我们将在本章中介绍的下一个主题与订阅行为有关。我们将讨论热和冷的例子。

查看前面使用的Observable.create()Observable.just()Observable.from()方法实现的示例,我们可以说,在有人订阅它们之前,它们处于非活动状态,不会发出任何东西。但是,每次有人订阅时,他们都会发出通知。例如,如果我们订阅一个Observable.from(Iterable)对象三次,Iterable实例将迭代三次。这样的Observable实例称为冷可观察实例。

我们在本章中使用的所有工厂方法都返回冷观测值。冷观测产生按需通知,对于每个订户,它们产生独立通知。

Observable实例,当它们开始发出通知时,是否有订阅并不重要。他们继续发射,直到完成。所有订户都会收到相同的通知,默认情况下,订户订阅时,不会收到之前发出的通知。这些都是可观察到的热门实例。

我们可以说,冷观测为每个订户生成通知,而热观测总是在运行,向所有订户广播通知。把一个可观测的热点想象成一个电台。所有正在听这首歌的听众都在听同一首歌。冷冰冰的可见光是一张音乐 CD。许多人可以购买它并独立收听。

正如我们提到的,在这本书中有很多例子使用冷观测。热可观测实例呢?如果您还记得我们在第一章中实现“无功和”时,我们有一个Observable实例,它发出用户在标准输入流中键入的每一行。这一个很热,我们从中派生了两个Observable实例,一个用于收集器a,另一个用于b。他们收到相同的输入行,只过滤他们感兴趣的行。这个输入Observable实例是使用一种特殊类型的Observable实现的,称为ConnectableObservable

可连通可观测类

这些Observable实例在调用其connect()方法之前处于非活动状态。在那之后,它们成为热门的可观测对象。通过调用Observable实例的publish()方法,可以从任何Observable实例创建ConnectableObservable实例。换句话说,publish()方法可以将任何可观察到的冷变为热。让我们看看这个例子:

Observable<Long> interval = Observable.interval(100L, TimeUnit.MILLISECONDS);
ConnectableObservable<Long> published = interval.publish();
Subscription sub1 = subscribePrint(published, "First");
Subscription sub2 = subscribePrint(published, "Second");
published.connect();
Subscription sub3 = null;
try {
  Thread.sleep(500L);
  sub3 = subscribePrint(published, "Third");
  Thread.sleep(500L);
}
catch (InterruptedException e) {}
sub1.unsubscribe();
sub2.unsubscribe();
sub3.unsubscribe();

在调用connect()方法之前,不会发生任何事情。之后,我们将看到为每个订户输出两次相同的序列号。第三个订阅服务器将加入其他两个订阅服务器,打印前 500 毫秒后发出的号码,但不会打印订阅前发出的号码。

如果我们希望接收订阅之前发出的所有通知,然后继续接收传入的通知,该怎么办?这可以通过调用replay()方法而不是publish()方法来实现。它从源Observable实例创建一个ConnectableObservable实例,有一个小小的转折:所有订阅者,无论何时订阅,都将收到所有通知(之前的通知将按顺序同步到达)。

有一种方法可以激活Observable实例变热,而无需调用connect()方法。可在第一次订阅时激活,在每个Subscriber实例退订时停用。这样的Observable实例可以通过调用ConnectableObservable实例上的refCount()方法来创建Observable实例(该方法的名称来源于“引用计数”;该方法统计订阅了Observable实例的Subscriber实例)。以下是使用refCount()方法实现的上述示例:

Observable<Long> refCount = interval.publish().refCount();
Subscription sub1 = subscribePrint(refCount, "First");
Subscription sub2 = subscribePrint(refCount, "Second");
try {
  Thread.sleep(300L);
}
catch (InterruptedException e) {}
sub1.unsubscribe();
sub2.unsubscribe();
Subscription sub3 = subscribePrint(refCount, "Third");
try {
  Thread.sleep(300L);
}
catch (InterruptedException e) { }
sub3.unsubscribe();

sub2取消订阅Observable实例将被停用。如果在此之后有人订阅,它将开始从头开始发射序列。这就是sub3所发生的事情。有一个share()方法,它是publish().refCount()调用的别名。

上例的源代码可在查看/下载 https://github.com/meddle0x53/learning-rxjava/blob/master/src/main/java/com/packtpub/reactive/chapter03/UsingConnectableObservables.java

还有另一种方法可以创建热可观察对象:使用Subject实例。我们将在本章的下一节和最后一节介绍它们。

Subject实例同时为Observable实例和Observer实例。与Observable实例一样,它们可以有多个Observer实例,接收相同的通知。这就是为什么它们可以用来将冷的Observable实例转化为热的实例。像Observer实例一样,它们让我们可以访问它们的onNext()onError()onCompleted()方法。

让我们看看前面热间隔示例的实现,使用Subject实例:

Observable<Long> interval = Observable.interval(100L, TimeUnit.MILLISECONDS); // (1)
Subject<Long, Long> publishSubject = PublishSubject.create(); // (2)
interval.subscribe(publishSubject);
// (3)
Subscription sub1 = subscribePrint(publishSubject, "First");
Subscription sub2 = subscribePrint(publishSubject, "Second");
Subscription sub3 = null;
try {
  Thread.sleep(300L);
  publishSubject.onNext(555L); // (4)
  sub3 = subscribePrint(publishSubject, "Third"); // (5)
  Thread.sleep(500L);
}
catch (InterruptedException e) {}
sub1.unsubscribe(); // (6)
sub2.unsubscribe();
sub3.unsubscribe();

现在的例子略有不同:

  1. 间隔Observable实例的创建方式与前面相同。

  2. Here, we create a PublishSubject instance—a Subject instance that emits to an Observer instance only those items that are emitted by the source Observable instance subsequent to the time of the subscription. This behavior is similar to that of the ConnectableObservable instance created by the publish() method. The new Subject instance is subscribed to the interval Observable instance , created by the interval factory method, which is possible because the Subject class implements the Observer interface. Also, note that the Subject signature has two generic types—one for the type of notifications the Subject instance will receive and another for the type of the notifications it will emit. The PublishSubject class has the same type for its input and output notifications.

    请注意,可以在不订阅源Observable实例的情况下创建PublishSubject实例。它将只发出传递给其onNext()onError()方法的通知,并在调用其onCompleted()方法时完成。

  3. 我们可以订阅Subject实例;这毕竟是一个Observable实例。

  4. 我们可以随时发出自定义通知。它将广播给主题的所有订户。我们甚至可以调用onCompleted()方法并关闭通知流。

  5. 第三个订阅者将仅在订阅后接收发出的通知。

  6. 全部取消订阅时,Subject实例将继续发射。

此示例的源代码可在查看/下载 https://github.com/meddle0x53/learning-rxjava/blob/master/src/main/java/com/packtpub/reactive/chapter03/SubjectsDemonstration.java

RxJava 附带了四种类型的主题:

  • PublishSubject:这是我们在上一个示例中看到的,其行为类似于ConnectableObservable,使用publish()方法创建。
  • ReplaySubject:向任何观察者发送源Observable实例发出的所有项目,而不管观察者何时订阅。因此,它的行为类似于使用replay()方法创建的ConnectableObservableReplaySubject类有许多工厂方法。默认情况下,缓存所有内容;记住这一点,因为它会消耗内存。有工厂方法用于创建具有大小限制和/或时间限制的缓冲区。与PublishSubject类一样,这个类可以在没有源Observable实例的情况下使用。使用其onNext()onError()onCompleted()方法发出的所有通知都将发送给每个订阅者,即使在调用on*方法后订阅者已订阅。
  • BehaviorSubject:当观测者订阅它时,它会发出源Observable实例最近发出的项(如果还没有发出,则发出种子/默认值),然后继续发出源Observable实例以后发出的任何其他项。BehaviorSubject类几乎与ReplaySubjects类相似,缓冲区大小为 1。BehaviorSubject类可用于实现有状态的反应性实例—反应性属性。同样,不需要源Observable实例。
  • AsyncSubject:此发出源Observable实例发出的最后一个值(且仅发出该值),且仅在源Observable实例完成后发出。如果源Observable实例没有发出任何值,AsyncSubject实例也会在不发出任何值的情况下完成。在 RxJava 的世界里,这有点像一个承诺。不需要源Observable实例;可以通过调用on*方法将值、错误或OnCompleted通知传递给它。

使用主题似乎是解决各种问题的一种很酷的方法,但你应该避免使用它们。或者,至少在返回类型为Observable的结果的方法中实现它们及其行为。

Subject实例的危险在于,它们允许访问onNext()onError()onCompleted()方法,您的逻辑可能会变得混乱(需要在本章前面引用的 Rx 合同之后调用它们)。它们很容易被滥用。

选择使用实例(即通过publish()方法)而不是Subject,当您需要从冷实例创建热的可观察对象时。

但让我们看看Subject实例的一个很好的用途,即前面提到的反应性质。同样,我们将实施‘无功和】,但这一次将完全不同。下面是定义它的类:

public class ReactiveSum { // (1)
  private BehaviorSubject<Double> a = BehaviorSubject.create(0.0);
 private BehaviorSubject<Double> b = BehaviorSubject.create(0.0);
 private BehaviorSubject<Double> c = BehaviorSubject.create(0.0);
  public ReactiveSum() { // (2)
    Observable.combineLatest(a, b, (x, y) -> x + y).subscribe(c);
  }
  public double getA() { // (3)
    return a.getValue();
  }
  public void setA(double a) {
    this.a.onNext(a);
  }
  public double getB() {
    return b.getValue();
  }
  public void setB(double b) {
    this.b.onNext(b);
  }
  public double getC() { // (4)
    return c.getValue();
  }
  public Observable<Double> obsC() {
    return c.asObservable();
  }
}

这个类有三个双重属性:两个可设置属性ab,以及它们的c。当ab发生变化时,c自动更新为其总和。有一种特殊的方法可以用来跟踪c的变化。那么它是如何工作的呢?

  1. ReactiveSum是一个普通的 Java 类,定义了三个BehaviorSubject<Double>类型的私有字段,分别代表变量abc,默认值为零。

  2. 在构造函数中,我们使用combineLatest()方法订阅c以依赖ab并等于它们的和。

  3. 属性ab具有 getter 和 setter。getter 返回其当前值和上次接收的值。setter将传递的值发送到其Subject实例,使其成为最后一个实例。

    使用 BehaviorSubject参数的 getValue()方法进行检索。它在 rxjava1.0.5 上提供。

  4. 属性c是只读的,因此它只有一个 getter,但它可以被监听。这可以通过obsC()方法完成,该方法将其作为Observable实例返回。请记住,当您使用主题时,总是将它们封装在类型或方法中,并将可观察对象返回给外部世界。

这个ReactiveSum类可以这样使用:

ReactiveSum sum = new ReactiveSum();
subscribePrint(sum.obsC(), "Sum");
sum.setA(5);
sum.setB(4);

这将输出以下内容:

Sum : 0.0
Sum : 5.0
Sum : 9.0

第一个值是在subscribe``()方法上发出的(记住BehaviorSubject实例在订阅时总是发出最后一个值),另外两个值在设置ab时会自动发出

上例的源代码可在查看/下载 https://github.com/meddle0x53/learning-rxjava/blob/master/src/main/java/com/packtpub/reactive/chapter03/ReactiveSumV3.java

反应性属性可以用于实现绑定和计数器,因此它们对于桌面或浏览器应用非常有用。但这个例子与任何功能范式都相去甚远。

在本章中,我们学习了许多创建不同类型的Observable实例和其他相关实例的方法(ObserverSubscriberSubscriptionSubject。我们一直在从计时器、值、集合和外部源(如文件)创建它们。使用这些知识作为基础,我们可以通过将操作链接到逻辑,开始构建逻辑。这里介绍的许多工厂方法我们将在下一章中介绍。例如,我们将使用Observable.create方法构建不同的行为。

在下一章中,我们将介绍各种运算符,这将使我们能够使用Observable实例编写真正的逻辑。我们已经提到了其中一些,例如map()filter(),但现在是深入研究它们的时候了。

教程来源于Github,感谢apachecn大佬的无私奉献,致敬!

技术教程推荐

深入浅出区块链 -〔陈浩〕

技术管理实战36讲 -〔刘建国〕

深入浅出计算机组成原理 -〔徐文浩〕

后端技术面试 38 讲 -〔李智慧〕

React Hooks 核心原理与实战 -〔王沛〕

商业思维案例笔记 -〔曹雄峰〕

Serverless进阶实战课 -〔静远〕

结构执行力 -〔李忠秋〕

LangChain 实战课 -〔黄佳〕