RxJS 是一个使用可观察序列组合异步和基于事件的程序的库Observable,以及卫星类型(Observer,Schedulers,Subjects)和操作符,这些操作符受到 Array 方法(map,filter,reduce,every 等)的启发,可以将异步事件处理为集合ReactiveX 结合了观察者模式和迭代器模式,并将函数式编程与集合相结合,以满足对理想管理事件序列的需求Observable 的执行,主要用于取消执行map、filter、concat、reduce 等)处理集合时具有函数式编程风格EventEmitter,是将值或事件多播到多个 Observer 的唯一方法Observables(可观察对象)是懒惰的多个值的 Push 集合,可观察对象是一种异步数据流,它可以在将来推送多个值。它们是惰性的,因为它们不会立即开始发送值,直到有人订阅它们Pull 和 Push 是两种不同的协议,用于描述数据生产者如何与数据消费者进行通信Pull 系统中,消费者决定何时从数据生产者接收数据。生产者本身并不知道何时将数据传递给消费者JavaScript函数都是Pull系统。函数是数据的生产者,而调用函数的代码通过拉出单个返回值从其调用中消费它function* generator() {
yield 1
yield 2
yield 3
}
const iterator = generator()
console.log(iterator.next().value) // 1
console.log(iterator.next().value) // 2
console.log(iterator.next().value) // 3
Push系统中,生产者决定何时将数据发送给消费者。消费者不知道何时会收到该数据Promise是 JavaScript 中最常见的推送系统。Promise向已注册的回调函数(Consumers)提供已解析的值RxJS引入了可观察对象,这是一种新的 JavaScript 推送系统。可观察对象是多个值的生产者,将它们推送到观察者(Consumers)button.addEventListener("click", function () {
console.log("Button was clicked!")
})
| 生产者 | 消费者 | |
|---|---|---|
| Pull | 被动:在请求时生成数据 | 主动:决定何时请求数据 |
| Push | 主动:以自己的速度生成数据 | 被动:对收到的数据做出反应 |


