简介

ReactiveX,通过 (streams) 、可观察对象 (observables) 和操作符 (operators) 完成对响应式编程的一种实现,具有多种语言实现,包括 RxJs、RxJava、Rx.NET、RxPy 和 RxSwift。

ReactiveX 是一种使用可观察流 (observable streams) 进行异步编程的 API 。

可观察流类似事件发射器 (event emitters) ,可以发出三个事件:下一个 (next) 、错误 (error) 和完成 (complete) 。

一个 observable 会一直发出 next 事件,直到它发出 error 事件或 complete 事件。但是,此时它不会再发出任何事件,除非它再次被订阅 (subscribed) 。

ReactiveX 结合了观察者模式和迭代器模式以及函数式编程 (主要指:集合管道模式) 的优秀特性。

下面以 Javascript 版本的 RxJs 进行讲解,它被称为事件的“Lodash”。

通知

Observable 通知

Observable 通过以下通知 (Notifications) 与它的 Observer(s) 通信:

  • OnNext :将 Observable 发出的项目传达给 Observer;
  • OnCompleted :表示 Observable 已成功完成,它将不再发射任何项目;
  • OnError :表示 Observable 已因指定的错误条件而终止,并且它将不再发出任何项目;
  • OnSubscribe (可选) :表示 Observable 已准备好接受来自观察者的请求通知。

在 RxJs 中,Observable 通过下面方式通知 Observer:

// 发送下一个项目
observer.next(value)
// 发送失败通知
observer.error(err)
// 发送完成通知
observer.complete()

Observer 通知

Observer 通过以下通知与其 Observable 进行通信:

  • Subscribe :表示 Observer 准备好接收来自 Observable 的通知;
  • Unsubscribe :表示 Observer 不再想接收来自 Observable 的通知;
  • Request (可选) :表示 Observer 希望来自 Observable 的额外 OnNext 通知不超过指定数量。

在 RxJs 中,Observer 通过下面方式通知 Observable:

// 订阅
observable.subscribe()
// 取消订阅
observable.unsubscribe()

约定下治理通知

一个 Observable 可以发出零个或多个 OnNext 通知,每个通知代表一个发射的项目,然后它可以通过 OnCompletedOnError 通知跟随这些发射通知,但不能同时发出。

在发出 OnCompletedOnError 通知后,它可能不会再发出任何进一步的通知。

Observable 可能根本不发出任何项目。

Observable 也可能永远不会以 OnCompletedOnError 通知来终止。也就是说,Observable 不发出通知、只发出 OnCompletedOnError 通知或只发出 OnNext 通知是合适的。

Observables 必须串行 (而不是并行) 向观察者发出通知。他们可能会从不同的线程发出这些通知,但通知之间必须有正式的Happened-before关系。

Happened-before 表示两个事件的结果之间的关系,因此如果一个事件应该在另一个事件之前发生,则结果必须反映这一点,即使这些事件实际上是无序执行的 (通常是为了优化程序流程) 。

Observable 和 Observer

观察者 (Observer) 订阅一个可观察 (Observable) 序列,该序列通常通过调用提供的回调函数发送项目 (Item) 给 observer。

如果许多事件异步进入,它们必须存储在队列中或丢弃。在 ReactiveX 中,observer 永远不会被乱序的项目调用或 (在多线程上下文中) 在回调返回前一个项目之前调用。异步调用保持异步,可以通过返回一个 observable 来处理。

它类似于迭代器模式,如果发生致命错误,它会单独通知 observer (通过调用第二个函数) 。当所有项目都发送完毕后,它就完成了 (并通过调用第三个函数通知 observer) 。Reactive Extensions API 还从其他编程语言的迭代器操作符中借用了许多操作符。

Observable 的终止

如果一个 Observable 未曾发出一个 OnCompletedOnError 通知,那么一个 observer 可能认为该 Observable 仍然是活跃状态的 (即便当前没有发送项目) ,并且可能发给它一些通知 (例如:取消订阅或请求的通知) 。

当一个 Observable 发出一个 OnCompletedOnError 通知时,Observable 可能会释放其资源并终止,并且它的 observers 不应尝试与它进一步通信。

