Java8 转换、过滤和积累您的数据详解

现在我们有了从各种源数据创建Observable实例的方法,是时候围绕这些实例构建编程逻辑了。我们将介绍用于实现逐步计算的基本反应运算符(处理数据的反应方式)。

我们将从变换开始,使用著名的flatMap()map()运算符,以及一些不太常见的变换运算符。之后,我们将学习如何使用filter()操作符过滤数据,跳过元素,只在给定的时间位置接收元素。本章还将介绍与scan操作员积累数据。大多数操作员将使用大理石图进行演示。

本章涵盖以下主题:

我们在前面的一些示例中使用了运算符。将输入值转换为其他值的高阶函数称为转换。可以在Observable实例上调用的高阶函数,从中生成新的Observable实例,称为运算符。转换操作符以某种方式转换源Observable实例发出的元素。

为了了解不同操作员的工作原理,我们将使用名为大理石图的图片。对于示例,此示例描述了map操作员:

Observable transformations

图中中心的矩形表示运算符(函数)。它将其输入(圆)转换为其他内容(三角形)。矩形上方的箭头表示源Observable实例,其上的彩色圆圈表示及时发出的OnNext通知,末端的垂直线为OnCompleted通知。矩形下方的箭头是Observable实例及其转换元素的输出。

因此,map()运算符正是这样做的:它将源中的每个“下一个值转换为通过传递给它的函数定义的其他值。下面是一个小例子:

Observable<String> mapped = Observable
  .just(2, 3, 5, 8)
  .map(v -> v * 3)
  .map(v -> (v % 2 == 0) ? "even" : "odd");
subscribePrint(mapped, "map");

第一个map()操作符将源发出的每个数字转换为自身,乘以三。第二个map()运算符将每个相乘的数字转换为字符串。如果数字为偶数,则字符串为“even”,否则为“odd”。

使用map()运算符,我们可以将每个发出的值转换为新值。还有更强大的转换运算符,它们看起来与map()运算符类似,但有自己的用法和用途。我们来看看。

使用各种 flatMap 运算符的变换

flatMap运算符与map()运算符类似,但有两个区别:

  • flatMap运算符的参数总是将值或值序列转换为Observable实例的形式,而不是接收将值转换为任意类型值的函数。
  • 它合并那些产生的Observable实例发出的值。这意味着它不会将Observable实例作为值发出,而是发出它们的通知。

这是它的大理石图:

Transformations with the various flatMap operators

我们可以看到,源Observable实例中的每个值都被转换为Observable实例,最后,这些导数观测值的所有值都被生成的Observable实例发出。注意,得到的Observable实例可能以交错方式甚至无序地发出导数Observable实例的值。

flatMap运算符对于分叉逻辑非常有用。例如,如果一个Observable实例代表一个文件系统文件夹并从中发出文件,我们可以使用flatMap操作符将每个文件对象转换为一个Observable实例,并对这些文件可观察对象应用一些操作。结果将是这些操作的摘要。以下是从文件夹中读取一些文件并将其转储到标准输出的示例:

Observable<Path> listFolder(Path dir, String glob) { // (1)
  return Observable.<Path>create(subscriber -> {
    try {
      DirectoryStream<Path> stream = Files.newDirectoryStream(dir, glob);
      subscriber.add(Subscriptions.create(() -> {
        try {
          stream.close();
        }
        catch (IOException e) {
          e.printStackTrace();
        }
      }));
      Observable.<Path>from(stream).subscribe(subscriber);
    }
    catch (DirectoryIteratorException ex) {
      subscriber.onError(ex);
    }
    catch (IOException ioe) {
      subscriber.onError(ioe);
    }
  });
}
Observable<String> from(final Path path) { // (2)
  return Observable.<String>create(subscriber -> {
    try {
      BufferedReader reader = Files.newBufferedReader(path);
      subscriber.add(Subscriptions.create(() -> {
        try {
          reader.close();
        }
        catch (IOException e) {
          e.printStackTrace();
        }
      }));
      String line = null;
      while ((line = reader.readLine()) != null && !subscriber.isUnsubscribed()) {
        subscriber.onNext(line);
      }
      if (!subscriber.isUnsubscribed()) {
        subscriber.onCompleted();
      }
    }
    catch (IOException ioe) {
      if (!subscriber.isUnsubscribed()) {
        subscriber.onError(ioe);
      }
    }
  });
}
Observable<String> fsObs = listFolder(
  Paths.get("src", "main", "resources"), "{lorem.txt,letters.txt}"
).flatMap(path -> from(path)); // (3)
subscribePrint(fsObs, "FS"); // (4)