Observer(观察者) 是由可观察对象传递的值的消费者。观察者仅仅是一组回调,每种类型的通知由可观察对象传递:next,error 和 completeObserver(观察者),请将其提供给可观察对象的subscribeRxJS中的观察者也可能是部分可选的的。如果不提供其中一个回调,可观察对象的执行仍然会正常进行,但是某些类型的通知将被忽略,因为观察者中没有相应的回调next回调作为参数提供,而不必附加到观察者对象上,在 observable.subscribe 内部,它将使用回调参数作为next处理程序创建观察者对象调用或订阅是一个隔离的操作:两次函数调用会触发两个单独的副作用,两次可观察对象订阅会触发两个单独的副作用。与事件发射器(EventEmitters)不同,事件发射器共享副作用并且无论是否存在订阅者都有急切执行,而可观察对象没有共享的执行并且是懒惰的Observables 可以使用 new Observable 或创建操作符创建,使用观察者订阅,执行以向观察者发送 next / error / complete 通知,并且可以对其执行进行处理ObservablesObservablesObservablesObservablessrc\index.js
import { Observable } from "./rxjs"
const observable = new Observable((subscriber) => {
subscriber.next(1)
subscriber.next(2)
subscriber.next(3)
subscriber.complete()
})
observable.subscribe({
next: (value) => console.log("next value:", value),
complete: () => {
console.log("complete")
},
})
observable.subscribe((value) => console.log("next value:", value))
src\rxjs\index.js
export { Observable } from "./internal/Observable"
src\rxjs\internal\Observable.js
import { Subscriber } from "./Subscriber"
export class Observable {
constructor(subscribe) {
if (subscribe) {
this._subscribe = subscribe
}
}
subscribe(observerOrNext) {
const subscriber = new Subscriber(observerOrNext)
this._subscribe(subscriber)
return subscriber
}
}
src\rxjs\internal\Subscriber.js
import { isFunction } from "./util/isFunction"
export class Subscriber {
isStopped = false
constructor(observerOrNext) {
let observer
if (isFunction(observerOrNext)) {
observer = {
next: observerOrNext,
}
} else {
observer = observerOrNext
}
this.destination = observer
}
next(value) {
if (!this.isStopped) {
this.destination.next(value)
}
}
complete() {
if (!this.isStopped) {
this.isStopped = true
this.destination.complete?.()
}
}
}
src\rxjs\internal\util\isFunction.js
export function isFunction(value) {
return typeof value === "function"
}
of 操作符允许你创建一个 Observable,它发出一组项目,然后完成Observable 中,例如,你可以使用它来将一个数字数组转换为 Observableof 操作符是同步的,意味着它会立即发出所有的值,并立即完成,如果你需要异步发出值,你可以使用 from 操作符from 操作符允许你将多种不同的数据类型转换为 Observable,包括数组、类数组对象(如 arguments 对象)、迭代器和可观察对象from 操作符是异步的,意味着它会在内部使用内置的调度src\index.js
import { of, from } from "./rxjs"
const arrayLike = of(1, 2, 3)
arrayLike.subscribe({
next: (value) => console.log(`arrayLike:`, value),
complete: () => console.log("arrayLike done"),
})
const promiseLike = from(Promise.resolve(4))
promiseLike.subscribe({
next: (value) => console.log(`promiseLike:`, value),
complete: () => console.log("promiseLike done"),
})
src\rxjs\index.js
export { Observable } from './internal/Observable';
+export { of } from './internal/observable/of';
+export { from } from './internal/observable/from';
src\rxjs\internal\observable\of.js
import { from } from "./from"
export function of(...args) {
return from(args)
}
src\rxjs\internal\observable\from.js
import { innerFrom } from "./innerFrom"
export function from(input) {
return innerFrom(input)
}
src\rxjs\internal\observable\innerFrom.js
import { isArrayLike } from "../util/isArrayLike"
import { isPromise } from "../util/isPromise"
import { Observable } from "../Observable"
export function innerFrom(input) {
if (input instanceof Observable) {
return input
}
if (input != null) {
if (isArrayLike(input)) {
return fromArrayLike(input)
}
if (isPromise(input)) {
return fromPromise(input)
}
}
}
export function fromArrayLike(array) {
return new Observable((subscriber) => {
for (let i = 0; i < array.length; i++) {
subscriber.next(array[i])
}
subscriber.complete()
})
}
export function fromPromise(promise) {
return new Observable((subscriber) => {
promise.then((value) => {
subscriber.next(value)
subscriber.complete()
})
})
}
src\rxjs\internal\util\isArrayLike.js
export const isArrayLike = (x) =>
x && typeof x.length === "number" && typeof x !== "function"
src\rxjs\internal\util\isPromise.js
import { isFunction } from "./isFunction"
export function isPromise(value) {
return isFunction(value?.then)
}
fromEvent 函数允许你将浏览器事件转换为 Observable。它接受两个参数:window对象click或scrollsrc\index.js
import { fromEvent } from "./rxjs"
const source = fromEvent(document, "click")
const subscriber = source.subscribe(console.log)
setTimeout(() => {
subscriber.unsubscribe()
}, 1000)
src\rxjs\index.js
export { Observable } from './internal/Observable';
export { of } from './internal/observable/of';
export { from } from './internal/observable/from';
+export { fromEvent } from './internal/observable/fromEvent';
src\rxjs\internal\observable\fromEvent.js
import { Observable } from "../Observable"
export function fromEvent(target, eventName) {
return new Observable((subscriber) => {
const handler = (...args) => subscriber.next(...args)
target.addEventListener(eventName, handler)
return () => target.removeEventListener(eventName, handler)
})
}
src\rxjs\internal\Subscriber.js
import { isFunction } from './util/isFunction';
+import { Subscription } from './Subscription';
+export class Subscriber extends Subscription {
isStopped = false;
constructor(observerOrNext) {
super();
let observer;
if (isFunction(observerOrNext)) {
observer = {
next: observerOrNext
};
} else {
observer = observerOrNext;
}
this.destination = observer;
}
next(value) {
if (!this.isStopped) {
this.destination.next(value);
}
}
complete() {
if (!this.isStopped) {
this.isStopped = true;
this.destination.complete?.();
}
}
}
src\rxjs\internal\Subscription.js
export class Subscription {
_finalizers = []
unsubscribe() {
const { _finalizers } = this
if (_finalizers) {
for (const finalizer of _finalizers) {
finalizer()
}
}
}
add(teardown) {
this._finalizers.push(teardown)
}
}
src\rxjs\internal\Observable.js
import { Subscriber } from './Subscriber';
export class Observable {
constructor(subscribe) {
if (subscribe) {
this._subscribe = subscribe;
}
}
subscribe(observerOrNext) {
const subscriber = new Subscriber(observerOrNext);
+ const teardown = this._subscribe(subscriber)
+ subscriber.add(teardown)
return subscriber;
}
}
map操作符允许你对Observable中的每个值进行转换,并返回一个新的 Observable。它接受一个函数作为参数,该函数定义如何转换每个值。filter操作符允许你选择性地过滤 Observable 中的值。它接受一个函数作为参数,该函数定义如何过滤值

