1.RxJS #

1.1 RxJS 介绍 #

1.2 基本概念 #

1.3 参考链接 #

2.Observable #

2.1 Pull 和 Push #

function* generator() {
  yield 1
  yield 2
  yield 3
}

const iterator = generator()
console.log(iterator.next().value) // 1
console.log(iterator.next().value) // 2
console.log(iterator.next().value) // 3
button.addEventListener("click", function () {
  console.log("Button was clicked!")
})
生产者 消费者
Pull 被动:在请求时生成数据 主动:决定何时请求数据
Push 主动:以自己的速度生成数据 被动:对收到的数据做出反应

2.2 Stream(流) #

2.3 Observable #

2.4 src\index.js #

src\index.js

import { Observable } from "./rxjs"
const observable = new Observable((subscriber) => {
  subscriber.next(1)
  subscriber.next(2)
  subscriber.next(3)
  subscriber.complete()
})
observable.subscribe({
  next: (value) => console.log("next value:", value),
  complete: () => {
    console.log("complete")
  },
})
observable.subscribe((value) => console.log("next value:", value))

2.5 rxjs\index.js #

src\rxjs\index.js

export { Observable } from "./internal/Observable"

2.6 Observable.js #

src\rxjs\internal\Observable.js

import { Subscriber } from "./Subscriber"

export class Observable {
  constructor(subscribe) {
    if (subscribe) {
      this._subscribe = subscribe
    }
  }
  subscribe(observerOrNext) {
    const subscriber = new Subscriber(observerOrNext)
    this._subscribe(subscriber)
    return subscriber
  }
}

2.7 Subscriber.js #

src\rxjs\internal\Subscriber.js

import { isFunction } from "./util/isFunction"
export class Subscriber {
  isStopped = false
  constructor(observerOrNext) {
    let observer
    if (isFunction(observerOrNext)) {
      observer = {
        next: observerOrNext,
      }
    } else {
      observer = observerOrNext
    }
    this.destination = observer
  }
  next(value) {
    if (!this.isStopped) {
      this.destination.next(value)
    }
  }
  complete() {
    if (!this.isStopped) {
      this.isStopped = true
      this.destination.complete?.()
    }
  }
}

2.8 isFunction.js #

src\rxjs\internal\util\isFunction.js

export function isFunction(value) {
  return typeof value === "function"
}

3. of #

3.1 src\index.js #

src\index.js

import { of, from } from "./rxjs"
const arrayLike = of(1, 2, 3)
arrayLike.subscribe({
  next: (value) => console.log(`arrayLike:`, value),
  complete: () => console.log("arrayLike done"),
})

const promiseLike = from(Promise.resolve(4))

promiseLike.subscribe({
  next: (value) => console.log(`promiseLike:`, value),
  complete: () => console.log("promiseLike done"),
})

3.2 rxjs\index.js #

src\rxjs\index.js

export { Observable } from './internal/Observable';
+export { of } from './internal/observable/of';
+export { from } from './internal/observable/from';

2.3 of.js #

src\rxjs\internal\observable\of.js

import { from } from "./from"
export function of(...args) {
  return from(args)
}

3.4 from.js #

src\rxjs\internal\observable\from.js

import { innerFrom } from "./innerFrom"
export function from(input) {
  return innerFrom(input)
}

3.5 innerFrom.js #

src\rxjs\internal\observable\innerFrom.js

import { isArrayLike } from "../util/isArrayLike"
import { isPromise } from "../util/isPromise"
import { Observable } from "../Observable"
export function innerFrom(input) {
  if (input instanceof Observable) {
    return input
  }
  if (input != null) {
    if (isArrayLike(input)) {
      return fromArrayLike(input)
    }
    if (isPromise(input)) {
      return fromPromise(input)
    }
  }
}
export function fromArrayLike(array) {
  return new Observable((subscriber) => {
    for (let i = 0; i < array.length; i++) {
      subscriber.next(array[i])
    }
    subscriber.complete()
  })
}
export function fromPromise(promise) {
  return new Observable((subscriber) => {
    promise.then((value) => {
      subscriber.next(value)
      subscriber.complete()
    })
  })
}

3.6 isArrayLike.js #

src\rxjs\internal\util\isArrayLike.js

export const isArrayLike = (x) =>
  x && typeof x.length === "number" && typeof x !== "function"

3.7 isPromise.js #

src\rxjs\internal\util\isPromise.js

import { isFunction } from "./isFunction"
export function isPromise(value) {
  return isFunction(value?.then)
}

