Java8 测试 RxJava 应用详解

在编写软件时,尤其是编写将被许多用户使用的软件时,我们需要确保一切都正常工作。我们可以编写可读、结构良好、模块化的代码,这将使更改和维护更容易。我们应该编写测试,因为每个特性都有回归的危险。当我们已经对现有代码进行了测试时,重构就不会那么困难了,因为测试可以针对新的、更改过的代码运行。

几乎所有的东西都需要测试和自动化。甚至还有测试驱动开发TDD)和行为驱动开发BDD等意识形态。如果我们不编写自动化测试,我们不断变化的代码往往会随着时间的推移而中断,测试和维护变得更加困难。

在本章中,我们将不讨论为什么需要测试代码。我们会接受这是强制性的,是我们作为程序员生活的一部分。我们将学习如何测试使用 RxJava 编写的代码。

我们将看到,为它编写单元测试并不是那么困难,但是有一些难以测试的案例,例如异步Observable实例。我们将学习一些新的操作符,这将帮助我们进行测试和一种新的Observable实例。

话虽如此,我们将在本章中介绍以下内容:

我们可以通过订阅Observable实例并收集所有传入通知来测试我们得到了什么。为了证明这一点,我们将开发一个factory方法来创建一个新的Observable实例,并测试其行为。

该方法将接收一个Comparator实例和多个项,并返回Observable实例,将这些项按排序顺序发出。项目将根据传递的Comparator实例进行排序。

我们可以使用 TDD 开发该方法。让我们首先定义测试,如下所示:

public class SortedObservableTest {
  private Observable<String> tested;
  private List<String> expected;
  @Before
  public void before() {
    tested = CreateObservable.<String>sorted(
 (a, b) -> a.compareTo(b),
 "Star", "Bar", "Car", "War", "Far", "Jar");
    expected = Arrays.asList(
      "Bar", "Car", "Far", "Jar", "Star", "War"
    );
  }
  TestData data = new TestData();
  tested.subscribe(
    (v) -> data.getResult().add(v),
    (e) -> data.setError(e),
    () -> data.setCompleted(true)
  );
  Assert.assertTrue(data.isCompleted());
  Assert.assertNull(data.getError());
  Assert.assertEquals(expected, data.getResult());
}

本章的示例使用JUnit框架进行测试。您可以在找到更多关于此的信息 http://junit.org

测试使用两个变量来存储预定义的可重用状态。第一个是我们用作测试源的Observable实例。在设置@Before方法中,分配给我们方法CreateObservable.sorted(Comparator, T...)的结果,该结果尚未实现。我们比较一组String实例,并希望它们按照存储在预期变量第二个可重用字段中的顺序接收。

测试本身非常冗长。它使用TestData类的一个实例来存储从测试的Observable实例传入的通知。

如果有OnCompleted通知,data.completed字段设置为True。我们期望这种情况发生,这就是为什么我们在测试方法结束时断言它。如果有OnError通知,data.error字段设置为错误。我们不期望这种情况发生,所以我们断言它是null

Observable实例发出的每个传入项都被添加到data.resultList字段。最后,它应该等于预期的List变量,我们断言。

上述测试的源代码可在查看/下载 https://github.com/meddle0x53/learning-rxjava/blob/master/src/test/java/com/packtpub/reactive/chapter07/SortedObservableTest.java -这是第一种测试方法。

但是,这个测试当然失败了,因为CreateObservable.sorted(Comparator, T...)方法还没有实现。让我们实现它并再次运行测试:

@SafeVarargs
public static <T> Observable<T> sorted(
  Comparator<? super T> comparator,
  T... data) {
    List<T> listData = Arrays.asList(data);
    listData.sort(comparator);
  return Observable.from(listData);
}

就这么简单!它只是将传递的varargs数组转换为List变量,并使用其sort()方法将其与传递的Comparator实例进行排序。然后,使用Observable.from(Iterable)方法返回所需的Observable实例。

上述实现的源代码位于:https://github.com/meddle0x53/learning-rxjava/blob/master/src/main/java/com/packtpub/reactive/common/CreateObservable.java#L262

如果我们现在运行测试,它将通过。这很好!我们进行了第一次测试!但是编写类似的测试需要大量的样板代码。我们总是需要这三个状态变量,我们总是需要断言相同的东西。那么异步Observable实例呢,比如interval()timer()方法创建的实例?

有一些删除样板变量的技术,稍后,我们将研究如何测试异步行为。现在,我们将介绍一种新类型的可观测。