这段代码介绍了处理文件夹和文件的两种方法。我们将简要介绍一下它们,以及在这个flatMap示例中如何使用它们:

  1. The first method, listFolder(), takes a folder in the form of a Path variable and a glob expression. It returns an Observable instance representing this folder. This Observable instance emits all the files in the folder, complying the glob expression as Path objects.

    该方法使用Observable.create()Observable.from()运算符实现。这个实现的主要思想是,如果发生异常,应该由生成的Observable实例来处理和发出。

    注意使用Subscriber.add()操作符向订阅服务器添加一个新的Subscription实例,该实例是使用Subscriptions.create()操作符创建的。此方法使用操作创建一个Subscription实例。当Subscription实例未订阅时执行此动作,即本例中Subscriber实例未订阅时执行此动作。因此,这类似于在最后一块中关闭stream

  2. The other method this example introduces is Observable<String> from(Path).

    它逐行读取定位并传递给path实例的文件,并以OnNext()通知的形式发出这些行。该方法使用Subscription实例上的Subscriber.add()操作符关闭文件的stream

  3. 使用flatMap的示例使用listFolder()操作符从文件夹创建Observable实例,该操作符向文件发送两个Path参数。对每个文件使用flatMap()操作符,我们使用from(Path)操作符创建一个Observable实例,它以行的形式发送文件内容。

  4. 前一个链的结果将是两个文件内容,打印在标准输出上。如果我们对每个可观察到的文件路径使用Scheduler实例(参见第 6 章与调度器使用并发和并行),则内容将置乱,因为flatMap操作符交错了Observable的通知它合并的实例。

介绍Observable<String> from(final Path path)方法的源代码可以在找到 https://github.com/meddle0x53/learning-rxjava/blob/724eadf5b0db988b185f8d86006d772286037625/src/main/java/com/packtpub/reactive/common/CreateObservable.java#L61

包含Observable<Path> listFolder(Path dir, String glob)方法的源代码可在查看/下载 https://github.com/meddle0x53/learning-rxjava/blob/724eadf5b0db988b185f8d86006d772286037625/src/main/java/com/packtpub/reactive/common/CreateObservable.java#L128

使用flatMap操作符的示例可在查看/下载 https://github.com/meddle0x53/learning-rxjava/blob/master/src/main/java/com/packtpub/reactive/chapter04/FlatMapAndFiles.java

flatMap运算符有多个过载。以为例,有一个具有三个功能,一个用于OnNext,一个用于OnError,一个用于OnComleted。它还将错误完成的事件转换为Observable实例,如果存在OnErrorOnCompleted事件,则将其Observable实例转换合并到生成的Observable实例中,然后发出OnCompleted通知。以下是一个例子:

Observable<Integer> flatMapped = Observable
  .just(-1, 0, 1)
  .map(v -> 2 / v)
  .flatMap(
 v -> Observable.just(v),
 e -> Observable.just(0),
 () -> Observable.just(42)
 );
subscribePrint(flatMapped, "flatMap");

其输出将为-2(2/-1)0(由于2/0引起的错误)。由于错误,因此1不会发出,也不会到达flatMap操作员。

另一个有趣的重载是Observable<R> flatMap(Func1<T, Observable<U>>, Func2<T, U, R>)。这是它的大理石图:

Transformations with the various flatMap operators