4. fromEvent #

4.1 src\index.js #

src\index.js

import { fromEvent } from "./rxjs"
const source = fromEvent(document, "click")
const subscriber = source.subscribe(console.log)
setTimeout(() => {
  subscriber.unsubscribe()
}, 1000)

4.2 rxjs\index.js #

src\rxjs\index.js

export { Observable } from './internal/Observable';
export { of } from './internal/observable/of';
export { from } from './internal/observable/from';
+export { fromEvent } from './internal/observable/fromEvent';

4.3 fromEvent.js #

src\rxjs\internal\observable\fromEvent.js

import { Observable } from "../Observable"
export function fromEvent(target, eventName) {
  return new Observable((subscriber) => {
    const handler = (...args) => subscriber.next(...args)
    target.addEventListener(eventName, handler)
    return () => target.removeEventListener(eventName, handler)
  })
}

4.4 Subscriber.js #

src\rxjs\internal\Subscriber.js

import { isFunction } from './util/isFunction';
+import { Subscription } from './Subscription';
+export class Subscriber extends Subscription {
    isStopped = false;
    constructor(observerOrNext) {
        super();
        let observer;
        if (isFunction(observerOrNext)) {
            observer = {
                next: observerOrNext
            };
        } else {
            observer = observerOrNext;
        }
        this.destination = observer;
    }
    next(value) {
        if (!this.isStopped) {
            this.destination.next(value);
        }
    }
    complete() {
        if (!this.isStopped) {
            this.isStopped = true;
            this.destination.complete?.();
        }
    }
}

4.5 Subscription.js #

src\rxjs\internal\Subscription.js

export class Subscription {
  _finalizers = []
  unsubscribe() {
    const { _finalizers } = this
    if (_finalizers) {
      for (const finalizer of _finalizers) {
        finalizer()
      }
    }
  }
  add(teardown) {
    this._finalizers.push(teardown)
  }
}

4.6 Observable.js #

src\rxjs\internal\Observable.js

import { Subscriber } from './Subscriber';
export class Observable {
    constructor(subscribe) {
        if (subscribe) {
            this._subscribe = subscribe;
        }
    }
    subscribe(observerOrNext) {
        const subscriber = new Subscriber(observerOrNext);
+       const teardown = this._subscribe(subscriber)
+       subscriber.add(teardown)
        return subscriber;
    }
}

5. map&filter #

5.1 Operators #

5.2 src\index.js #

src\index.js

import { of, map, filter } from "./rxjs"
const subscriber = of(1, 2, 3)
  .pipe(map((val) => val * 2)) // [2,4,6]
  .pipe(filter((val) => val > 3)) //[4,6]
  .pipe(map((data) => data + 1)) //[5,7]
subscriber.subscribe(console.log)

5.3 rxjs\index.js #

src\rxjs\index.js

export { Observable } from './internal/Observable';
export { of } from './internal/observable/of';
export { from } from './internal/observable/from';
export { fromEvent } from './internal/observable/fromEvent';
+export { filter } from './internal/operators/filter';
+export { map } from './internal/operators/map';

5.4 Observable.js #

src\rxjs\internal\Observable.js

import { Subscriber } from './Subscriber';
export class Observable {
    constructor(subscribe) {
        if (subscribe) {
            this._subscribe = subscribe;
        }
    }
    subscribe(observerOrNext) {
        const subscriber = new Subscriber(observerOrNext);
        const teardown = this._subscribe(subscriber)
        subscriber.add(teardown)
        return subscriber;
    }
+   pipe(operation) {
+       return operation(this);
+   }
}

5.5 map.js #

src\rxjs\internal\operators\map.js

import { Observable } from "../Observable"
export function map(project) {
  return (source) => {
    const observable = new Observable(function (subscriber) {
      return source.subscribe({
        // 从 subscriber 订阅对象 中获取 观察者 对象
        ...subscriber.destination,
        next: (value) => {
          subscriber.next(project(value))
        },
      })
    })
    return observable
  }
}

5.6 filter.js #

src\rxjs\internal\operators\filter.js

import { Observable } from "../Observable"
export function filter(predicate) {
  return (source) => {
    const observable = new Observable(function (subscriber) {
      return source.subscribe({
        // 从 subscriber 订阅对象 中获取 观察者 对象
        ...subscriber.destination,
        next: (value) => {
          predicate(value) && subscriber.next(value)
        },
      })
    })
    return observable
  }
}

6. pipe #

6.1 src\index.js #

src\index.js