OnError 通知必须包含错误的原因 (也就是说使用 null 值 调用 OnError 是无效的) 。

在 Observable 终止之前,它必须首先向订阅它的所有 observers 发出 OnCompletedOnError 通知。

Observer 的订阅和退订

Observable 可以在 Observable 收到观察者的订阅 (Subscribe) 通知后,立即开始向该观察者发出通知。

当观察者向 Observable 发出取消订阅 (Unsubscribe) 通知时,Observable 将尝试停止向它发出通知。但是,不能保证 Observable 在观察者发出取消订阅通知后不会再向观察者发出任何通知。

当 Observable 向其观察者发出 OnErrorOnComplete 通知时,这将结束订阅,而观察者不再需要以发出取消订阅通知这种方式来结束订阅。

多个 Observers

如果第二个 observer 订阅了一个已经在向第一个 observer 发送项目的 Observable,那么 observer 会接收到什么项目将由 Observable 来决定。

Observable 来决定它是否会向每个 observer 发送相同的项目 (items) ,或是否会从头到尾重播的完整项目序列给第二个 observer,又或者它是否会向第二个 observer 发出完全不同的项目序列。

不能保证同一个 Observable 的两个观察者会看到相同的项目序列。

“Hot” 和 “Cold” Observables

Observable 什么时候开始发射它的项目序列?这取决于 Observable。

Hot 的 Observable 可能会在创建后立即开始发射项目,因此任何后来订阅该 Observable 的观察者都可以在中间的某个位置开始观察序列

另一方面,一个 Cold 的 Observable 会等到 Observer 订阅它之后才开始发射项目,因此这样的 Observer 可以保证从一开始就看到整个序列

在 ReactiveX 的一些实现中,还有一个叫做 Connectable 的 Observable 。这样的 Observable 在调用其 Connect 方法之前不会开始发射项目 ,无论是否有任何 Observer 订阅了它。

通过 Observable 操作符组合

Observables 和 Observables 只是 ReactiveX 的开始。它们本身只不过是标准观察者模式轻微扩展,更适合处理一系列事件而不是单个回调。

ReactiveX 的真正能力是允许您转换 (transform) 、组合 (combine) 、操作 (manipulate) 和使用 Observables 发出的项目序列的操作符。

这些 Rx 操作符允许您以声明 (declarative) 的方式将异步序列组合在一起,具有回调的所有效率优势,但没有嵌套回调处理程序通常与异步系统相关联的弊端。

背压 (Backpressure)

在软件世界中,背压是从流体动力学中借用的概念。

WIKI 百科上定义为:

背压是与通过管道的所需流体流动相反的阻力或力,导致摩擦损失和压降。

在软件的上下文中,可以调整定义以引用软件内的数据流:

与期望相反的阻力或力量经过软件的数据流。

背压并不是一种「机制」,也不是一种「策略」,是一种现象:在数据流从上游生产者向下游消费者传输的过程中,上游生产速度大于下游消费速度,导致下游的 Buffer 溢出,这种现象就叫做 backpressure 溢出

背压策略

  • 控制生产者 (减速/加速由消费者决定)
  • 缓冲区 (临时累积传入数据峰值)
  • 丢弃 (对传入数据的百分比进行采样)

从技术上讲,还有第四种选择——忽略背压——老实说,如果背压没有引起严重问题,这不是一个坏主意。引入更多复杂性也是有代价的

ReactiveX 中的背压

背压是可选的,并非所有的 ReactiveX 实现都包含背压,并且在那些实现中,并非所有的 Observable 或操作符都支持背压。

如果 Observable 检测到其观察者实现了 Request 通知并理解 OnSubscribe 通知,则它可能会背压。

如果 Observable 实现了背压并且其观察者使用了背压,则 Observable 不会在订阅后立即开始向观察者发送项目。相反,它将向观察者发出 OnSubscribe 通知。

在收到 OnSubscribe 通知后的任何时候,观察者都可以向其订阅的 Observable 发出 Request 通知。此通知请求特定数量的项目。

Observable 通过向观察者发出的项目不超过观察者请求的项目数来响应这样的请求。然而,Observable 还可以发出 OnCompletedOnError 通知,它甚至可以在观察者请求任何项目之前发出这样的通知。