每个Observable实例都可以通过toBlocking()方法转换成BlockingObservable实例。BlockingObservable实例有多个方法阻塞当前线程,而Observable实例在发送OnCompletedOnError通知之前都会发出所有消息。如果有OnError通知,则会抛出异常(直接抛出RuntimeException异常,并将选中的异常包装在RuntimeException实例中)。

toBlocking()方法本身不阻塞,但返回的BlockingObservable实例的方法可能会阻塞。让我们看看其中的一些方法:

  • 我们可以使用forEach()方法迭代BlockingObservable实例中的所有项。下面是使用此选项的示例:

    Observable
      .interval(100L, TimeUnit.MILLISECONDS)
      .take(5)
      .toBlocking()
     .forEach(System.out::println);
    System.out.println("END");

这也是如何使异步代码同步行为的一个示例。由interval()方法创建的Observable实例将不在后台执行,因为toBlocking()方法使当前线程等待它完成。这就是为什么我们在这里使用take(int)方法,因为,否则线程将永远被阻塞。forEach()方法将使用传递的函数打印五项,然后我们才能看到END输出。BlockingObservable类也有toIterable()方法。它返回的Iterable实例也可以用于迭代源发出的序列。

  • 有类似于异步阻塞方法,如first()last()firstOrDefault()lastOrDefault()方法(我们在第 4 章转换、过滤和积累您的数据中讨论过)。它们都在等待所需项目时阻塞。让我们来看看下面的代码片段:

这将打印'3''15'

  • 一个有趣的方法是single()方法;只有当发出正好一项且源完成时,才返回一项。如果没有发出任何项,或者发出多个项,则分别引发NoSuchElementException异常或IllegalArgumentException异常。

  • 有一个next()方法不阻塞,而是返回一个Iterable实例。当从该Iterable实例中检索到Iterator实例时,其每个next()方法将阻塞,同时等待下一个传入项。这可以在无限Observable实例上使用,因为当前线程只有在等待下一个项时才会阻塞,然后才能继续。(请注意,如果在时间内没有人调用next()方法,则可能会跳过源元素)。下面是使用此选项的示例:

    Iterable<Long> next = Observable
      .interval(100L, TimeUnit.MILLISECONDS)
      .toBlocking()
     .next();
    Iterator<Long> iterator = next.iterator();
    System.out.println(iterator.next());
    System.out.println(iterator.next());
    System.out.println(iterator.next());

当前线程阻塞三次,持续 100 毫秒,012将在每次暂停后打印。有一个类似的方法叫做latest(),它返回一个Iterable实例。行为是不同的,因为latest()方法生成的Iterable实例返回源发出的最后一个项目,或者等待下一个项目(如果没有)。

Iterable<Long> latest = Observable
  .interval(1000L, TimeUnit.MILLISECONDS)
  .toBlocking()
 .latest();
iterator = latest.iterator();
System.out.println(iterator.next());
Thread.sleep(5500L);
System.out.println(iterator.next());
System.out.println(iterator.next());

这将打印0,然后打印56

可以在查看/下载演示前面所有运算符以及聚合运算符的源代码 https://github.com/meddle0x53/learning-rxjava/blob/master/src/main/java/com/packtpub/reactive/chapter07/BlockingObservablesAndOperators.java

使用BlockingObservable实例可以帮助我们收集测试数据。但是有一组名为聚合运算符Observable运算符,当与BlockingObservables实例结合时,它们也很有用。

聚合操作符生成Observable实例,只发出一项并完成。此项由Observable实例发出的所有项组成或计算。在本节中,我们将只讨论其中的两个。更多详细信息,请参考https://github.com/ReactiveX/RxJava/wiki/Mathematical-and-Aggregate-Operators

第一个操作符是count()countLong()方法。发出Observable实例发出的项数。例如:

Observable
  .range(10, 100)
  .count()
  .subscribe(System.out::println);

这将打印100

另一个是toList()toSortedList()方法,它发出一个list变量(可以排序),包含实例发出的所有项目,并完成。

List<Integer> list = Observable
  .range(5, 15)
  .toList()
  .subscribe(System.out::println);

这将输出以下内容:

[5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]

所有这些方法,结合toBlocking()方法,都能很好地协同工作。例如,如果我们想要检索Observable实例发出的所有项的列表,我们可以这样做:

List<Integer> single = Observable
  .range(5, 15)
  .toList()
 .toBlocking().single();

我们可以使用我们想要的这个项目集合:例如用于测试。

提示

聚合操作符还包括一个collect()操作符,可以用于生成Observable实例和发出任意集合,例如Set()操作符。