import { of, map, filter } from "./rxjs"
const subscriber = of(1, 2, 3).pipe(
  map((val) => val * 2),
  filter((val) => val > 3),
  map((data) => data + 1)
)
subscriber.subscribe(console.log)

6.2 Observable.js #

src\rxjs\internal\Observable.js

import { Subscriber } from './Subscriber';
+import { pipeFromArray } from './util/pipe';
export class Observable {
    constructor(subscribe) {
        if (subscribe) {
            this._subscribe = subscribe;
        }
    }
    subscribe(observerOrNext) {
        const subscriber = new Subscriber(observerOrNext);
        const teardown = this._subscribe(subscriber)
        subscriber.add(teardown)
        return subscriber;
    }
+   pipe(...operations) {
+       return pipeFromArray(operations)(this);
+   }
}

6.3 pipe.js #

src\rxjs\internal\util\pipe.js

import { identity } from "./identity"
export function pipeFromArray(fns) {
  if (fns.length === 0) {
    return identity
  }
  if (fns.length === 1) {
    return fns[0]
  }
  return function piped(input) {
    return fns.reduce((prev, fn) => fn(prev), input)
  }
}

6.4 identity.js #

src\rxjs\internal\util\identity.js

export function identity(x) {
  return x
}

7.asyncScheduler #

7.1 src\index.js #

src\index.js

import { asyncScheduler } from "./rxjs"
function task(state) {
  console.log("state: ", state)
  if (state < 5) {
    this.schedule(state + 1, 1000)
  }
}
asyncScheduler.schedule(task, 1000, 0)

7.2 rxjs\index.js #

src\rxjs\index.js

export { Observable } from './internal/Observable';
export { of } from './internal/observable/of';
export { from } from './internal/observable/from';
export { fromEvent } from './internal/observable/fromEvent';
export { filter } from './internal/operators/filter';
export { map } from './internal/operators/map';
+export { asyncScheduler } from './internal/scheduler/async';

7.3 Scheduler.js #

src\rxjs\internal\Scheduler.js

export class Scheduler {
  constructor(schedulerActionCtor) {
    this.schedulerActionCtor = schedulerActionCtor
  }
  schedule(work, delay = 0, state) {
    return new this.schedulerActionCtor(work).schedule(state, delay)
  }
}

7.4 AsyncAction.js #

src\rxjs\internal\scheduler\AsyncAction.js

export class AsyncAction {
  pending = false
  constructor(work) {
    this.work = work
  }
  schedule(state, delay = 0) {
    this.state = state
    this.delay = delay
    if (this.id != null) {
      this.id = this.recycleAsyncId(this.id)
    }
    this.pending = true
    this.id = this.requestAsyncId(delay)
    return this
  }
  requestAsyncId(delay = 0) {
    return setInterval(this.execute.bind(this), delay)
  }
  execute() {
    this.pending = false
    this.work(this.state)
    if (this.pending === false && this.id !== null) {
      this.id = this.recycleAsyncId(this.id)
    }
  }
  recycleAsyncId(id) {
    if (id !== null) {
      clearInterval(id)
    }
    return null
  }
}

7.5 async.js #

src\rxjs\internal\scheduler\async.js

import { AsyncAction } from "./AsyncAction"
import { Scheduler } from "../Scheduler"
export const asyncScheduler = new Scheduler(AsyncAction)

8.timer #

8.1 src\index.js #

src\index.js

import { timer } from "./rxjs"
timer(1000).subscribe(() => console.log("timer"))

8.2 rxjs\index.js #

src\rxjs\index.js

export { Observable } from './internal/Observable';
export { of } from './internal/observable/of';
export { from } from './internal/observable/from';
export { fromEvent } from './internal/observable/fromEvent';
export { filter } from './internal/operators/filter';
export { map } from './internal/operators/map';
export { asyncScheduler } from './internal/scheduler/async';
+export { timer } from './internal/observable/timer';

8.3 timer.js #

src\rxjs\internal\observable\timer.js

import { Observable } from "../Observable"
import { asyncScheduler } from "../scheduler/async"
export function timer(dueTime = 0, scheduler = asyncScheduler) {
  return new Observable((subscriber) => {
    let n = 0
    return scheduler.schedule(function () {
      subscriber.next(n++)
    }, dueTime)
  })
}

9.interval #

9.1 src\index.js #

src\index.js

import { interval } from "./rxjs"
interval(1000).subscribe((v) => console.log(v))

9.2 rxjs\index.js #

src\rxjs\index.js