未实现背压的 Observable 应通过发出指示不支持背压的 OnError 通知来响应来自观察者的 Request 通知。

Requests 是累积的。例如,如果观察者向 Observable 发出三个 Request 通知,分别针对 3、5 和 10 个项目,则该 Observable 可能会向观察者发出多达 18 个项目,无论这些 Request 通知何时到达相对于 Observable 发出的时间作为回应的项目。

如果 Observable 产生的项目比观察者请求的多,则取决于 Observable 是否会丢弃多余的项目,将它们存储以稍后发出,或者使用其他策略来处理溢出 (overflow) 。

拉取 vs 推送

拉取 (Pull) 和推送 (Push) 是两种不同的协议,用来描述数据生产者 (Producer) 与数据消费者 (Consumer) 之间如何进行通信的。

通信方式生产者消费者
Pull被动 (Passive) :在请求时产生数据。主动 (Active) :决定何时请求数据。
Push主动 (Active) :按照自己的节奏产生数据。被动 (Passive) :对接收到的数据做出反应。

什么是推送?在推送系统中,生产者决定何时向消费者发送数据,而消费者不知道何时会收到该数据。

Promise 是当今 JavaScript 中最常见的推送系统类型。Promise (生产者) 向注册的回调 (消费者) 传递解析值,但与函数不同的是,Promise 负责准确确定何时将该值“推送”到回调。

操作符

Reactive X 里的操作符 (operator) 本质上是一个纯函数 (pure function),它接收一个 Observable 作为入参,并生成一个新的 Observable 作为出参。它将一个 observable () 作为其第一个参数并返回另一个 observable (目标外部可观察对象) 。

function myOperator(observable) {
  // ...
  return newObservable
}

弹珠图

要解释操作符是如何工作的,文字描述通常是不足以描述清楚的。许多操作符都是跟时间相关的,它们可能会以不同的方式延迟 (delay) 、取样 (sample) 、节流 (throttle) 或去抖动值 (debonce) 。

图表通常是更适合的工具。弹珠图 (Marble diagrams) 是操作符运行方式的视觉表示,其中包含输入 Obserable(s) (输入可能是多个 Observable )、操作符及其参数输出 Observable 。

在下图中可以看到解剖过的弹珠图。

在 RxJs 中内置了一些操作符,比如 map(...)filter(...)merge(...) 等等。

操作符是函数,它基于当前的 Observable 创建一个新的 Observable。这是一个无副作用的操作:前面的 Observable 保持不变。

你可以自定义一个操作符:

const rxjs = require("rxjs");

function counter(input) {
  let output = rxjs.Observable.create(function subscribe(observer) {
    input.subscribe({
      next: (number) => observer.next(number + 1),
      error: (err) => observer.error(err),
      complete: () => observer.complete()
    });
  });
  return output;
}

使用该操作符:

// input 是个 observable
let input = rxjs.from([1, 2, 3, 4]);
// output 是个新的 observable
let output = counter(input);

output.subscribe((x) => console.log(x));

输出:

2
3
4
5

链式调用

大多数操作符对 Observable 进行操作并返回一个新的 Observable。这允许您在一个链中一个接一个地应用这些操作符。

还有其他模式,例如 Builder 模式,其中特定类的各种方法通过方法的操作修改该对象来操作同一类的项目。这些模式还允许您以类似的方式链接方法。

但是在 Builder 模式中,方法出现在链中的顺序通常并不重要,但 Observable 操作符的顺序很重要

一连串的 Observable 操作符不会独立地对起源于该链的原始 Observable 进行操作,而是轮流操作,每个操作符都对链中前一个操作符生成的 Observable 进行操作。

在 RxJs 7.2 版本后,以前在 rxjs/operators 里的操作符直接改为由 rxjs 导出,并且提供了管道操作符来支持链式调用:

const { map, reduce } = require("rxjs");
const rxjs = require("rxjs");
let sourceObservable = rxjs.from([1, 2, 3, 4, 5]);
sourceObservable.pipe(
    map(function(number) { return number * 2; }),
    reduce(function(sum, number) { return sum + number; }, 0) 
  )
  .subscribe(function(number) {
    console.log(number);
  }, function(error) {
    console.error(error);
  }, function() {
    console.log('done');
  })