Observable,控制流的状态,是它的基石,但最有用的是它的operator,operator允许复杂的异步代码以声明的方式进行轻松组合的基础单元。 operator主要作用是操作、组合流中的数据of、from、timer、interval和fromEvent等map等Observable 合并成一个新的 Observable。这些操作符可以帮助你创建复杂的数据流,并控制它们之间的关系,像merge和concat等filter等Observable共享给多个观察者(Observer)。这些操作符可以帮助你控制 Observable 的行为,并有效地利用资源。像share等src\index.js
import { of, map, filter } from "./rxjs"
const subscriber = of(1, 2, 3)
.pipe(map((val) => val * 2)) // [2,4,6]
.pipe(filter((val) => val > 3)) //[4,6]
.pipe(map((data) => data + 1)) //[5,7]
subscriber.subscribe(console.log)
src\rxjs\index.js
export { Observable } from './internal/Observable';
export { of } from './internal/observable/of';
export { from } from './internal/observable/from';
export { fromEvent } from './internal/observable/fromEvent';
+export { filter } from './internal/operators/filter';
+export { map } from './internal/operators/map';
src\rxjs\internal\Observable.js
import { Subscriber } from './Subscriber';
export class Observable {
constructor(subscribe) {
if (subscribe) {
this._subscribe = subscribe;
}
}
subscribe(observerOrNext) {
const subscriber = new Subscriber(observerOrNext);
const teardown = this._subscribe(subscriber)
subscriber.add(teardown)
return subscriber;
}
+ pipe(operation) {
+ return operation(this);
+ }
}
src\rxjs\internal\operators\map.js
import { Observable } from "../Observable"
export function map(project) {
return (source) => {
const observable = new Observable(function (subscriber) {
return source.subscribe({
// 从 subscriber 订阅对象 中获取 观察者 对象
...subscriber.destination,
next: (value) => {
subscriber.next(project(value))
},
})
})
return observable
}
}
src\rxjs\internal\operators\filter.js
import { Observable } from "../Observable"
export function filter(predicate) {
return (source) => {
const observable = new Observable(function (subscriber) {
return source.subscribe({
// 从 subscriber 订阅对象 中获取 观察者 对象
...subscriber.destination,
next: (value) => {
predicate(value) && subscriber.next(value)
},
})
})
return observable
}
}
src\index.js
import { of, map, filter } from "./rxjs"
const subscriber = of(1, 2, 3).pipe(
map((val) => val * 2),
filter((val) => val > 3),
map((data) => data + 1)
)
subscriber.subscribe(console.log)
src\rxjs\internal\Observable.js
import { Subscriber } from './Subscriber';
+import { pipeFromArray } from './util/pipe';
export class Observable {
constructor(subscribe) {
if (subscribe) {
this._subscribe = subscribe;
}
}
subscribe(observerOrNext) {
const subscriber = new Subscriber(observerOrNext);
const teardown = this._subscribe(subscriber)
subscriber.add(teardown)
return subscriber;
}
+ pipe(...operations) {
+ return pipeFromArray(operations)(this);
+ }
}
src\rxjs\internal\util\pipe.js
import { identity } from "./identity"
export function pipeFromArray(fns) {
if (fns.length === 0) {
return identity
}
if (fns.length === 1) {
return fns[0]
}
return function piped(input) {
return fns.reduce((prev, fn) => fn(prev), input)
}
}
src\rxjs\internal\util\identity.js
export function identity(x) {
return x
}
src\index.js
import { asyncScheduler } from "./rxjs"
function task(state) {
console.log("state: ", state)
if (state < 5) {
this.schedule(state + 1, 1000)
}
}
asyncScheduler.schedule(task, 1000, 0)
src\rxjs\index.js
export { Observable } from './internal/Observable';
export { of } from './internal/observable/of';
export { from } from './internal/observable/from';
export { fromEvent } from './internal/observable/fromEvent';
export { filter } from './internal/operators/filter';
export { map } from './internal/operators/map';
+export { asyncScheduler } from './internal/scheduler/async';
src\rxjs\internal\Scheduler.js
export class Scheduler {
constructor(schedulerActionCtor) {
this.schedulerActionCtor = schedulerActionCtor
}
schedule(work, delay = 0, state) {
return new this.schedulerActionCtor(work).schedule(state, delay)
}
}
src\rxjs\internal\scheduler\AsyncAction.js
export class AsyncAction {
pending = false
constructor(work) {
this.work = work
}
schedule(state, delay = 0) {
this.state = state
this.delay = delay
if (this.id != null) {
this.id = this.recycleAsyncId(this.id)
}
this.pending = true
this.id = this.requestAsyncId(delay)
return this
}
requestAsyncId(delay = 0) {
return setInterval(this.execute.bind(this), delay)
}
execute() {
this.pending = false
this.work(this.state)
if (this.pending === false && this.id !== null) {
this.id = this.recycleAsyncId(this.id)
}
}
recycleAsyncId(id) {
if (id !== null) {
clearInterval(id)
}
return null
}
}
src\rxjs\internal\scheduler\async.js
import { AsyncAction } from "./AsyncAction"
import { Scheduler } from "../Scheduler"
export const asyncScheduler = new Scheduler(AsyncAction)
timer 函数是一个工厂函数,可以创建一个发出数字的 Observable,每个数字增加1。它接受两个参数:起始值和间隔时间src\index.js
import { timer } from "./rxjs"
timer(1000).subscribe(() => console.log("timer"))
src\rxjs\index.js
export { Observable } from './internal/Observable';
export { of } from './internal/observable/of';
export { from } from './internal/observable/from';
export { fromEvent } from './internal/observable/fromEvent';
export { filter } from './internal/operators/filter';
export { map } from './internal/operators/map';
export { asyncScheduler } from './internal/scheduler/async';
+export { timer } from './internal/observable/timer';
src\rxjs\internal\observable\timer.js
import { Observable } from "../Observable"
import { asyncScheduler } from "../scheduler/async"
export function timer(dueTime = 0, scheduler = asyncScheduler) {
return new Observable((subscriber) => {
let n = 0
return scheduler.schedule(function () {
subscriber.next(n++)
}, dueTime)
})
}
interval 函数是一个工厂函数,可以创建一个发出数字的 Observable,每个数字增加 1。它接受一个间隔时间参数,表示每次发送之间的时间间隔interval 函数会一直发送数字,直到你取消订阅。你可以使用 take 操作符限制发送的数字数量src\index.js
import { interval } from "./rxjs"
interval(1000).subscribe((v) => console.log(v))
src\rxjs\index.js
export { Observable } from './internal/Observable';
export { of } from './internal/observable/of';
export { from } from './internal/observable/from';
export { fromEvent } from './internal/observable/fromEvent';
export { filter } from './internal/operators/filter';
export { map } from './internal/operators/map';
export { asyncScheduler } from './internal/scheduler/async';
export { timer } from './internal/observable/timer';
+export { interval } from './internal/observable/interval';
src\rxjs\internal\observable\interval.js
import { asyncScheduler } from "../scheduler/async"
import { timer } from "./timer"
export function interval(period = 0, scheduler = asyncScheduler) {
return timer(period, period, scheduler)
}
src\rxjs\internal\observable\timer.js
import { Observable } from '../Observable';
import { asyncScheduler } from '../scheduler/async';
+export function timer(dueTime = 0, interval, scheduler = asyncScheduler) {
return new Observable(subscriber => {
let n = 0;
return scheduler.schedule(function () {
subscriber.next(n++);
+ if (interval >= 0) {
+ this.schedule(undefined, interval);
+ } else {
+ subscriber.complete();
+ }
}, dueTime);
});
}
take 操作符会从 Observable 中取出前 N 个值,然后完成。它是一个过滤操作符,可以用来限制 Observable 发送的值的数量take 操作符会在 Observable 发送完 N 个值之后立即完成,因此你不需要使用 unsubscribe 方法取消订阅src\index.js
import { interval, take } from "./rxjs"
interval(500).pipe(take(3)).subscribe(console.log)
// 0
// 1
// 2
src\rxjs\index.js
export { Observable } from './internal/Observable';
export { of } from './internal/observable/of';
export { from } from './internal/observable/from';
export { fromEvent } from './internal/observable/fromEvent';
export { filter } from './internal/operators/filter';
export { map } from './internal/operators/map';
export { asyncScheduler } from './internal/scheduler/async';
export { timer } from './internal/observable/timer';
export { interval } from './internal/observable/interval';
+export { take } from './internal/operators/take';
src\rxjs\internal\operators\take.js
import { Observable } from "../Observable"
export function take(count) {
return (source) => {
let seen = 0
const observable = new Observable(function (subscriber) {
return source.subscribe({
...subscriber,
next: (value) => {
seen++
if (seen <= count) {
subscriber.next(value)
if (seen >= count) {
subscriber.complete()
}
}
},
})
})
return observable
}
}
Hot Observable 和 Cold Observable是指两种不同类型的 Observable,它们在执行时的行为有所不同Cold Observable 是一种会在每个观察者订阅时重新开始发出数据的 Observable。每个观察者都有自己的数据流,即使多个观察者订阅同一个 Cold Observable,它们也会收到完全独立的数据流。例如,当你订阅一个 Cold Observable 时,它会从头开始发出数据,不会丢失任何信息Hot Observable是一种在发出数据时无论是否有观察者订阅都会继续发出数据的Observable。每个观察者都会收到相同的数据流,并且会收到所有之前发出的数据。例如,当你订阅一个Hot Observable时,它可能会丢失一些信息,因为它在你订阅之前就已经开始发出数据了Cold Observable 适用于那些需要每个观察者都收到完整数据流的场景,而 Hot Observable 适用于那些数据流是连续不断的,不需要每个观察者都收到完整数据流的场景producer来自Observable内部。将会推送什么样的值在Observable创建时被定义下来,不会改变producer与observer是一对一的关系,即是 unicast (单播)的observer订阅时,producer会把预先定义好的若干值依次推送给每个observerCold Observable每次订阅后就只会有一个观察者, 下一个观察者要进行订阅时是一次新的数据流程,因此Cold Observable与observer是一对一关系operators基本上都是属于Cold Observableimport { Observable } from "./rxjs"
const source = new Observable((subscriber) => {
subscriber.next(1)
subscriber.next(2)
subscriber.next(3)
subscriber.complete()
})
source.subscribe((data) => console.log(`subscriberA: ${data}`))
// 1, 2, 3
source.subscribe((data) => console.log(`subscriberB: ${data}`))
// 1, 2, 3
producer来自observable外部,何时推送以及推送什么样的值在创建时都是未知的。producer与observer是一对多的关系,即multicast (多播)的observer订阅时,会将observer注册到观察者列表中producer被触发或执行时,会将值同时推送给所有的observer
import { Subject } from "./rxjs"
const source = new Subject()
source.subscribe({ next: (data) => console.log(`Subject 第一次订阅: ${data}`) })
source.next(1)
source.next(2)
source.subscribe({ next: (data) => console.log(`Subject 第二次订阅: ${data}`) })
source.next(3)
source.next(4)
src\index.js
import { Subject } from "./rxjs"
const subject = new Subject()
subject.subscribe({ next: (data) => console.log("observerA: ", data) })
subject.subscribe({ next: (data) => console.log("observerB: ", data) })
subject.next(1)
subject.next(2)
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
src\rxjs\index.js
export { Observable } from './internal/Observable';
export { of } from './internal/observable/of';
export { from } from './internal/observable/from';
export { fromEvent } from './internal/observable/fromEvent';
export { filter } from './internal/operators/filter';
export { map } from './internal/operators/map';
export { asyncScheduler } from './internal/scheduler/async';
export { timer } from './internal/observable/timer';
export { interval } from './internal/observable/interval';
export { take } from './internal/operators/take';
+export { Subject } from './internal/Subject';
src\rxjs\internal\Subject.js
import { Subscriber } from "./Subscriber"
export class Subject extends Subscriber {
observers = []
subscribe(subscriber) {
const { observers } = this
observers.push(subscriber)
}
next(value) {
const copy = this.observers.slice()
for (const observer of copy) {
observer.next(value)
}
}
complete() {
const { observers } = this
while (observers.length) {
observers.shift().complete?.()
}
}
}