在编写软件时,尤其是编写将被许多用户使用的软件时,我们需要确保一切都正常工作。我们可以编写可读、结构良好、模块化的代码,这将使更改和维护更容易。我们应该编写测试,因为每个特性都有回归的危险。当我们已经对现有代码进行了测试时,重构就不会那么困难了,因为测试可以针对新的、更改过的代码运行。
几乎所有的东西都需要测试和自动化。甚至还有测试驱动开发(TDD)和行为驱动开发(BDD等意识形态。如果我们不编写自动化测试,我们不断变化的代码往往会随着时间的推移而中断,测试和维护变得更加困难。
在本章中,我们将不讨论为什么需要测试代码。我们会接受这是强制性的,是我们作为程序员生活的一部分。我们将学习如何测试使用 RxJava 编写的代码。
我们将看到,为它编写单元测试并不是那么困难,但是有一些难以测试的案例,例如异步Observable
实例。我们将学习一些新的操作符,这将帮助我们进行测试和一种新的Observable
实例。
话虽如此,我们将在本章中介绍以下内容:
我们可以通过订阅源Observable
实例并收集所有传入通知来测试我们得到了什么。为了证明这一点,我们将开发一个factory
方法来创建一个新的Observable
实例,并测试其行为。
该方法将接收一个Comparator
实例和多个项,并返回Observable
实例,将这些项按排序顺序发出。项目将根据传递的Comparator
实例进行排序。
我们可以使用 TDD 开发该方法。让我们首先定义测试,如下所示:
public class SortedObservableTest {
private Observable<String> tested;
private List<String> expected;
@Before
public void before() {
tested = CreateObservable.<String>sorted(
(a, b) -> a.compareTo(b),
"Star", "Bar", "Car", "War", "Far", "Jar");
expected = Arrays.asList(
"Bar", "Car", "Far", "Jar", "Star", "War"
);
}
TestData data = new TestData();
tested.subscribe(
(v) -> data.getResult().add(v),
(e) -> data.setError(e),
() -> data.setCompleted(true)
);
Assert.assertTrue(data.isCompleted());
Assert.assertNull(data.getError());
Assert.assertEquals(expected, data.getResult());
}
本章的示例使用JUnit框架进行测试。您可以在找到更多关于此的信息 http://junit.org 。
测试使用两个变量来存储预定义的可重用状态。第一个是我们用作测试源的Observable
实例。在设置@Before
方法中,分配给我们方法CreateObservable.sorted(Comparator, T...)
的结果,该结果尚未实现。我们比较一组String
实例,并希望它们按照存储在预期变量第二个可重用字段中的顺序接收。
测试本身非常冗长。它使用TestData
类的一个实例来存储从测试的Observable
实例传入的通知。
如果有OnCompleted
通知,data.completed
字段设置为True
。我们期望这种情况发生,这就是为什么我们在测试方法结束时断言它。如果有OnError
通知,data.error
字段设置为错误。我们不期望这种情况发生,所以我们断言它是null
。
Observable
实例发出的每个传入项都被添加到data.resultList
字段。最后,它应该等于预期的List
变量,我们断言。
上述测试的源代码可在查看/下载 https://github.com/meddle0x53/learning-rxjava/blob/master/src/test/java/com/packtpub/reactive/chapter07/SortedObservableTest.java -这是第一种测试方法。
但是,这个测试当然失败了,因为CreateObservable.sorted(Comparator, T...)
方法还没有实现。让我们实现它并再次运行测试:
@SafeVarargs
public static <T> Observable<T> sorted(
Comparator<? super T> comparator,
T... data) {
List<T> listData = Arrays.asList(data);
listData.sort(comparator);
return Observable.from(listData);
}
就这么简单!它只是将传递的varargs
数组转换为List
变量,并使用其sort()
方法将其与传递的Comparator
实例进行排序。然后,使用Observable.from(Iterable)
方法返回所需的Observable
实例。
如果我们现在运行测试,它将通过。这很好!我们进行了第一次测试!但是编写类似的测试需要大量的样板代码。我们总是需要这三个状态变量,我们总是需要断言相同的东西。那么异步Observable
实例呢,比如interval()
和timer()
方法创建的实例?
有一些删除样板变量的技术,稍后,我们将研究如何测试异步行为。现在,我们将介绍一种新类型的可观测。
每个Observable
实例都可以通过toBlocking()
方法转换成BlockingObservable
实例。BlockingObservable
实例有多个方法阻塞当前线程,而源Observable
实例在发送OnCompleted
或OnError
通知之前都会发出所有消息。如果有OnError
通知,则会抛出异常(直接抛出RuntimeException
异常,并将选中的异常包装在RuntimeException
实例中)。
toBlocking()
方法本身不阻塞,但返回的BlockingObservable
实例的方法可能会阻塞。让我们看看其中的一些方法:
我们可以使用forEach()
方法迭代BlockingObservable
实例中的所有项。下面是使用此选项的示例:
Observable
.interval(100L, TimeUnit.MILLISECONDS)
.take(5)
.toBlocking()
.forEach(System.out::println);
System.out.println("END");
这也是如何使异步代码同步行为的一个示例。由interval()
方法创建的Observable
实例将不在后台执行,因为toBlocking()
方法使当前线程等待它完成。这就是为什么我们在这里使用take(int)
方法,因为,否则主线程将永远被阻塞。forEach()
方法将使用传递的函数打印五项,然后我们才能看到END
输出。BlockingObservable
类也有toIterable()
方法。它返回的Iterable
实例也可以用于迭代源发出的序列。
first()
、last()
、firstOrDefault()
和lastOrDefault()
方法(我们在第 4 章、转换、过滤和积累您的数据中讨论过)。它们都在等待所需项目时阻塞。让我们来看看下面的代码片段:这将打印'3'
和'15'
。
一个有趣的方法是single()
方法;只有当源发出正好一项且源完成时,才返回一项。如果没有发出任何项,或者源发出多个项,则分别引发NoSuchElementException
异常或IllegalArgumentException
异常。
有一个next()
方法不阻塞,而是返回一个Iterable
实例。当从该Iterable
实例中检索到Iterator
实例时,其每个next()
方法将阻塞,同时等待下一个传入项。这可以在无限Observable
实例上使用,因为当前线程只有在等待下一个项时才会阻塞,然后才能继续。(请注意,如果在时间内没有人调用next()
方法,则可能会跳过源元素)。下面是使用此选项的示例:
Iterable<Long> next = Observable
.interval(100L, TimeUnit.MILLISECONDS)
.toBlocking()
.next();
Iterator<Long> iterator = next.iterator();
System.out.println(iterator.next());
System.out.println(iterator.next());
System.out.println(iterator.next());
当前线程将阻塞三次,持续 100 毫秒,0
、1
和2
将在每次暂停后打印。有一个类似的方法叫做latest()
,它返回一个Iterable
实例。行为是不同的,因为latest()
方法生成的Iterable
实例返回源发出的最后一个项目,或者等待下一个项目(如果没有)。
Iterable<Long> latest = Observable
.interval(1000L, TimeUnit.MILLISECONDS)
.toBlocking()
.latest();
iterator = latest.iterator();
System.out.println(iterator.next());
Thread.sleep(5500L);
System.out.println(iterator.next());
System.out.println(iterator.next());
这将打印0
,然后打印5
和6
。
使用BlockingObservable
实例可以帮助我们收集测试数据。但是有一组名为聚合运算符的Observable
运算符,当与BlockingObservables
实例结合时,它们也很有用。
聚合操作符生成Observable
实例,只发出一项并完成。此项由源Observable
实例发出的所有项组成或计算。在本节中,我们将只讨论其中的两个。更多详细信息,请参考https://github.com/ReactiveX/RxJava/wiki/Mathematical-and-Aggregate-Operators 。
第一个操作符是count()
或countLong()
方法。发出源Observable
实例发出的项数。例如:
Observable
.range(10, 100)
.count()
.subscribe(System.out::println);
这将打印100
。
另一个是toList()
或toSortedList()
方法,它发出一个list
变量(可以排序),包含源实例发出的所有项目,并完成。
List<Integer> list = Observable
.range(5, 15)
.toList()
.subscribe(System.out::println);
这将输出以下内容:
[5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
所有这些方法,结合toBlocking()
方法,都能很好地协同工作。例如,如果我们想要检索源Observable
实例发出的所有项的列表,我们可以这样做:
List<Integer> single = Observable
.range(5, 15)
.toList()
.toBlocking().single();
我们可以使用我们想要的这个项目集合:例如用于测试。
聚合操作符还包括一个collect()
操作符,可以用于生成Observable
实例和发出任意集合,例如Set()
操作符。
使用前面两个部分中学习的操作符和方法,我们能够重新编写我们编写的测试,如下所示:
@Test
public void testUsingBlockingObservable() {
List<String> result = tested
.toList()
.toBlocking()
.single();
Assert.assertEquals(expected, result);
}
这里没有样板代码。我们检索作为列表发出的所有项,并将它们与预期列表进行比较。
在大多数情况下,使用BlockingObsevables
类和聚合运算符非常有用。在测试发出长而慢序列的异步Observable
实例时,它们并没有那么有用。长时间阻塞测试用例不是一个好的做法:缓慢的测试是糟糕的测试。
前面测试的源代码可以在中找到 https://github.com/meddle0x53/learning-rxjava/blob/master/src/test/java/com/packtpub/reactive/chapter07/SortedObservableTest.java -这是第二种测试方法。
另一种测试方法没有帮助的情况是当我们想要检查源发送的Notification
对象或订阅状态时。
还有一种编写测试的技术可以让我们对订阅本身进行更细粒度的控制,这是通过一种特殊的Subscriber
-TestSubscriber
实现的。
TestSubscriber
实例是一个特殊的Subscriber
实例,可以传递给任何Observable
实例的subscribe()
方法。
我们可以从中检索所有收到的项目和通知。我们还可以查看最近收到通知的thread
和订阅状态。
让我们使用它重新编写测试,以演示其功能和存储内容:
@Test
public void testUsingTestSubscriber() {
TestSubscriber<String> subscriber =
new TestSubscriber<String>();
tested.subscribe(subscriber);
Assert.assertEquals(expected, subscriber.getOnNextEvents());
Assert.assertSame(1, subscriber.getOnCompletedEvents().size());
Assert.assertTrue(subscriber.getOnErrorEvents().isEmpty());
Assert.assertTrue(subscriber.isUnsubscribed());
}
同样,测试非常简单。我们创建一个TestSubscriber
实例,用它订阅到测试的Observable
实例。Observable
实例完成后,我们可以访问整个状态。让我们看看下面的术语列表:
getOnNextEvents()
方法,我们能够检索Observable
实例发出的所有项目,并将它们与预期的List
变量进行比较。getOnCompletedEvents()
方法,我们可以检查未完成的通知,并检查其是否已发送。例如,Observable.never()
方法不发送它。getOnErrorEvents()
方法,我们可以检查或错误通知是否存在。在本例中,我们断言不存在错误。isUnsubscribed()
方法,我们可以断言在完成之后,我们的Subscriber
实例被取消订阅。TestSubscriber
实例也有一些断言方法。因此,还有一种编写测试的方法:
@Test
public void testUsingTestSubscriberAssertions() {
TestSubscriber<String> subscriber = new TestSubscriber<String>();
tested.subscribe(subscriber);
subscriber.assertReceivedOnNext(expected);
subscriber.assertTerminalEvent();
subscriber.assertNoErrors();
subscriber.assertUnsubscribed();
}
这些几乎都是相同的断言,但都是通过TestSubscriber
实例自己的assert*
方法完成的。
前面测试的源代码可在找到 https://github.com/meddle0x53/learning-rxjava/blob/master/src/test/java/com/packtpub/reactive/chapter07/SortedObservableTest.java -这是第三种和第四种测试方法。
通过这些技术,我们可以测试RxJava
逻辑的不同行为和状态。在本章测试异步Observable
实例中,还有最后一件事需要学习,例如通过Observable.interval()
方法创建的实例。
还有最后一种预定义的scheduler
,我们在第 6 章中没有提到,使用与调度器的并发性和并行性。这是TestScheduler
调度器,一个scheduler
设计用于单元测试。它上面调度的所有动作都被包装在包含它们应该在执行的时间的对象中,并且在调用Scheduler
实例的triggerActions()
方法之前不会执行。此方法执行所有未执行且计划在Scheduler
实例当前时间或之前执行的操作。这次是虚拟的。这意味着它是由我们设定的,我们可以使用本scheduler
的特殊方法前进到未来的任何时刻。
为了演示它,我们想开发另一种方法来创建一种新类型的observable
。本章将不讨论方法本身的实现,但您可以在本书附带的源代码中找到它。
该方法创建一个Observable
实例,以设置的时间间隔发送项目。但是间隔不是等距的,例如使用内置的interval
方法。我们可以提供一个不同的多个间隔的列表,Observable
实例将无限循环。该方法的签名如下:
Observable<Long> interval(List<Long> gaps, TimeUnit unit, Scheduler scheduler)
如果我们传递一个只包含一个时间段值的List
变量,则其行为应该与Observable.interval
方法相同。下面是对这种情况的测试:
@Test
public void testBehavesAsNormalIntervalWithOneGap() {
TestScheduler testScheduler = Schedulers.test(); // (1)
Observable<Long> interval = CreateObservable.interval(
Arrays.asList(100L), TimeUnit.MILLISECONDS, testScheduler
); // (2)
TestSubscriber<Long> subscriber = new TestSubscriber<Long>();
interval.subscribe(subscriber); // (3)
assertTrue(subscriber.getOnNextEvents().isEmpty()); // (4)
testScheduler.advanceTimeBy(101L, TimeUnit.MILLISECONDS); // (5)
assertEquals(Arrays.asList(0L), subscriber.getOnNextEvents());
testScheduler.advanceTimeBy(101L, TimeUnit.MILLISECONDS); // (6)
assertEquals(
Arrays.asList(0L, 1L),
subscriber.getOnNextEvents()
);
testScheduler.advanceTimeTo(1L, TimeUnit.SECONDS); // (7)
assertEquals(
Arrays.asList(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L),
subscriber.getOnNextEvents()
);
}
让我们看一下下面的解释:
Schedulers.test()
方法创建TestScheduler
实例。Scheduler
实例作为其第三个参数。它将在其上发射项目,因此我们传递TestScheduler
实例。TestSubscriber
实例,我们订阅到Observable
实例。TestScheduler
实例有一个advanceTimeBy(long, TimeUnit)
方法,它控制其Worker
实例的时间,因此我们可以使用它来获得 101 毫秒的未来。101 毫秒后,我们希望收到一个项目-0
。advanceTimeBy()
方法,我们将时间提前 101 毫秒,我们应该已经收到0
和1
。TestScheduler
实例的另一个重要方法是advanceTimeTo(long, TimeUnit)
方法。它可以用于在将来提前到特定的时间点。因此,我们使用它来了解订阅的一秒钟已经过去的时刻。到那时,我们预计已经收到十份通知。TestScheduler
实例使用其advanceTimeBy()
和advanceTimeTo()
方法控制时间,因此我们不需要阻塞主实例Thread
等待事件发生。我们可以回到已经发生的时间。对于TestScheduler
实例,有一个全局事件顺序。因此,如果两个任务被安排在确切的同一时间,它们有一个执行顺序,并且可能会导致预期特定全局顺序的测试出现问题。如果我们有这样一个操作符要测试,我们应该通过计时到不同的值来避免这种情况,一个是 100 毫秒,另一个是 101 毫秒。使用这种技术,测试异步Observable
实例不再是如此复杂的任务。
上述测试的源代码可在以下位置找到:https://github.com/meddle0x53/learning-rxjava/blob/master/src/test/java/com/packtpub/reactive/chapter07/CreateObservableIntervalTest.java 。
在本章中,我们不仅了解了如何使用 RxJava 编写程序,还了解了如何测试程序的各个方面。我们已经了解了一些新的操作符和BlockingObservables
类。
RxJava 库中有许多本书中没有提到的操作符,但我们研究了更重要和有用的操作符。您可以随时参考https://github.com/ReactiveX/RxJava/wiki 其余部分。还有更多关于订阅、背压和Observable
实例生命周期的内容,但根据您目前的知识,掌握库中的所有内容并不难。记住,这只是一个库,一个编写代码的工具。逻辑在这里是很重要的。这种编程方式与过程编程方式有些不同,但一旦你开始使用它,就会感觉很自然。
在下一章也是最后一章中,我们将学习如何释放由订阅分配的资源,如何防止内存泄漏,以及如何创建我们自己的操作符,这些操作符可以链接到RxJava
逻辑中。