输出:

30
done

pipe 是 observable 的一个方法,我们看下源码实现:

  ...
  pipe(...operations) {
      if (operations.length === 0) {
          return this;
      }
      return pipeFromArray(operations)(this);
  }
  ...

我们发现 pipeFromArray 是一个柯里化函数,这是函数式编程的一个特征。

接下来,在 es6/util/pipe.js 中,我们找到 pipeFromArray 的实现:

import { noop } from './noop';
/* tslint:enable:max-line-length */
export function pipe(...fns) {
    return pipeFromArray(fns);
}
/* @internal */
export function pipeFromArray(fns) {
    if (!fns) {
        return noop;
    }
    if (fns.length === 1) {
        return fns[0];
    }
    return function piped(input) {
        return fns.reduce((prev, fn) => fn(prev), input);
    };
}

piped 是个函数,它的入参 input 也是我们的 sourceObservable,出参也是个 observable,所以也是个操作符。

map(function(number) { return number * 2; })reduce(function(sum, number) { return sum + number; }, 0) 都会返回一个操作符 (函数) ,同样他们以 observable 作为入参,并返回一个新 observable。

创建 observable

然后对于源 observable 发出的每个项目,它会对该项目应用一个函数,然后在目标 Observable 上发出它。它甚至可以在目标 observable 上发出另一个 observable。这称为内部可观察对象

在 RxJs 中有多种方式创建 observable。

通过 create 方法创建 observable:

const rxjs = require("rxjs");
let observable = rxjs.Observable.create(function (observer) {
   observer.next();
});

一个格式良好 (well-formed) 的有限 (finite) Observable,必须尝试精准地调用 observer 的 onCompletedonError 的其中一个方法一次,并且此后禁止尝试调用 observer 的任何其他方法。

通过 from 方法将一些其他对象或数据结构转换为 observables:

const rxjs = require("rxjs");
let observable = rxjs.from([1, 2, 3, 4]);

调度器

如果你想在你的 Observable 操作符级联中引入多线程,你可以通过指示这些操作符 (或特定的 Observables) 对特定的调度器 (Schedulers) 进行操作来实现。

在支持多线程语言的实现中 (例如:Rx.NET) ,调度器允许你调度繁重的工作给一个线程池或者专门的线程,并且在 UI 线程 运行最后的订阅 (subscription) ,从而更新 UI。

在 RxJs 中,实际上很少需要担心大多数方法的调度器参数。由于 JavaScript 本质上是单线程的,因此调度选项并不多,默认调度程序通常是正确的选择。

RxJs 中的调度类型

调度器目的
null通过不传递任何调度程序,通知以同步和递归方式传递。将此用于恒定时间 (constant-time) 操作或尾递归 (tail recursive) 操作。
queueScheduler在当前事件帧 (event frame) (蹦床[trampoline]调度器) 中的队列上调度。将其用于迭代操作。
asapScheduler微任务 (micro task) 队列上调度,这与用于承诺的队列相同。基本上在当前工作之后,但在下一个工作之前。将此用于异步转换 (asynchronous conversions) 。
asyncScheduler使用 setInterval 调度。将此用于基于时间的操作。
animationFrameScheduler安排将在下一次浏览器内容重绘之前发生的任务。可用于创建流畅的浏览器动画。

例子:

const { Observable, observeOn, asyncScheduler } = require("rxjs");

const observable = new Observable((observer) => {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
}).pipe(observeOn(asyncScheduler));

console.log("just before subscribe");
observable.subscribe({
  next(x) {
    console.log("got value " + x);
  },
  error(err) {
    console.error("something wrong occurred: " + err);
  },
  complete() {
    console.log("done");
  }
});
console.log("just after subscribe");

输出:

just before subscribe
just after subscribe
got value 1
got value 2
got value 3
done

参考资料:

> https://medium.com/@jayphelps/backpressure-explained-the-flow-of-data-through-software-2350b3e77ce7

> https://blog.csdn.net/weixin_47898971/article/details/121606582

> https://stackoverflow.com/questions/28145890/what-is-a-scheduler-in-rxjs