这一个将源Observable实例中的项与这些源项触发的Observable实例相结合,并使用原始项和派生项对调用用户提供的函数。然后,Observable实例将发出此函数的结果。以下是一个例子:

Observable<Integer> flatMapped = Observable
.just(5, 432)
.flatMap(
 v -> Observable.range(v, 2),
 (x, y) -> x + y);
subscribePrint(flatMapped, "flatMap");

输出为:

flatMap : 10
flatMap : 11
flatMap : 864
flatMap : 865
flatMap ended!

这是因为源Observable实例发出的第一个元素是5,所以flatMap操作符使用range()操作符将其转换为Observable实例,该实例发出56。但是这个flatMap操作员并没有停在这里;对于此范围Observable实例发出的每个项目,它应用第二个函数,第一个参数为原始项目(5,第二个参数为范围发出的项目。所以我们有5+55+6。同样适用于源Observable实例432发出的第二项。变为432+432=864432+433=865

当所有派生项都需要访问其源项时,此重载非常有用,并且通常可以避免我们使用某种元组类,从而节省对内存和库的依赖性。在前面的文件示例中,我们可以在每个输出行前面加上文件名:

CreateObservable.listFolder(
  Paths.get("src", "main", "resources"),
  "{lorem.txt,letters.txt}"
).flatMap(
 path -> CreateObservable.from(path),
 (path, line) -> path.getFileName() + " : " + line
);

运算符flatMapIterable不将 lambda 作为参数,lambda 将任意值作为参数,并返回一个Observable实例。相反,传递给它的 lambda 接受任意值并返回一个Iterable实例。所有这些Iterable实例都被展平为结果Observable实例发出的值。让我们看看下面的代码片段:

Observable<?> fIterableMapped = Observable
.just(
  Arrays.asList(2, 4),
  Arrays.asList("two", "four"),
)
.flatMapIterable(l -> l);

这个简单的示例合并了源Observable实例发出的两个列表,结果发出四项。值得一提的是,调用flatMapIterable(list -> list)与调用flatMap(l → Observable.from(l))是一样的。

flatMap运算符的另一种形式是concatMap运算符。它的行为与原始的flatMap操作符类似,只是为了生成自己的序列,它连接而不是合并生成的Observable实例。下图显示了它的工作原理:

Transformations with the various flatMap operators

不同的导数观测值中的项与flatMap操作符一样不是交错的。flatMapconcatMap操作符之间的一个显著区别是flatMap操作符并行使用内部Observable实例,而concatMap操作符一次只订阅一个Observable实例。

最后一个类似于flatMap的操作符是switchMap。其大理石图如下所示:

Transformations with the various flatMap operators

其操作方式与flatMap操作符类似,除了每当源Observable实例发出新项目时,它停止镜像先前发出的项目生成的Observable实例,并开始仅镜像当前Observable实例。换句话说,当下一个衍生Observable实例开始发射其项目时,它会在内部取消当前衍生实例的订阅。以下是一个例子:

Observable<Object> obs = Observable
.interval(40L, TimeUnit.MILLISECONDS)
.switchMap(v ->
 Observable
 .timer(0L, 10L, TimeUnit.MILLISECONDS)
 .map(u -> "Observable <" + (v + 1) + "> : " + (v + u)))
);
subscribePrint(obs, "switchMap");

Observable实例正在使用Observable.interval()操作符每隔 40 毫秒发出序列号(以零开始)。使用switchMap操作符,为每个数字创建一个新的Observable实例,该实例发出另一个数字序列。第二个数字序列从传递给switchMap运算符的源编号开始(通过使用map()运算符将源编号与每个发出的编号相加来实现)。因此,每 40 毫秒,就会发出一个新的数字序列(每个数字间隔 10 毫秒)。

结果输出如下所示:

switchMap : Observable <1> : 0
switchMap : Observable <1> : 1
switchMap : Observable <1> : 2
switchMap : Observable <1> : 3
switchMap : Observable <2> : 1
switchMap : Observable <2> : 2
switchMap : Observable <2> : 3
switchMap : Observable <2> : 4
switchMap : Observable <3> : 2
switchMap : Observable <3> : 3
switchMap : Observable <3> : 4
switchMap : Observable <3> : 5
switchMap : Observable <3> : 6
switchMap : Observable <4> : 3
.................

所有映射示例的源代码可在下载/查看 https://github.com/meddle0x53/learning-rxjava/blob/master/src/main/java/com/packtpub/reactive/chapter04/MappingExamples.java

分组项目

项目可以按特定属性或键分组。

首先,我们来看看groupBy()操作符,一种将源Observable实例划分为多个Observable实例的方法。每个Observable实例根据分组函数发出一些源项。

groupBy()操作符返回一个发出Observable实例的Observable实例。这些Observable实例是特殊的;它们属于GroupedObservable类型,您可以使用getKey()方法检索它们的分组密钥。一旦使用了groupBy()操作员,就可以用不同或通用的方式处理不同的组。

注意,当groupBy()操作符创建一个发出GroupedObservables实例的可观察对象时,每个实例都缓冲其项。因此,如果我们忽略其中任何一个,这个缓冲区将出现潜在的内存泄漏。

groupBy()操作员的大理石图如下所示:

Grouping items

在这里,项目的形式被用作分组的共同特征。为了更好地理解该方法的思想,我们可以查看以下示例:

List<String> albums = Arrays.asList(
  "The Piper at the Gates of Dawn",
  "A Saucerful of Secrets",
  "More", "Ummagumma",  "Atom Heart Mother",
  "Meddle", "Obscured by Clouds",
  "The Dark Side of the Moon",
  "Wish You Were Here", "Animals", "The Wall"
);
Observable
  .from(albums)
  .groupBy(album -> album.split(" ").length)
  .subscribe(obs ->
    subscribePrint(obs, obs.getKey() + " word(s)")
  );

该示例根据包含的单词数量,显示了平克·弗洛伊德的一些专辑标题和组合。例如MeddleMore在同一组中,键为1,而A Saucerful of SecretsWish You Were Here在同一组中,键为4。所有这些组都是由GroupedObservable实例表示的,所以我们可以在源Observable实例的subscribe()调用中订阅它们。不同的组根据其键使用不同的标签打印。这个小程序的输出如下:

7 word(s) : The Piper at the Gates of Dawn
4 word(s) : A Saucerful of Secrets
1 word(s) : More
1 word(s) : Ummagumma
3 word(s) : Atom Heart Mother
1 word(s) : Meddle
3 word(s) : Obscured by Clouds
6 word(s) : The Dark Side of the Moon
4 word(s) : Wish You Were Here
1 word(s) : Animals
2 word(s) : The Wall

项目发出的顺序相同,但它们由不同的GroupedObservable实例发出。而且,所有的GroupedObservable实例都是在源代码完成后完成的。

groupBy()操作符还有一个重载,需要一秒钟的时间,转换函数以某种方式转换组中的每个项。以下是一个例子:

Observable
.from(albums)
.groupBy(
 album -> album.replaceAll("[^mM]", "").length(),
 album -> album.replaceAll("[mM]", "*")
)
.subscribe(
  obs -> subscribePrint(obs, obs.getKey()+" occurences of 'm'")
);

专辑标题根据字母m在其中出现的次数进行分组。文本的转换方式是将字母的所有出现的替换为*。结果如下:

0 occurences of 'm' : The Piper at the Gates of Dawn
0 occurences of 'm' : A Saucerful of Secrets
1 occurences of 'm' : *ore
4 occurences of 'm' : U**agu**a
2 occurences of 'm' : Ato* Heart *other
1 occurences of 'm' : *eddle
0 occurences of 'm' : Obscured by Clouds
1 occurences of 'm' : The Dark Side of the *oon
0 occurences of 'm' : Wish You Were Here
1 occurences of 'm' : Ani*als
0 occurences of 'm' : The Wall

