如今,术语反应式编程正在成为一种趋势。各种编程语言的库和框架正在涌现。关于反应式编程的博客文章、文章和演示正在创建中。Facebook、SoundCloud、Microsoft 和 Netflix 等大公司都在支持和使用这一概念。因此,作为程序员,我们开始对此感到好奇。为什么人们对反应式编程如此兴奋?反应性是什么意思?这对我们的项目有帮助吗?我们应该学习如何使用它吗?
同时,Java 以其多线程、速度、可靠性和良好的可移植性而广受欢迎。它用于构建各种各样的应用,从搜索引擎到数据库,再到运行在服务器集群上的复杂 web 应用。但是 Java 的名声也不好,仅使用内置工具很难编写并发和简单的应用,用 Java 编程需要编写大量样板代码。此外,如果您需要异步(例如使用 futures),您可以很容易地进入“回调地狱”,这实际上适用于所有编程语言。
换句话说,Java 是强大的,您可以用它创建伟大的应用,但这并不容易。好消息是,有一种方法可以改变这一点,即使用反应式编程风格。
本书将呈现RxJava(https://github.com/ReactiveX/RxJava ),反应式编程范式的开源 Java 实现。使用 RxJava 编写代码需要一种不同的思维方式,但它将使您能够使用结构良好的简单代码片段创建复杂的逻辑。
在本章中,我们将介绍:
反应式编程是一种以传播变化为中心的范式。换句话说,如果一个程序将修改其数据的所有更改传播给所有相关方(用户、其他程序、组件和子部分),那么这个程序可以称为反应性。
Microsoft Excel 就是一个简单的例子。如果您在单元格 A1 中设置了一个数字,在单元格“B1”中设置了另一个数字,并将单元格“C1”设置为SUM(A1, B1)
;每当“A1”或“B1”发生变化时,“C1”将更新为它们的总和。
让我们称之为的**无功和**。
分配一个简单变量c等于a和b变量之和与反应和方法有什么区别?
在普通 Java 程序中,当我们更改“a”或“b”时,我们必须自己更新“c”。换句话说,由“a”和“b”表示的数据流中的变化不会传播到“c”。下面通过源代码对此进行了说明:
int a = 4;
int b = 5;
int c = a + b;
System.out.println(c); // 9
a = 6;
System.out.println(c);
// 9 again, but if 'c' was tracking the changes of 'a' and 'b',
// it would've been 6 + 5 = 11
下载示例代码
您可以下载您在账户购买的所有 Packt 书籍的示例代码文件 http://www.packtpub.com 。如果您在其他地方购买了本书,您可以访问http://www.packtpub.com/support 并注册,将文件直接通过电子邮件发送给您。
这是一个非常简单的解释什么是“被动”的意思。当然,这个想法有多种实现方式,这些实现方式必须解决各种问题。
对于我们来说,回答这个问题最简单的方法就是思考一下我们现在在构建应用时的需求。
虽然 10-15 年前网站进行维护或响应速度慢是正常的,但今天所有东西都应该全天候在线,并以闪电般的速度响应;如果速度慢或慢,用户会更喜欢其他服务。今天,慢意味着无法使用或损坏。我们正在处理大量数据,这些数据需要我们快速服务和处理。
HTTP 失败在最近的过去并不罕见,但现在,我们必须具备容错能力,为用户提供可读且合理的消息更新。
在过去,我们编写简单的桌面应用,但今天我们编写 web 应用,它应该快速响应。在大多数情况下,这些应用与大量远程服务通信。
如果我们希望我们的软件具有竞争力,这些是我们必须满足的新要求。换句话说,我们必须:
让我们思考一下如何实现这一点:
如果应用是事件驱动的,则可以将其解耦为多个独立组件。这有助于我们变得更具可伸缩性,因为我们始终可以添加新组件或删除旧组件,而不会停止或破坏系统。如果将错误和故障传递给正确的组件(该组件可以将其作为通知处理),则应用可以变得更具容错性或弹性。因此,如果我们将系统构建为事件驱动,我们可以更轻松地实现可伸缩性和容错性,并且一个可伸缩、解耦和防错的应用能够快速响应用户。
反应性宣言(http://www.reactivemanifesto.org/ 是定义我们前面提到的四个反应性原则的文件。每个反应式系统应为消息驱动(事件驱动)。通过这种方式,它可以变得松散耦合,因此具有可伸缩性和弹性(容错),这意味着它是可靠和响应性的(参见上图)。
请注意,《反应式宣言》描述的是反应式系统,与我们对反应式编程的定义不同。您可以构建一个消息驱动、有弹性、可扩展且响应迅速的应用,而无需使用反应式库或语言。
应用数据中的更改可以用通知建模,通知可以传播到正确的处理程序。因此,使用反应式编程编写应用是遵守宣言的最简单方法。
要编写反应性程序,我们需要一个库或一种特定的编程语言,因为自己构建类似的东西是一项相当困难的任务。Java 并不是一种真正的反应式编程语言(它提供了一些工具,比如java.util.Observable
类,但它们非常有限)。它是一种静态类型的、面向对象的语言,我们编写了许多样板代码来完成简单的事情(例如 POJO)。但是我们可以使用 Java 中的反应库。在本书中,我们将使用 RxJava(由 Java 开源社区的人开发,由 Netflix 指导)。
您可以从 Github(下载并构建 RxJavahttps://github.com/ReactiveX/RxJava 。它不需要依赖项,并且支持 Java8Lambdas。由其 Javadoc 和 GitHub wiki 页面提供的文档结构良好,其中一些是最好的。以下是如何签出项目并运行生成:
$ git clone git@github.com:ReactiveX/RxJava.git
$ cd RxJava/
$ ./gradlew build
当然,您也可以下载预构建的 JAR。对于本书,我们将使用 1.0.8 版。
如果使用 Maven,可以将 RxJava 作为依赖项添加到pom.xml
文件中:
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava</artifactId>
<version>1.0.8</version>
</dependency>
或者,对于 Apache Ivy,将此代码段放在 Ivy 文件的依赖项中:
<dependency org="io.reactivex" name="rxjava" rev="1.0.8" />
如果改用 Gradle,请按如下方式更新build.gradle
文件的依赖项:
dependencies {
...
compile 'io.reactivex:rxjava:1.0.8'
...
}
本书附带的代码示例和程序可以用 Gradle 构建和测试。可从该 Github 存储库下载:https://github.com/meddle0x53/learning-rxjava 。
现在,让我们来看看 RxJava 的全部内容。我们将从一些众所周知的事情开始,并逐渐了解图书馆的秘密。
作为一名 Java 程序员,您很可能听说过或使用过Iterator
模式。想法很简单:Iterator
实例用于遍历容器(收集/数据源/生成器),在需要时逐个拉动容器的元素,直到到达容器的末端。下面是一个在 Java 中如何使用它的小示例:
List<String> list = Arrays.asList("One", "Two", "Three", "Four", "Five"); // (1)
Iterator<String> iterator = list.iterator(); // (2)
while(iterator.hasNext()) { // 3
// Prints elements (4)
System.out.println(iterator.next());
}
每个java.util.Collection
对象都是Iterable
实例,这意味着它有iterator()
方法。此方法创建一个Iterator
实例,其源为集合。让我们看看前面的代码是做什么的:
List
实例。iterator()
方法从这个List
实例创建一个Iterator
实例。Iterator
接口有两种重要方式:hasNext()
和next()
。hasNext()
方法是用来检查Iterator
实例是否有更多的元素需要遍历。在这里,我们还没有开始遍历元素,因此它将返回True
。当我们通过五个字符串时,它将返回False
,程序将在while
循环后继续。Iterator
实例上的next()
方法时,它将按照元素插入集合的顺序返回元素。所以字符串将被打印出来。在本例中,我们的程序使用Iterator
实例使用List
实例中的项目。它提取数据(这里用字符串表示)和当前线程块,直到请求的数据准备就绪并被接收。因此,例如,如果Iterator
实例在每次next()
方法调用时向 web 服务器发出请求,则在等待每个响应到达时,程序的主线程将被阻塞。
RxJava 的构建块是可观察的。Observable
类(注意,这不是 JDK 附带的java.util.Observable
类)是Iterator
类的数学对偶,这基本上意味着它们就像同一枚硬币的两面。它有一个基础的集合或计算,产生消费者可以使用的值。但区别在于,消费者不会像Iterator
模式那样从生产者那里“拉”出这些值。恰恰相反,;制作人将值作为通知推送到消费者。
**下面是使用Observable
实例编写的同一程序的示例:
List<String> list = Arrays.asList("One", "Two", "Three", "Four", "Five"); // (1)
Observable<String> observable = Observable.from(list); // (2)
observable.subscribe(new Action1<String>() { // (3)
@Override
public void call(String element) {
System.out.println(element); // Prints the element (4)
}
});
以下是代码中发生的情况:
from(Iterable<? extends T> iterable)
方法从列表中创建一个Observable
实例。此方法用于创建Observable
实例,将Iterable
实例(本例中的列表)中的所有值逐一同步发送给其订户(消费者)。我们将在第 3 章创建并连接观察者、观察者和受试者中,逐一了解这些值是如何发送给订阅者的。Observable
实例。通过订阅,我们告诉 RxJava 我们对这个Observable
实例感兴趣,并希望从它接收通知。我们通过定义一个方法-call(T)
,使用实现Action1
接口的匿名类进行订阅。Observable
实例每次有值时都会调用此方法,准备推送。总是创建新的Action1
实例可能看起来太冗长,但 Java8 解决了这个冗长的问题。我们将在第 2 章中使用 Java8的函数构造了解更多信息。call()
方法,并将其打印出来。RxJavaObservable
类的实例的行为有点像异步迭代器,它通知订阅者/消费者自己有下一个值。事实上,Observable
类为经典的Observer
模式(用 Java 实现,参见java.util.Observable
,参见Design Patterns: Elements of Reusable Object-Oriented Software
由四人帮实现)添加了两个Iterable
类型中可用的东西。
hasNext()
方法,而是可以附加一个订阅者来监听“OnCompleted
通知。Observable
实例,而不是尝试捕获错误。这些侦听器可以使用subscribe(Action1<? super T>, Action1 <Throwable>, Action0)
方法连接。让我们通过添加错误扩展Observable
实例示例,并完成侦听器:
List<String> list = Arrays.asList("One", "Two", "Three", "Four", "Five");
Observable<String> observable = Observable.from(list);
observable.subscribe(new Action1<String>() {
@Override
public void call(String element) {
System.out.println(element);
}
},
new Action1<Throwable>() {
@Override
public void call(Throwable t) {
System.err.println(t); // (1)
}
},
new Action0() {
@Override
public void call() {
System.out.println("We've finnished!"); // (2)
}
});
这里的新事物是:
Observable
实例将通过此侦听器的call(Throwable)
方法发送此错误。这类似于Iterator
实例中的 try-catch 块。Observable
实例将调用此call()
方法。这类似于使用hasNext()
方法来查看对Iterable
实例的遍历是否已完成并打印“We's finished!”。此示例可在 GitHub 上获得,并可在上查看/下载 https://github.com/meddle0x53/learning-rxjava/blob/master/src/main/java/com/packtpub/reactive/chapter01/ObservableVSIterator.java 。
我们看到了如何使用Observable
实例,并且它们与我们熟悉的Iterator
实例没有太大区别。这些Observable
实例可用于构建异步流并将数据更新推送到其订户(它们可以有多个订户)。这是反应式编程范例的一个实现。数据正在传播到所有感兴趣的方(即订阅者)。
使用这种流进行编码是一种更具功能性的反应式编程实现。当然,它有正式的定义和复杂的术语,但这是最简单的解释。
订阅活动应该熟悉;例如,单击 GUI 应用中的按钮将触发一个事件,该事件将传播到订阅服务器处理程序。但是,使用 RxJava,我们可以从任何文件输入、套接字、响应、变量、缓存、用户输入等创建数据流。除此之外,还可以通知消费者流已关闭,或者出现错误。因此,通过使用这些流,我们的应用可以对故障做出反应。
总而言之,流是一系列正在进行的消息/事件,在实时处理时按顺序排列。它可以被视为一个随时间变化的值,订阅者(消费者)可以根据它观察到这些变化。因此,回到 Excel 中的示例,我们已经用“反应变量”或 RxJava 的Observable
实例有效地替换了传统变量。 ## 实现无功和
现在我们已经熟悉了Observable
类以及如何使用它以反应式方式编码的思想,我们准备实现本章开头提到的反应式和。
让我们看看我们的计划必须满足的要求:
exit
。a:<number>
,则a收集器将更新为<编号>。b:<number>
,则b收集器将更新为<编号>。源代码包含我们将在接下来的四章中详细讨论的功能。
第一段代码表示程序的主体:
ConnectableObservable<String> input = from(System.in); // (1)
Observable<Double> a = varStream("a", input); (2)
Observable<Double> b = varStream("b", input);
ReactiveSum sum = new ReactiveSum(a, b); (3)
input.connect(); (4)
这里发生了很多新的事情:
Observable
实例,表示标准输入流(System.in
。因此,我们使用from(InputStream)
方法(实现将在下一个代码段中介绍)从System.in
创建一个ConnectableObservable
变量。ConnectableObservable
变量是一个Observable
实例,只有在调用其connect()
方法后,才开始发出来自其源的事件。在第 3 章创建并连接观察者、观察者和受试者中阅读更多关于它的内容。varStream(String, Observable)
方法创建了两个Observable
实例,分别表示a
和b
值,我们将在后面进行研究。这些值的源流是输入流。a
和b
值创建一个ReactiveSum
实例。此代码负责在程序中构建依赖项并启动它。a
和b
值取决于用户输入,其总和取决于用户输入。
现在我们来看一下from(InputStream)
方法的实现,它创建了一个Observable
实例,源代码为java.io.InputStream
:
static ConnectableObservable<String> from(final InputStream stream) {
return from(new BufferedReader(new InputStreamReader(stream)));// (1)
}
static ConnectableObservable<String> from(final BufferedReader reader) {
return Observable.create(new OnSubscribe<String>() { // (2)
@Override
public void call(Subscriber<? super String> subscriber) {
if (subscriber.isUnsubscribed()) { // (3)
return;
}
try {
String line;
while(!subscriber.isUnsubscribed() &&
(line = reader.readLine()) != null) { // (4)
if (line == null || line.equals("exit")) { // (5)
break;
}
subscriber.onNext(line); // (6)
}
}
catch (IOException e) { // (7)
subscriber.onError(e);
}
if (!subscriber.isUnsubscribed()) // (8)
subscriber.onCompleted();
}
}
}).publish(); // (9)
}
这是一段复杂的代码,让我们一步一步地看:
InputStream
参数转换为BufferedReader
对象并调用from(BufferedReader)
方法。我们这样做是因为我们将使用字符串作为数据,并且使用Reader
实例更容易。Observable.create(OnSubscribe)
方法创建的Observable
实例。这种方法是我们在本书中最常用的方法。用于创建具有自定义行为的Observable
实例。传递给它的rx.Observable.OnSubscribe
接口有一个方法call(Subscriber)
。此方法用于实现Observable
实例的行为,因为传递给它的Subscriber
实例可用于向Observable
实例的订户发送消息。订户是Observable
实例的客户端,该实例使用其通知。请阅读第 3 章中的更多内容,创建并连接观察者、观察者和受试者。Observable
实例,则不应执行任何操作。exit
并点击输入,主循环停止。onNext(T)
方法作为通知传递给Observable
实例的订户。通过这种方式,我们将所有信息传递给相关方。他们的工作是过滤和转换原始消息。onError(Throwable)
方法向订户发出OnError
通知。Observable
实例,则使用onCompleted()
方法向用户发送OnCompleted
通知。publish()
方法,我们将新的Observable
实例转换为ConnectableObservable
实例。我们必须这样做,否则,对于这个Observable
实例的每个订阅,我们的逻辑将从一开始就执行。在我们的例子中,我们只想执行一次,所有订阅者都会收到相同的通知;这可以通过使用ConnectableObservable
实例来实现。在第 3 章创建并连接观察者、观察者和受试者中了解更多相关信息。这说明了一种将 Java 的 IO 流转换为Observable
实例的简化方法。当然,在这个主循环中,程序的主线程将阻塞等待用户输入。这可以通过使用正确的Scheduler
实例将逻辑移动到另一个线程来防止。我们将在第 6 章中使用与调度器的并发性和并行性重新讨论这个主题。
现在,用户输入到终端中的每一行都通过该方法创建的ConnectableObservable
实例作为通知进行传播。现在是时候来看看我们如何将我们的值Observable
实例(表示总和的收集器)连接到这个输入Observable
实例。下面是varStream(String, Observable)
方法的实现,它获取一个值和源Observable
实例的名称,并返回一个表示该值的Observable
实例:
public static Observable<Double> varStream(final String varName, Observable<String> input) {
final Pattern pattern = Pattern.compile("\\^s*" + varName + "\\s*[:|=]\\s*(-?\\d+\\.?\\d*)$"); // (1)
return input
.map(new Func1<String, Matcher>() {
public Matcher call(String str) {
return pattern.matcher(str); // (2)
}
})
.filter(new Func1<Matcher, Boolean>() {
public Boolean call(Matcher matcher) {
return matcher.matches() && matcher.group(1) != null; // (3)
}
})
.map(new Func1<Matcher, Double>() {
public Double call(Matcher matcher) {
return Double.parseDouble(matcher.group(1)); // (4)
}
});
}
这里对Observable
实例调用的map()
和filter()
方法是 RxJava 提供的 fluent API 的一部分。可以在Observable
实例上调用它们,创建一个新的Observable
实例,该实例依赖于这些方法并转换或过滤传入数据。正确地使用这些方法,您可以通过一系列步骤来表达复杂的逻辑,从而实现您的目标。在第 4 章转换、过滤和积累您的数据中了解更多信息。让我们分析一下代码:
<var_name>: <value>
或<var_name> = <value>
的消息感兴趣,因此我们将使用此正则表达式仅过滤和处理这类消息。记住,我们的输入Observable
实例发送用户写入的每一行;我们的工作是以正确的方式处理它。Matcher
实例。Double
数值。这就是a
和b
的值是如何用随时间变化的双值流表示的。现在我们可以实现它们的总和。我们将其实现为一个实现Observer
接口的类,因为我想向您展示使用Observer
接口订阅Observable
实例的另一种方式。代码如下:
public static final class ReactiveSum implements Observer<Double> { // (1)
private double sum;
public ReactiveSum(Observable<Double> a, Observable<Double> b) {
this.sum = 0;
Observable.combineLatest(a, b, new Func2<Double, Double, Double>() { // (5)
public Double call(Double a, Double b) {
return a + b;
}
}).subscribe(this); // (6)
}
public void onCompleted() {
System.out.println("Exiting last sum was : " + this.sum); // (4)
}
public void onError(Throwable e) {
System.err.println("Got an error!"); // (3)
e.printStackTrace();
}
public void onNext(Double sum) {
this.sum = sum;
System.out.println("update : a + b = " + sum); // (2)
}
}
这是实际总和的实现,取决于表示其收集器的两个Observable
实例:
Observer
接口。Observer
实例可以传递给Observable
实例的subscribe(Observer)
方法,并定义了三种以三种通知类型命名的方法:onNext(T)
、onError(Throwable)
、onCompleted
。在第 3 章创建并连接观察者、观察者和受试者中了解更多有关此界面的信息。onNext(Double)
方法实现中,我们将总和设置为传入值,并打印标准输出的更新。combineLatest(Observable, Observable, Func2)
方法实现求和。此方法创建一个新的Observable
实例。当传递给 CombineTest 的两个Observable
实例中的任何一个收到更新时,新的Observable
实例将被更新。通过新的Observable
实例发出的值由第三个参数函数计算,该函数可以访问两个源序列的最新值。在我们的例子中,我们总结了这些值。在传递给该方法的两个Observable
实例发出至少一个值之前,不会有任何通知。因此,只有在a
和b
都有通知的情况下,我们才有总数。请阅读第 5 章、组合符、条件和错误处理中有关此方法和其他组合器的更多信息。Observer
实例订阅到组合的Observable
实例。下面是此示例输出的示例:
Reacitve Sum. Type 'a: <number>' and 'b: <number>' to try it.
a:4
b:5
update : a + b = 9.0
a:6
update : a + b = 11.0
就是这样!我们使用数据流实现了无功和。
这个示例的源代码可以从这里下载并试用:https://github.com/meddle0x53/learning-rxjava/blob/master/src/main/java/com/packtpub/reactive/chapter01/ReactiveSumV1.java 。 # 总结
在本章中,我们介绍了反应式原则以及我们应该学习和使用它们的原因。构建一个反应式应用并不难;它只需要在一些声明性的步骤中构造程序。使用 RxJava,可以通过构建多个以正确方式连接的异步流来实现这一点,并通过其使用者一路转换数据。
本章中介绍的两个示例乍看起来可能有点复杂和混乱,但实际上,它们非常简单。其中有很多新的东西,但所有的东西都将在下面的章节中详细解释。
如果您想了解更多关于反应式编程的信息,请查看 Netflix API 中使用 RxJava 的反应式编程,这是一篇关于该主题的优秀文章,可在上找到 http://techblog.netflix.com/2013/02/rxjava-netflix-api.html 。另一篇介绍这一概念的文章可以在这里找到:https://gist.github.com/staltz/868e7e9bc2a7b8c1f754 。
这些是关于反应式编程和 RX 的幻灯片,作者 Ben Christensen,RxJava 的创建者之一:https://speakerdeck.com/benjchristensen/reactive-programming-with-rx-at-qconsf-2014 。
在下一章中,我们将讨论函数式编程的一些概念及其在 Java8 中的实现。这将为我们提供本章其余部分所需的基本思想,并将帮助我们在编写反应式程序时摆脱 Java 冗长。**