使用前面两个部分中学习的操作符和方法,我们能够重新编写我们编写的测试,如下所示:

@Test
public void testUsingBlockingObservable() {
  List<String> result = tested
    .toList()
 .toBlocking()
 .single();
  Assert.assertEquals(expected, result);
}

这里没有样板代码。我们检索作为列表发出的所有项,并将它们与预期列表进行比较。

在大多数情况下,使用BlockingObsevables类和聚合运算符非常有用。在测试发出长而慢序列的异步Observable实例时,它们并没有那么有用。长时间阻塞测试用例不是一个好的做法:缓慢的测试是糟糕的测试。

前面测试的源代码可以在中找到 https://github.com/meddle0x53/learning-rxjava/blob/master/src/test/java/com/packtpub/reactive/chapter07/SortedObservableTest.java -这是第二种测试方法。

另一种测试方法没有帮助的情况是当我们想要检查发送的Notification对象或订阅状态时。

还有一种编写测试的技术可以让我们对订阅本身进行更细粒度的控制,这是通过一种特殊的Subscriber-TestSubscriber实现的。

TestSubscriber实例是一个特殊的Subscriber实例,可以传递给任何Observable实例的subscribe()方法。

我们可以从中检索所有收到的项目和通知。我们还可以查看最近收到通知的thread和订阅状态。

让我们使用它重新编写测试,以演示其功能和存储内容:

@Test
public void testUsingTestSubscriber() {
  TestSubscriber<String> subscriber =
 new TestSubscriber<String>();
  tested.subscribe(subscriber);
  Assert.assertEquals(expected, subscriber.getOnNextEvents());
  Assert.assertSame(1, subscriber.getOnCompletedEvents().size());
  Assert.assertTrue(subscriber.getOnErrorEvents().isEmpty());
  Assert.assertTrue(subscriber.isUnsubscribed());
}

同样,测试非常简单。我们创建一个TestSubscriber实例,用它订阅测试的Observable实例。Observable实例完成后,我们可以访问整个状态。让我们看看下面的术语列表:

  • 通过getOnNextEvents()方法,我们能够检索Observable实例发出的所有项目,并将它们与预期的List变量进行比较。
  • 通过getOnCompletedEvents()方法,我们可以检查未完成的通知,并检查其是否已发送。例如,Observable.never()方法不发送它。
  • 通过getOnErrorEvents()方法,我们可以检查错误通知是否存在。在本例中,我们断言不存在错误
  • 使用isUnsubscribed()方法,我们可以断言完成之后,我们的Subscriber实例被取消订阅

TestSubscriber实例也有一些断言方法。因此,还有一种编写测试的方法:

@Test
public void testUsingTestSubscriberAssertions() {
  TestSubscriber<String> subscriber = new TestSubscriber<String>();
  tested.subscribe(subscriber);
 subscriber.assertReceivedOnNext(expected);
 subscriber.assertTerminalEvent();
 subscriber.assertNoErrors();
 subscriber.assertUnsubscribed();
}

这些几乎都是相同的断言,但都是通过TestSubscriber实例自己的assert*方法完成的。

前面测试的源代码可在找到 https://github.com/meddle0x53/learning-rxjava/blob/master/src/test/java/com/packtpub/reactive/chapter07/SortedObservableTest.java -这是第三种和第四种测试方法。

通过这些技术,我们可以测试RxJava逻辑的不同行为和状态。在本章测试异步Observable实例中,还有最后一件事需要学习,例如通过Observable.interval()方法创建的实例。

还有最后一种预定义的scheduler,我们在第 6 章中没有提到,使用与调度器的并发性和并行性。这是TestScheduler调度器,一个scheduler设计用于单元测试。它上面调度的所有动作都被包装在包含它们应该在执行的时间的对象中,并且在调用Scheduler实例的triggerActions()方法之前不会执行。此方法执行所有未执行且计划在Scheduler实例当前时间或之前执行的操作。这次是虚拟的。这意味着它是由我们设定的,我们可以使用本scheduler的特殊方法前进到未来的任何时刻。

为了演示它,我们想开发另一种方法来创建一种新类型的observable。本章将不讨论方法本身的实现,但您可以在本书附带的源代码中找到它。

该方法创建一个Observable实例,以设置的时间间隔发送项目。但是间隔不是等距的,例如使用内置的interval方法。我们可以提供一个不同的多个间隔的列表,Observable实例将无限循环。该方法的签名如下:

Observable<Long> interval(List<Long> gaps, TimeUnit unit, Scheduler scheduler)

如果我们传递一个只包含一个时间段值的List变量,则其行为应该与Observable.interval方法相同。下面是对这种情况的测试:

@Test
public void testBehavesAsNormalIntervalWithOneGap() {
  TestScheduler testScheduler = Schedulers.test(); // (1)
  Observable<Long> interval = CreateObservable.interval(
 Arrays.asList(100L), TimeUnit.MILLISECONDS, testScheduler
 ); // (2)
  TestSubscriber<Long> subscriber = new TestSubscriber<Long>();
  interval.subscribe(subscriber); // (3)
  assertTrue(subscriber.getOnNextEvents().isEmpty()); // (4)
  testScheduler.advanceTimeBy(101L, TimeUnit.MILLISECONDS); // (5)
  assertEquals(Arrays.asList(0L), subscriber.getOnNextEvents());
  testScheduler.advanceTimeBy(101L, TimeUnit.MILLISECONDS); // (6)
  assertEquals(
    Arrays.asList(0L, 1L),
    subscriber.getOnNextEvents()
  );
  testScheduler.advanceTimeTo(1L, TimeUnit.SECONDS); // (7)
  assertEquals(
    Arrays.asList(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L),
    subscriber.getOnNextEvents()
  );
}

让我们看一下下面的解释:

  1. 我们使用Schedulers.test()方法创建TestScheduler实例。
  2. 我们的方法接收一个Scheduler实例作为其第三个参数。它将在其上发射项目,因此我们传递TestScheduler实例。
  3. 使用TestSubscriber实例,我们订阅Observable实例。
  4. 在订阅之后,我们不应该立即收到任何通知,所以我们会进行检查。
  5. TestScheduler实例有一个advanceTimeBy(long, TimeUnit)方法,它控制其Worker实例的时间,因此我们可以使用它来获得 101 毫秒的未来。101 毫秒后,我们希望收到一个项目-0
  6. 使用advanceTimeBy()方法,我们将时间提前 101 毫秒,我们应该已经收到01
  7. TestScheduler实例的另一个重要方法是advanceTimeTo(long, TimeUnit)方法。它可以用于在将来提前到特定的时间点。因此,我们使用它来了解订阅的一秒钟已经过去的时刻。到那时,我们预计已经收到十份通知。

TestScheduler实例使用其advanceTimeBy()advanceTimeTo()方法控制时间,因此我们不需要阻塞主实例Thread等待事件发生。我们可以回到已经发生的时间。对于TestScheduler实例,有一个全局事件顺序。因此,如果两个任务被安排在确切的同一时间,它们有一个执行顺序,并且可能会导致预期特定全局顺序的测试出现问题。如果我们有这样一个操作符要测试,我们应该通过计时到不同的值来避免这种情况,一个是 100 毫秒,另一个是 101 毫秒。使用这种技术,测试异步Observable实例不再是如此复杂的任务。

上述测试的源代码可在以下位置找到:https://github.com/meddle0x53/learning-rxjava/blob/master/src/test/java/com/packtpub/reactive/chapter07/CreateObservableIntervalTest.java

在本章中,我们不仅了解了如何使用 RxJava 编写程序,还了解了如何测试程序的各个方面。我们已经了解了一些新的操作符和BlockingObservables类。

RxJava 库中有许多本书中没有提到的操作符,但我们研究了更重要和有用的操作符。您可以随时参考https://github.com/ReactiveX/RxJava/wiki 其余部分。还有更多关于订阅背压Observable实例生命周期的内容,但根据您目前的知识,掌握库中的所有内容并不难。记住,这只是一个库,一个编写代码的工具。逻辑在这里是很重要的。这种编程方式与过程编程方式有些不同,但一旦你开始使用它,就会感觉很自然。

在下一章也是最后一章中,我们将学习如何释放由订阅分配的资源,如何防止内存泄漏,以及如何创建我们自己的操作符,这些操作符可以链接到RxJava逻辑中。

教程来源于Github,感谢apachecn大佬的无私奉献,致敬!

技术教程推荐

Elasticsearch核心技术与实战 -〔阮一鸣〕

移动端自动化测试实战 -〔思寒〕

如何看懂一幅画 -〔罗桂霞〕

技术管理案例课 -〔许健〕

WebAssembly入门课 -〔于航〕

乔新亮的CTO成长复盘 -〔乔新亮〕

讲好故事 -〔涵柏〕

大数据经典论文解读 -〔徐文浩〕

林外 · 专利写作第一课 -〔林外〕