示范使用Observable.groupBy()运算符的源代码可在找到 https://github.com/meddle0x53/learning-rxjava/blob/master/src/main/java/com/packtpub/reactive/chapter04/UsingGroupBy.java

附加有用的变换运算符

还有几个值得一提的额外的转换。例如,有cast()操作符,是map(v -> someClass.cast(v))的快捷方式。

List<Number> list = Arrays.asList(1, 2, 3);
Observable<Integer> iObs = Observable
  .from(list)
  .cast(Integer.class);

这里初始的Observable实例发出Number类型的值,但它们实际上是Integer实例,所以我们可以使用cast()操作符将它们表示为Integer实例。

另一个有用的操作符是timestamp()操作符。它通过将每个发出的值转换为Timestamped<T>类的实例,向其添加时间戳。例如,如果我们想要记录Observable的输出,这将非常有用,如下所示:

List<Number> list = Arrays.asList(3, 2);
Observable<Timestamped<Number>> timestamp = Observable
  .from(list)
  .timestamp();
subscribePrint(timestamp, "Timestamps");

在本例中,每个数字都有时间戳。同样,可以使用map()操作符非常容易地实现这一点。前面示例的输出如下所示:

Timestamps : Timestamped(timestampMillis = 1431184924388, value = 1)
Timestamps : Timestamped(timestampMillis = 1431184924394, value = 2)
Timestamps : Timestamped(timestampMillis = 1431184924394, value = 3)

类似的运算符是timeInterval运算符,但它将值转换为TimeInterval<T>运算符的实例。TimeInterval<T>实例表示Observable发出的项目,以及自发出前一个项目或(如果没有前一个项目)订阅以来经过的时间量。这可用于生成统计信息,例如:

Observable<TimeInterval<Long>> timeInterval = Observable
  .timer(0L, 150L, TimeUnit.MILLISECONDS)
  .timeInterval();
subscribePrint(timeInterval, "Time intervals");

这将输出类似于以下内容的内容:

Time intervals : TimeInterval [intervalInMilliseconds=13, value=0]
Time intervals : TimeInterval [intervalInMilliseconds=142, value=1]
Time intervals : TimeInterval [intervalInMilliseconds=149, value=2]
...................................................................

我们可以看到,不同的值大约在 150 毫秒时发出,它们应该是这样的。

timeIntervaltimestamp操作员都在即时调度器上工作(参见第 6 章使用与调度器的并发性和并行性),他们都以毫秒为单位保存时间信息。

上述示例的源代码可在中找到 https://github.com/meddle0x53/learning-rxjava/blob/master/src/main/java/com/packtpub/reactive/chapter04/VariousTransformationsDemonstration.java

在第一章的无功和示例中,我们根据一种特殊模式过滤用户输入。例如,模式是a:<编号>。通常只从数据流中过滤感兴趣的数据位。例如,仅从所有按键关闭事件中筛选出<输入>按键关闭事件,或仅从文件中筛选出包含给定表达式的行是很有用的。这就是为什么不仅要能够转换我们的数据,而且还要学会如何过滤数据。

RxJava 中有很多过滤操作符。这些操作符中最重要的是filter()。其大理石图非常简单,如下所示:

Filtering data

它表明,filter()操作符通过某些属性过滤数据。在图中,它是元素的形式:它只过滤圆。与所有其他操作符一样,filter()从源创建一个新的Observable实例。此Observable实例仅发射符合filter()操作符定义的条件的项。以下代码说明了:

Observable<Integer> numbers = Observable
  .just(1, 13, 32, 45, 21, 8, 98, 103, 55);
Observable<Integer> filter = numbers
  .filter(n -> n % 2 == 0);
subscribePrint(filter, "Filter");

由于过滤条件,这将只输出个偶数号(32898)。

filter()操作符根据用户定义的函数过滤元素。还有很多额外的过滤操作符。为了理解它们,让我们看一些简单的例子:

Observable<Integer> numbers = Observable
  .just(1, 13, 32, 45, 21, 8, 98, 103, 55);