export { Observable } from './internal/Observable';
export { of } from './internal/observable/of';
export { from } from './internal/observable/from';
export { fromEvent } from './internal/observable/fromEvent';
export { filter } from './internal/operators/filter';
export { map } from './internal/operators/map';
export { asyncScheduler } from './internal/scheduler/async';
export { timer } from './internal/observable/timer';
+export { interval } from './internal/observable/interval';

9.3 interval.js #

src\rxjs\internal\observable\interval.js

import { asyncScheduler } from "../scheduler/async"
import { timer } from "./timer"
export function interval(period = 0, scheduler = asyncScheduler) {
  return timer(period, period, scheduler)
}

9.4 timer.js #

src\rxjs\internal\observable\timer.js

import { Observable } from '../Observable';
import { asyncScheduler } from '../scheduler/async';
+export function timer(dueTime = 0, interval, scheduler = asyncScheduler) {
  return new Observable(subscriber => {
    let n = 0;
    return scheduler.schedule(function () {
      subscriber.next(n++);
+    if (interval >= 0) {
+      this.schedule(undefined, interval);
+    } else {
+      subscriber.complete();
+    }
    }, dueTime);
  });
}

10.take #

10.1 src\index.js #

src\index.js

import { interval, take } from "./rxjs"
interval(500).pipe(take(3)).subscribe(console.log)
// 0
// 1
// 2

10.2 rxjs\index.js #

src\rxjs\index.js

export { Observable } from './internal/Observable';
export { of } from './internal/observable/of';
export { from } from './internal/observable/from';
export { fromEvent } from './internal/observable/fromEvent';
export { filter } from './internal/operators/filter';
export { map } from './internal/operators/map';
export { asyncScheduler } from './internal/scheduler/async';
export { timer } from './internal/observable/timer';
export { interval } from './internal/observable/interval';
+export { take } from './internal/operators/take';

10.3 take.js #

src\rxjs\internal\operators\take.js

import { Observable } from "../Observable"
export function take(count) {
  return (source) => {
    let seen = 0
    const observable = new Observable(function (subscriber) {
      return source.subscribe({
        ...subscriber,
        next: (value) => {
          seen++
          if (seen <= count) {
            subscriber.next(value)
            if (seen >= count) {
              subscriber.complete()
            }
          }
        },
      })
    })
    return observable
  }
}

11.Subject #

11.1 Cold Observable 和 Hot Observable #

11.1.1 Cold Observable #

import { Observable } from "./rxjs"
const source = new Observable((subscriber) => {
  subscriber.next(1)
  subscriber.next(2)
  subscriber.next(3)
  subscriber.complete()
})

source.subscribe((data) => console.log(`subscriberA: ${data}`))
// 1, 2, 3
source.subscribe((data) => console.log(`subscriberB: ${data}`))
// 1, 2, 3

11.1.2 Hot Observable #

import { Subject } from "./rxjs"
const source = new Subject()
source.subscribe({ next: (data) => console.log(`Subject 第一次订阅: ${data}`) })
source.next(1)
source.next(2)
source.subscribe({ next: (data) => console.log(`Subject 第二次订阅: ${data}`) })
source.next(3)
source.next(4)

11.1 src\index.js #

src\index.js

import { Subject } from "./rxjs"
const subject = new Subject()

subject.subscribe({ next: (data) => console.log("observerA: ", data) })
subject.subscribe({ next: (data) => console.log("observerB: ", data) })

subject.next(1)
subject.next(2)

// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2

11.2 rxjs\index.js #

src\rxjs\index.js

export { Observable } from './internal/Observable';
export { of } from './internal/observable/of';
export { from } from './internal/observable/from';
export { fromEvent } from './internal/observable/fromEvent';
export { filter } from './internal/operators/filter';
export { map } from './internal/operators/map';
export { asyncScheduler } from './internal/scheduler/async';
export { timer } from './internal/observable/timer';
export { interval } from './internal/observable/interval';
export { take } from './internal/operators/take';
+export { Subject } from './internal/Subject';

11.3 Subject.js #

src\rxjs\internal\Subject.js

import { Subscriber } from "./Subscriber"
export class Subject extends Subscriber {
  observers = []
  subscribe(subscriber) {
    const { observers } = this
    observers.push(subscriber)
  }
  next(value) {
    const copy = this.observers.slice()
    for (const observer of copy) {
      observer.next(value)
    }
  }
  complete() {
    const { observers } = this
    while (observers.length) {
      observers.shift().complete?.()
    }
  }
}