Observable<String> words = Observable
  .just(
    "One", "of", "the", "few", "of",
    "the", "crew", "crew"
  );
Observable<?> various = Observable
  .from(Arrays.asList("1", 2, 3.0, 4, 5L));

我们在示例中定义了三个Observable实例。第一个发出九个数字。第二个是一个接一个地说出一个句子中的所有单词。第三个会发出不同类型的元素,包括字符串、整数、double 和 long。

subscribePrint(numbers.takeLast(4), "Last 4");

takeLast()操作符返回一个新的Observable实例,该实例仅在完成时从源Observable实例发出最后的n项。此方法有一些重载。例如,有一个在指定的时间窗口内从源发射最后一个N或更少的项目。另一个可以接收一个Scheduler实例,以便在另一个线程上执行。

在本例中,只过滤并输出Observable实例的最后四项:

Last 4 : 8
Last 4 : 98
Last 4 : 103
Last 4 : 55
Last 4 ended!

让我们看看下面的代码片段:

subscribePrint(numbers.last(), "Last");

last()操作符创建的Observable实例,完成时只输出源Observable实例发出的最后一项。如果源没有发出项目,则会发出一个NoSuchElementException异常作为OnError()通知。它有一个重载,接收类型为T->Boolean的谓词参数。因此,它只发出源发出的最后一项,符合谓词定义的条件。在本例中,输出如下所示:

Last : 55
Last ended!

takeLastBuffer()方法的行为与takeLast()方法非常相似,但它创建的Observable实例只会发出一个项—一个List实例,其中包含来自源的最后N项:

subscribePrint(
  numbers.takeLastBuffer(4), "Last buffer"
);

它具有与takeLast()方法类似的重载。这里的输出如下:

Last buffer : [8, 98, 103, 55]
Last buffer ended!

lastOrDefault()运算符的行为与last()运算符的行为类似,并且具有与谓词相同的重载:

subscribePrint(
  numbers.lastOrDefault(200), "Last or default"
);
subscribePrint(
  Observable.empty().lastOrDefault(200), "Last or default"
);

但是,如果源没有发出任何信息,lastOrDefault()操作符发出默认值,而不是OnError通知。此示例的输出如下所示:

Last or default : 55
Last or default ended!
Last or default : 200
Last or default ended!

skipLast()运算符与takeLast()方法正好相反;当它完成时,它从源中发出除最后一个N项之外的所有内容:

subscribePrint(numbers.skipLast(4), "Skip last 4");

它有类似的重载。本例的输出如下:

Skip last 4 : 1
Skip last 4 : 13

skip()方法与skipLast()方法相同,但跳过前N项,而不是最后一项:

subscribePrint(numbers.skip(4), "Skip 4");

这意味着示例的输出如下所示:

Skip 4 : 21
Skip 4 : 8
Skip 4 : 98
Skip 4 : 103
Skip 4 : 55
Skip 4 ended!

take()操作符与takeLast()操作符类似,但它不是源的最后N项,而是发出第一个N项。

subscribePrint(numbers.take(4), "First 4");

这是一个常用的运算符,比takeLast()运算符便宜,因为takeLast()运算符缓冲其项并等待源完成。此运算符不缓冲其项,但在接收到它们时发出它们。它对于限制无限Observable实例非常有用。上述示例的输出如下所示:

First 4 : 1
First 4 : 13
First 4 : 32
First 4 : 45
First 4 ended!

让我们看看下面的代码片段:

subscribePrint(numbers.first(), "First");

first()运算符与last()运算符类似,但仅发射源发射的第一项。如果没有第一项,则发出相同的OnError通知。它的谓词形式有一个别名--takeFirst()操作符。该操作符还有一个firstOrDefault()操作符表单。此示例的输出很清楚:

First : 1
First ended!

让我们看看下面的代码片段:

subscribePrint(numbers.elementAt(5), "At 5");

elementAt()运算符类似于first()last()运算符,但没有谓词形式。不过有一张表格。它只在项目序列中指定索引处发出元素,由源Observable实例发出。此示例输出以下内容:

At 5 : 8
At 5 ended!

让我们看看下面的代码片段:

subscribePrint(words.distinct(), "Distinct");

distinct()操作符生成的Observable实例从源中发出项目,重复的项目除外。有一个重载可以接收函数,返回一个键或散列码值,用于确定一个项是否与另一个项不同:

Distinct : One
Distinct : of
Distinct : the
Distinct : few
Distinct : crew
Distinct ended!
subscribePrint(
  words.distinctUntilChanged(), "Distinct until changed"
);

distinctUntilChanged()操作符类似于distinct()方法,但它返回的Observable实例会发出源Observable实例发出的所有项目,这些项目与它们的前一个不同。因此,在本例中,它将发出除最后一个单词crew之外的所有单词。

subscribePrint( // (13)
  various.ofType(Integer.class), "Only integers"
);

ofType()操作符创建一个Observable实例,该实例只发出给定类型的源发出的项目。它基本上是这个呼叫的快捷方式:filter(v -> Class.isInstance(v))。在本例中,输出如下所示:

Only integers : 2
Only integers : 4
Only integers ended!

所有这些示例的源代码可在查看/下载 https://github.com/meddle0x53/learning-rxjava/blob/master/src/main/java/com/packtpub/reactive/chapter04/FilteringExamples.java

这些是 RxJava 提供的最常用的过滤操作符。在后面的示例中,我们将大量使用其中的一些。

我们将在本章中讨论的last运算符是一个转换运算符,但有点特殊。它可以使用以前累积的状态!让我们进一步了解它。

scan(Func2)运算符将具有两个参数的函数作为参数。其结果是一个Observable实例。scan()方法的结果发出的第一项是源Observable实例的第一项。通过将传递给scan()方法的函数应用于结果Observable实例发出的前一项和源Observable实例发出的第二项,创建了发出的第二项。由scan()方法结果发出的第三项是通过应用函数创建的,传递给scan()方法的前一项由其发出,第三项由源Observable实例发出。此模式继续以创建由scan()方法创建的Observable实例发出的序列的其余部分。传递给scan()方法的函数称为一个累加器

我们来看看scan(Func2)方法的大理石图:

Accumulating data

scan()方法发出的项可以通过累计状态生成。在图中,圆在三角形中累加,然后这个三角形圆在正方形中累加。

这意味着我们可以发出一系列整数的和,例如:

Observable<Integer> scan = Observable
  .range(1, 10)
  .scan((p, v) -> p + v);
subscribePrint(scan, "Sum");
subscribePrint(scan.last(), "Final sum");

第一个订阅将输出所有排放:1、3(1+2)、6(3+3)、10(6+4)。。55。但在大多数情况下,我们只对最后一项感兴趣,即最终的总和。我们可以使用一个只发出最后一个元素的Observable实例,使用last()过滤操作符。值得一提的是,这里有一个reduce(Func2)操作符,是scan(Func2).last()的别名。

scan()操作员有一个过载,可与种子/初始参数一起使用。在这种情况下,传递给scan(T, Func2)运算符的函数将应用于源发出的第一项和该种子参数。

Observable<String> file = CreateObservable.from(
  Paths.get("src", "main", "resources", "letters.txt")
);
scan = file.scan(0, (p, v) -> p + 1);
subscribePrint(scan.last(), "wc -l");

本例统计文件中的行数。文件Observable实例逐个发出给定路径指定的文件行。我们使用具有种子0scan(T, Func2)运算符,通过在每行的累积计数上加一来对行进行计数。

我们将用一个例子来结束本章,这个例子使用了本章中介绍的许多操作符。让我们来看看:

Observable<String> file = CreateObservable.from(
  Paths.get("src", "main", "resources", "operators.txt")
);
Observable<String> multy = file
  .flatMap(line -> Observable.from(line.split("\\."))) // (1)
  .map(String::trim) // (2)
  .map(sentence -> sentence.split(" ")) // (3)
  .filter(array -> array.length > 0) // (4)
  .map(array -> array[0]) // (5)
  .distinct() // (6)
  .groupBy(word -> word.contains("'")) //(7)
  .flatMap(observable -> observable.getKey() ? observable : // (8)
    observable.map(Introspector::decapitalize))
  .map(String::trim) // (9)
  .filter(word -> !word.isEmpty()) // (10)
  .scan((current, word) -> current + " " + word) // (11)
  .last() // (12)
  .map(sentence -> sentence + "."); // (13)
subscribePrint(multy, "Multiple operators"); // (14)

这段代码使用大量运算符过滤并组合隐藏在文件中的句子。该文件由一个Observable实例表示,该实例一行一行地发出其中包含的所有行。

  1. 我们不想只在不同的线路上运行;我们希望发出文件中包含的所有句子。因此,我们使用flatMap操作符创建一个Observable实例,该实例逐句发出文件语句(由dot确定)。
  2. 我们使用map()操作符修剪这些句子。它们可能包含一些前导或尾随空格。
  3. 我们希望对包含在句子项中的不同单词进行操作,因此我们使用map()操作符和String::split参数将它们转换为单词数组。
  4. 我们不关心空句子(如果有),所以我们使用filter()操作符过滤掉它们。
  5. 我们只需要句子中的第一个单词,所以我们使用map()操作符来获取它们。生成的Observable实例发出文件中每个句子的第一个单词。
  6. 我们不需要重复的单词,所以我们使用distinct()操作符来去除它们。
  7. 现在我们想用一种不同的方式来分支我们的逻辑,其中一些词被区别对待。因此,我们使用groupBy()操作符和Boolean键将单词分成两个Observable实例。所选单词的关键是True,其他所有单词的关键是False
  8. 使用flatMap操作符,我们将分开的单词连接起来,但只有选择的单词(键为True)保持不变。其余的被斩首
  9. 我们使用map()操作符从前导/尾随空格中修剪所有不同的单词。
  10. 我们使用filter()操作符过滤掉空的。
  11. 使用scan()操作符,我们用空格作为分隔符连接单词。
  12. 使用last()操作符,我们得到的Observable实例将只发出最后一个串联,包含所有单词。
  13. 最后一个对map()操作符的调用是通过添加一个点从我们连接的单词中创建一个句子。
  14. 如果我们输出这个Observable实例发出的单个项目,我们将得到一个由初始文件中包含的所有句子的第一个单词组成的句子(跳过重复的单词)!

输出如下:

Multiple operators : I'm the one who will become RX.
Multiple operators ended!

上述示例可在中找到 https://github.com/meddle0x53/learning-rxjava/blob/master/src/main/java/com/packtpub/reactive/chapter04/VariousTransformationsDemonstration.java

本章结尾的示例演示了我们到目前为止学到的知识。我们可以通过使用各种操作符链接Observable实例来编写复杂的逻辑。我们可以使用map()flatMap()运算符转换传入数据,并可以使用groupBy()filter()运算符或不同的flatMap()运算符进行分支逻辑。我们可以使用flatMap()操作符再次加入这些分支。我们可以借助不同的过滤器选择部分数据,并使用scan()操作符进行累积。使用所有这些运算符,我们可以以可读且简单的方式编写相当不错的程序。程序的复杂性并不影响代码的复杂性。

下一步是学习如何以更直接的方式组合逻辑的分支。我们还将学习如何组合来自不同来源的数据。让我们继续下一章!

教程来源于Github,感谢apachecn大佬的无私奉献,致敬!

技术教程推荐

邱岳的产品手记 -〔邱岳〕

微服务架构核心20讲 -〔杨波〕

微服务架构实战160讲 -〔杨波〕

MySQL实战45讲 -〔林晓斌〕

从0打造音视频直播系统 -〔李超〕

恋爱必修课 -〔李一帆〕

Go 语言项目开发实战 -〔孔令飞〕

B端产品经理入门课 -〔董小圣〕

程序员职业规划手册 -〔雪梅〕