1. RxJS #

1.1 基本概念 #

1.2 参考链接 #

1.3 Observable #

import { Observable } from 'rxjs'
// Producer: synchronously pushes 1, 2 and 3 to each subscriber, then completes.
const observable = new Observable((observer) => {
  [1, 2, 3].forEach((n) => observer.next(n));
  observer.complete();
});

// Shared `next` handler used by both subscriptions below.
const logNext = (value) => console.log('next value:', value);

// Subscribe with a full observer object (next + complete)...
observable.subscribe({
  next: logNext,
  complete: () => console.log('complete'),
});
// ...or with just a `next` callback.
observable.subscribe(logNext);

1.4 Subject #

src\Subject.js

import { Subject } from 'rxjs';
// A Subject multicasts: a subscriber only receives values emitted after it subscribed.
const source = new Subject();
const firstObserver = { next: (payload) => console.log(`Subject 第一次订阅: ${payload}`) };
const secondObserver = { next: (payload) => console.log(`Subject 第二次订阅: ${payload}`) };

source.subscribe(firstObserver);
[1, 2].forEach((n) => source.next(n)); // only the first observer sees 1 and 2
source.subscribe(secondObserver);
[3, 4].forEach((n) => source.next(n)); // both observers see 3 and 4

1.5 API.js #

// Demo API server used by the examples in these notes.
// Install dependencies first: npm install express cors morgan body-parser --save
const express = require('express');
const cors = require('cors');
const morgan = require('morgan');
const bodyParser = require('body-parser');

const app = express();
app.use(morgan('dev'));
// Allow the front-end dev server (port 3000) to call this API with credentials.
app.use(cors(
    {
        origin: 'http://localhost:3000',
        credentials: true
    }
));
app.use(bodyParser.json());

// In-memory user store; contents are lost when the process restarts.
const users = [];

// Responds after 3 seconds so clients can demonstrate slow or cancellable requests.
app.get('/api/user/1', (req, res) => {
    setTimeout(() => {
        res.json({ name: '张三' });
    }, 3000);
});

// Returns ten fake suggestions "<q>1" .. "<q>10" for the query string parameter `q`.
app.get('/api/search', (req, res) => {
    const { q } = req.query;
    // A missing, empty or repeated `q` would otherwise yield entries such as "undefined1".
    if (typeof q !== 'string' || q === '') {
        res.json([]);
        return;
    }
    const data = Array.from({ length: 10 }, (_, i) => `${q}${i + 1}`);
    res.json(data);
});

// Stores the posted JSON body as a new user and echoes it back with a generated id.
app.post('/api/user', (req, res) => {
    // Copy instead of mutating req.body; the server-generated id always wins.
    const user = { ...req.body, id: Date.now() };
    users.push(user);
    res.json(user);
});

// Always fails with HTTP 500 so clients can exercise their error handling.
app.delete('/api/user/1', (req, res) => {
    res.status(500).json({ message: '删除失败' });
});

app.listen(8080, () => {
    console.log('server start at 8080');
});

1.6 interval #

import { interval } from 'rxjs';
// `interval(1000)` emits 0, 1, 2, ... once per second for as long as it is subscribed.
const timer = interval(1000);
timer.subscribe({
  next: (tick) => console.log(tick),
});

1.7 buffer #

1.7.1 bufferTime #

import { interval } from 'rxjs';
import { bufferTime } from 'rxjs/operators';
// Source ticks every 500 ms; bufferTime(1000) collects the ticks and emits them as an array every second.
const timer = interval(500);
const collectEverySecond = bufferTime(1000);
const bufferedTimer = timer.pipe(collectEverySecond);
bufferedTimer.subscribe({
  next: (batch) => console.log(batch),
});

1.7.2 bufferCount #

import { interval } from 'rxjs';
import { bufferCount } from 'rxjs/operators';
// Source ticks every 500 ms; bufferCount(3) emits an array each time three ticks have been collected.
const timer = interval(500);
const collectThree = bufferCount(3);
const bufferedTimer = timer.pipe(collectThree);
bufferedTimer.subscribe({
  next: (batch) => console.log(batch),
});

1.8 map #

1.8.1 map #

import { of } from 'rxjs';
import { map } from 'rxjs/operators';

// `map` transforms each emitted value; here every number is doubled, so 2, 4, 6 are logged.
const source = of(1, 2, 3);
const double = (n) => n * 2;
const example = source.pipe(map(double));
const subscribe = example.subscribe({
  next: (doubled) => console.log(doubled),
});

1.8.2 switchMap #

import { interval, switchMap, from, take } from 'rxjs';
//interval 操作符会创建一个可观察对象,它会每隔一段时间发出一个数字
//在这个例子中,它会每隔 1000 毫秒发出一个数字
// `interval(1000)` emits a number every 1000 ms; `take(3)` limits the source to 0, 1 and 2.
const source$ = interval(1000).pipe(take(3));

// Maps a value to an Observable (via `from`) of a Promise that resolves with that value after 2 seconds.
const resolveLater = (value) => from(new Promise((resolve) => {
    setTimeout(resolve, 2000, value);
}));

// switchMap unsubscribes from the previous inner Observable whenever a new source value arrives.
// Each inner Promise needs 2 s but the source emits every second, so only the last value (2) is logged.
const switch$ = source$.pipe(switchMap(resolveLater));
switch$.subscribe({
    next: (n) => console.log(n),
});

1.8.3 mergeMap #

source$.pipe(
    mergeMap(project: function(value: T, index: number): ObservableInput, concurrent: number): Observable
)

src\mergeMap.js

import { interval, of, mergeMap } from 'rxjs';
//源Observablesource$在每秒发出一个数字
// The source emits one number per second.
const source$ = interval(1000);
// mergeMap maps every number to a new inner Observable and merges all inner emissions into one stream.
const toDoubled$ = (n) => of(n * 2);
const merged$ = source$.pipe(mergeMap(toDoubled$));
merged$.subscribe({
    next: (n) => console.log(n),
});
// Output: 0, 2, 4, 6, 8, ...

1.8.4 map区别 #

1.9 takeUntil #

import { interval, Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

const source$ = interval(1000);
// Notifier: the first value it emits ends `result$`.
const stop$ = new Subject();
// Logs a number every 1000 ms until stop$ emits.
// Once stop$ emits, result$ completes and unsubscribes from the source.
const result$ = source$.pipe(
  takeUntil(stop$)
);

result$.subscribe(x => console.log(x));

// Stop after 3.5 s (0, 1 and 2 have been logged by then).
// Calling stop$.next() synchronously here would end the stream before the first tick.
setTimeout(() => stop$.next(), 3500);

1.10 withLatestFrom #

first$:  -----0-----1-----2-----3-----4-----5|
second$: -----------------0-----------------1|
result$: -----------------[2,0]--[3,0]-[4,0]-[5,1]|

withLatestFrom.js

import {interval } from 'rxjs';
import { withLatestFrom } from 'rxjs/operators';
// first$ drives the output; each of its values is paired with the most recent value from second$.
// Nothing is emitted until second$ has produced at least one value.
const first$ = interval(1000);
const second$ = interval(3000);
const pairWithSecond = withLatestFrom(second$);
const result$ = first$.pipe(pairWithSecond);
result$.subscribe((pair) => {
  const [first, second] = pair;
  console.log(first, second);
});

1.11 debounce #

1.11.1 debounceTime #

debounceTime.js

import { debounceTime } from 'rxjs/operators';
// 假设我们有一个名为 input$ 的 Observable,代表文本框的输入事件
// Assume `input$` is an Observable representing the text box's input events
// and `search` performs the search request (both are defined elsewhere).
const settledInput$ = input$.pipe(debounceTime(500));
settledInput$.subscribe({
  // Runs only when no new input event arrived within 500 ms of the last one.
  next: (val) => {
    search(val);
  },
});

1.11.2 debounce #

import { debounce, timer } from 'rxjs';
// `debounce` takes a function returning a notifier Observable; a value is released when its
// notifier emits and no newer value has arrived. A fixed timer(500) acts like debounceTime(500).
const waitHalfSecond = () => timer(500);
const settledInput$ = input$.pipe(debounce(waitHalfSecond));
settledInput$.subscribe({
  // Runs only when no new input event arrived within 500 ms of the last one.
  next: (val) => {
    search(val);
  },
});

1.12 lastValueFrom #

import { lastValueFrom,interval,take } from 'rxjs';
// lastValueFrom turns an Observable into a Promise that resolves with the final value once it completes.
// take(5) makes the endless interval complete after 0..4, so the promise resolves with 4.
const source = interval(1000);
const firstFive$ = source.pipe(take(5));
const lastValue = lastValueFrom(firstFive$);
lastValue.then(console.log);

1.13 share #

import { share } from 'rxjs/operators';
import { fromFetch } from 'rxjs/fetch';
// share() multicasts one HTTP request to every subscriber.
// The body is parsed once via `selector` BEFORE sharing: a Response body can only be read a
// single time, so letting each subscriber call res.json() on the same shared Response would
// make the second read fail.
const sharedObservable = fromFetch('http://localhost:8080/api/user/1', {
  selector: response => response.json(),
}).pipe(share());
sharedObservable.subscribe(user => console.log(user));
sharedObservable.subscribe(user => console.log(user));

1.14 fromFetch #

import { switchMap } from 'rxjs/operators';
import { fromFetch } from 'rxjs/fetch';

// fromFetch wraps fetch() in an Observable; unsubscribing aborts the underlying request.
fromFetch('http://localhost:8080/api/user/1')
  .pipe(
    switchMap(response => {
      if (response.ok) {
        // response.json() returns a Promise, which switchMap flattens into the stream.
        return response.json();
      } else {
        // Throwing inside the operator routes the failure to the `error` handler below.
        throw new Error('Api request failed');
      }
    }),
  )
  .subscribe({
    next: response => console.log(response),
    error: error => console.error(error),
  });

1.15 merge #

import { merge, of } from 'rxjs';

const first = of(1, 2, 3);
const second = of(4, 5, 6);
merge(first, second).subscribe(value => console.log(value));
// Output: 1, 2, 3, 4, 5, 6
// `of` emits all of its values synchronously on subscribe, so `first` is fully drained before
// `second` is subscribed. Interleaved output only occurs when the sources emit asynchronously.

1.16 error #

catchError

import { Observable ,of} from 'rxjs';
import { catchError } from 'rxjs/operators';
// Source that fails with an Error one second after subscription.
const source$ = new Observable((observer) => {
    const fail = () => observer.error(new Error('发生了错误'));
    setTimeout(fail, 1000);
});

// catchError replaces the failed stream with a fallback Observable, so the subscriber
// receives a normal `next` followed by `complete` instead of `error`.
const recovered$ = source$.pipe(catchError(() => of('正常值')));
recovered$.subscribe({
    next: (value) => console.log('next', value),
    error: (err) => console.error('error', err),
    complete: () => console.log('complete'),
});
import { Observable ,of, throwError} from 'rxjs';
import { catchError } from 'rxjs/operators';
// Source that fails immediately with a plain object.
const source$ = new Observable((observer) => {
    observer.error({ success: false });
});

// Re-throws the caught error so the subscriber's `error` handler still runs.
// (Returning a value source instead, e.g. the array ["hello"], would recover with that value.)
const rethrow = (err) => throwError(() => err);
const rethrown$ = source$.pipe(catchError(rethrow));
rethrown$.subscribe({
    next: (value) => console.log('next', value),
    error: (err) => console.error('error', err),
    complete: () => console.log('complete'),
});

1.17 filter #

import { of, filter } from 'rxjs';

// filter only lets through the values for which the predicate returns true (here: even numbers).
of(1, 2, 3, 4, 5)
  .pipe(
    filter(value => value % 2 === 0),
  )
  .subscribe(value => console.log(value));
// Output: 2, 4

1.18 throwIfEmpty #

import { Observable, throwIfEmpty } from 'rxjs';
// throwIfEmpty raises an error when the source completes without emitting anything.
// This source emits 1 before completing, so the value is logged and no error is raised.
const source$ = new Observable((observer) => {
    observer.next(1);
    observer.complete();
});
const guarded$ = source$.pipe(throwIfEmpty());
guarded$.subscribe({
    next: (user) => console.log(user),
    error: (err) => console.error(err),
});

2.缓存 #

2.2 bufferTime.js #

src\bufferTime.js

import { interval, bufferTime } from 'rxjs';
//interval 函数每隔一段时间(这里是 1000 毫秒)发出一个数字,表示过了多长时间。
//bufferTime 操作符会收集来自源 Observable(这里是 interval 发出的 Observable)的值,
//每隔一段时间(这里是 2000 毫秒)就把这些值当做数组发出。
// interval(1000) emits a counter every second; bufferTime(2000) gathers those values and
// hands them over as an array every two seconds, which is appended to the list as <li> items.
const messageBox = document.getElementById('messageBox');
const source$ = interval(1000);
const toListItem = (item) => `<li>Message ${item}</li>`;
const batches$ = source$.pipe(bufferTime(2000));
batches$.subscribe((messages) => {
    const markup = messages.map(toListItem).join('\n');
    messageBox.innerHTML += markup;
});

2.1 bufferCount.js #

src\bufferCount.js

import { interval, bufferCount } from 'rxjs';
// bufferCount(3) gathers values from the source (the interval) and emits them as an array
// each time three have been collected; every batch is appended to the list as <li> items.
const messageBox = document.getElementById('messageBox');
const source$ = interval(1000);
const toListItem = (item) => `<li>Message ${item}</li>`;
const batches$ = source$.pipe(bufferCount(3));
batches$.subscribe((messages) => {
    const markup = messages.map(toListItem).join('\n');
    messageBox.innerHTML += markup;
});

3.拖拽 #

3.1 public\index.html #

public\index.html

<!DOCTYPE html>
<html lang="en">
<head>
  <meta charset="utf-8" />
  <meta name="viewport" content="width=device-width, initial-scale=1" />
  <meta name="theme-color" content="#000000" />
  <meta name="description" content="Web site created using create-react-app" />
  <title>React App</title>
  <style>
    /* Absolutely positioned so the drag demo can move the box by setting left/top. */
    #draggable {
      width: 100px;
      height: 100px;
      background-color: red;
      position: absolute;
      top: 0;
      left: 0;
    }
  </style>
</head>
<body>
  <!-- Target list for the buffer demos, which append <li> items to it. -->
  <ul id="messageBox"></ul>
  <!-- Box moved around by src/drag.js. -->
  <div id="draggable"></div>
</body>
</html>

3.2 src\drag.js #

import { fromEvent } from 'rxjs';
import { withLatestFrom, takeUntil,switchMap} from 'rxjs/operators';
/**
 * Makes `element` draggable with the mouse by updating its `left`/`top` styles.
 *
 * @param {HTMLElement} element - Element to drag; its CSS positioning must honour left/top.
 */
function startDragging(element) {
    const down$ = fromEvent(element, 'mousedown');
    const up$ = fromEvent(document, 'mouseup');
    const move$ = fromEvent(document, 'mousemove');

    // New position = pointer position minus the offset inside the element where the press happened.
    const toPosition = (moveEvent, downEvent) => ({
        left: moveEvent.clientX - downEvent.offsetX,
        top: moveEvent.clientY - downEvent.offsetY,
    });

    // After each press, follow mouse moves until the button is released.
    const movesWhilePressed$ = down$.pipe(
        switchMap(() => move$.pipe(takeUntil(up$))),
    );
    // Pair every move with the latest mousedown event to compute the element position.
    const position$ = movesWhilePressed$.pipe(withLatestFrom(down$, toPosition));

    position$.subscribe((position) => {
        element.style.left = `${position.left}px`;
        element.style.top = `${position.top}px`;
    });
}

const draggable = document.getElementById('draggable');
startDragging(draggable);

4.并发请求 #

4.1 src\multiRequest.js #

src\multiRequest.js

import { mergeMap, from } from 'rxjs';
/**
  实现一个批量并发请求函数 request(urls, concurrent),要求如下:
  1.要求最大并发数 concurrent
  2.每当有一个请求返回,就进行新的请求
  3.所有请求完成后,结果按照urls里面的顺序依次打出
 */
/**
 * Simulates a network request that takes three seconds.
 *
 * @param {string} url - Fake request URL.
 * @returns {Promise<string>} Promise resolving with `url` after 3000 ms.
 */
function fetchData(url) {
  return new Promise((resolve) => {
    setTimeout(resolve, 3000, url);
  });
}
const urls = [
  '/api/user/1',
  '/api/user/2',
  '/api/user/3'
];
const start = Date.now();
/**
 * Requests every URL with at most `concurrent` requests in flight; a new request starts as
 * soon as one finishes. The elapsed time is logged whenever a response arrives, and once
 * all requests are done the results are printed in the same order as `urls`.
 *
 * @param {string[]} urls - URLs to request.
 * @param {number} concurrent - Maximum number of simultaneous requests.
 */
function request(urls, concurrent) {
  // Responses arrive in completion order, so each one is stored at the index of its URL.
  const results = new Array(urls.length);
  from(urls)
    .pipe(
      mergeMap(
        (url, index) => fetchData(url).then(value => ({ index, value })),
        concurrent,
      ),
    )
    .subscribe({
      next: ({ index, value }) => {
        console.log(`耗时: ${Math.floor((Date.now() - start) / 1000)}s`);
        results[index] = value;
      },
      error: error => console.error(error),
      // All requests finished: print the results in `urls` order.
      complete: () => results.forEach(value => console.log(value)),
    });
}
request(urls, 2)

5.竞态 #

5.1 src/race.js #

import { Subject, switchMap } from 'rxjs';
/**
 * Simulates a request whose latency grows with `id`.
 *
 * @param {number} id - Request id; the promise resolves after `1000 * id` ms.
 * @returns {Promise<number>} Promise resolving with `id`.
 */
function fetchData(id) {
    const delayMs = 1000 * id;
    return new Promise((resolve) => {
        setTimeout(resolve, delayMs, id);
    });
}
// switchMap keeps only the most recent request: request 3 (3 s) is superseded by request 1,
// which is issued after one second, so only `1` is logged.
const search = new Subject();
const latestResult$ = search.pipe(switchMap(fetchData));
latestResult$.subscribe(console.log);

search.next(3);
setTimeout(() => search.next(1), 1000);

6.suggests #

6.1 src\suggests.js #

import { fromEvent, of, timer } from 'rxjs';
import { debounce, debounceTime, switchMap } from 'rxjs/operators';

const inputElement = document.querySelector('#keyword');
const wordsElement = document.querySelector('#words');

/**
 * Requests search suggestions for `keyword` from the demo API.
 *
 * @param {string} keyword - Text typed by the user; URL-encoded before being sent.
 * @returns {Promise<string[]>} Parsed JSON array of suggestions.
 * @throws {Error} When the server answers with a non-2xx status.
 */
async function fetchSuggestions(keyword) {
    const response = await fetch(`http://localhost:8080/api/search?q=${encodeURIComponent(keyword)}`);
    if (!response.ok) {
        throw new Error(`Search request failed with status ${response.status}`);
    }
    return response.json();
}

// Listen to the text box's input events.
const input$ = fromEvent(inputElement, 'input');
input$.subscribe(event => { console.log(event) });

const search$ = input$.pipe(
    // Wait for 100 ms of silence before reacting to the input.
    debounceTime(100),
    // debounce chooses the extra delay per event through a notifier Observable:
    // inputs longer than 3 characters pass on immediately, shorter ones wait 3 more seconds.
    debounce((event) => event.target.value.length > 3 ? of(event) : timer(3000)),
    // switchMap ignores responses of superseded requests. A failed request is logged and
    // mapped to an empty list so that one error does not terminate the whole stream.
    switchMap(event => fetchSuggestions(event.target.value).catch(error => {
        console.error(error);
        return [];
    })),
);

// Render the suggestions. textContent is used instead of innerHTML so that text coming
// from the server can never be interpreted as markup.
search$.subscribe(suggestions => {
    const items = suggestions.map(item => {
        const li = document.createElement('li');
        li.textContent = item;
        return li;
    });
    wordsElement.replaceChildren(...items);
});

7.fetch封装 #

7.1 实现GET请求 #

7.1.1 src\index.js #

src\index.js

import {http} from './fetch/http';
// Issue a GET request through the wrapper and log the resolved value.
http.request({
    url: 'http://localhost:8080/api/user/1',
    method: 'GET'
}).then(response => {
    console.log(response)
}).catch(error => {
    // Without a rejection handler a failed request would surface as an unhandled rejection.
    console.error(error)
})

7.1.2 http.js #

src\fetch\http.js

import { lastValueFrom, share } from 'rxjs';
import { fromFetch } from 'rxjs/fetch';
/**
 * Minimal HTTP client built on RxJS `fromFetch`.
 */
export class Http {
  /**
   * Performs a fetch described by `options`.
   *
   * @param {object} options - Must contain `url`; the whole object is also passed to fetch as its init.
   * @returns {Promise<Response>} Promise for the last value emitted by the fetch stream.
   */
  request(options) {
    const { url } = options;
    const response$ = fromFetch(url, options).pipe(share());
    return lastValueFrom(response$);
  }
}
export const http = new Http();

7.2 实现POST请求 #

7.2.1 src\index.js #

src\index.js

import { http } from './fetch/http';
// Create a user; at this stage `post` resolves with the raw Response, so the body is parsed here.
http.post('http://localhost:8080/api/user', { name: 'zhangsan' })
    .then(res => res.json())
    .then(response => {
        console.log(response)
    })
    // Without a rejection handler a failed request would surface as an unhandled rejection.
    .catch(error => {
        console.error(error)
    })

7.2.2 http.js #

src\fetch\http.js

import { lastValueFrom, share } from 'rxjs';
import { fromFetch } from 'rxjs/fetch';
import { getUrlFromOptions, getInitFromOptions } from './utils';
/**
 * HTTP client built on RxJS `fromFetch`.
 * Lines prefixed with `+` are diff markers highlighting what this step of the tutorial adds.
 */
export class Http {
  /**
   * Sends a request described by `options`.
   * The URL and the fetch init object are derived from `options` by the helpers in ./utils.
   * Returns a Promise for the last value emitted by the fetch stream.
   */
  request(options) {
    const url = getUrlFromOptions(options);
    const init = getInitFromOptions(options);
    return  lastValueFrom(
      fromFetch(url, init)
      .pipe(share())
    )
  }
  /** Sends a DELETE request to `url`. */
+ delete(url) {
+   return this.request({ method: 'DELETE', url });
+ }
  /** Sends `data` to `url` with PUT and a JSON Content-Type header. */
+ put(url, data) {
+   return this.request({ method: 'PUT', url, data, headers: { "Content-Type": "application/json" } });
+ }
  /** Sends `data` to `url` with POST and a JSON Content-Type header. */
+ post(url, data) {
+   return this.request({ method: 'POST', url, data, headers: { "Content-Type": "application/json" } });
+ }
}
// Shared client instance imported by the examples.
export const http = new Http();

7.2.3 utils.js #

src\fetch\utils.js

/**
 * Builds the final request URL from the request options.
 *
 * For GET and DELETE requests the entries of `options.params` are URL-encoded and appended
 * as a query string; for every other method the URL is returned unchanged.
 * Parameters whose value is `""`, `null` or `undefined` are skipped.
 *
 * @param {object} options
 * @param {string} options.url - Base URL; may already contain a query string.
 * @param {object} [options.params] - Query parameters.
 * @param {string} [options.method='GET'] - HTTP method, compared case-insensitively.
 * @returns {string} The URL including the query string, if any.
 */
export function getUrlFromOptions(options) {
    const { url, params, method = 'GET' } = options;
    const normalizedMethod = String(method).toUpperCase();
    if (normalizedMethod !== 'GET' && normalizedMethod !== 'DELETE') {
        return url;
    }
    const queryString = Object.entries(params ?? {})
        .filter(([, value]) => value !== '' && value != null)
        .map(([key, value]) => `${encodeURIComponent(key)}=${encodeURIComponent(value)}`)
        .join('&');
    // Nothing to append: avoid leaving a dangling "?" at the end of the URL.
    if (queryString === '') {
        return url;
    }
    // Extend an existing query string with "&" instead of adding a second "?".
    let separator = '?';
    if (url.includes('?')) {
        separator = /[?&]$/.test(url) ? '' : '&';
    }
    return `${url}${separator}${queryString}`;
}

/**
 * Tells whether `headers` declares a JSON request body.
 * The header name is matched case-insensitively and parameters such as "; charset=utf-8" are allowed.
 */
function hasJSONContentType(headers) {
    let contentType;
    if (typeof headers.get === 'function') {
        // Headers instance (or anything exposing the same `get` method).
        contentType = headers.get('content-type');
    } else {
        const entry = Object.entries(headers).find(([name]) => name.toLowerCase() === 'content-type');
        contentType = entry?.[1];
    }
    return typeof contentType === 'string' && contentType.trim().toLowerCase().startsWith('application/json');
}

/** Tells whether `value` is an array or a plain `{}`-style object that must be serialized. */
function isSerializableData(value) {
    if (Array.isArray(value)) {
        return true;
    }
    if (value === null || typeof value !== 'object') {
        return false;
    }
    const proto = Object.getPrototypeOf(value);
    return proto === Object.prototype || proto === null;
}

/**
 * Builds the `init` object passed to fetch from the request options.
 *
 * Plain objects and arrays in `options.data` are serialized: as JSON text when the
 * Content-Type header is JSON, otherwise as URLSearchParams (form encoding). Empty objects
 * are serialized as well, so they never reach fetch as a raw object. Every other kind of
 * data (strings, FormData, Blob, null/undefined, ...) is passed through untouched; in
 * particular an already serialized string is not encoded a second time.
 *
 * @param {object} options
 * @param {string} [options.method='GET'] - HTTP method.
 * @param {object} [options.headers={}] - Request headers.
 * @param {*} [options.data] - Request payload.
 * @param {string} [options.credentials='omit'] - One of omit, same-origin, include.
 * @returns {{method: string, headers: object, body: *, credentials: string}}
 */
export function getInitFromOptions(options) {
    const method = options.method || 'GET';
    const headers = options.headers || {};
    const credentials = options.credentials || 'omit';
    let body = options.data;
    if (isSerializableData(body)) {
        body = hasJSONContentType(headers) ? JSON.stringify(body) : new URLSearchParams(body);
    }
    return { method, headers, body, credentials };
}

7.3 解析响应体 #

7.3.1 src\index.js #

src\index.js

import { http } from './fetch/http';
// `post` now resolves with { data, status }, so the parsed body is available directly.
http.post('http://localhost:8080/api/user', { name: 'zhangsan' })
    .then(response => {
        console.log(response.data)
    })
    // Non-2xx responses reject; handle them instead of leaving the rejection unhandled.
    .catch(error => {
        console.error(error)
    })

7.3.2 src\fetch\http.js #

src\fetch\http.js

+import { lastValueFrom, share ,mergeMap} from 'rxjs';
import { fromFetch } from 'rxjs/fetch';
import { getUrlFromOptions, getInitFromOptions } from './utils';
/**
 * HTTP client built on RxJS `fromFetch` that also parses the response body.
 * Lines prefixed with `+` are diff markers highlighting what this step of the tutorial adds.
 */
export class Http {
  /**
   * Sends a request described by `options`.
   * Resolves with `{ data, status }` when `response.ok` is true and rejects with the same
   * shape otherwise; `data` is the body parsed with `response.json()`.
   */
  request(options) {
    const url = getUrlFromOptions(options);
    const init = getInitFromOptions(options);
    return  lastValueFrom(
      fromFetch(url, init)
      .pipe(
        // Parse the body and turn non-OK responses into rejections.
+       mergeMap(async response => {
+         if (response.ok) {
+           return {data: await response.json(),status: response.status};
+         } else {
+           return Promise.reject({data: await response.json(),status: response.status});
+         }
+       }),
        share())
    )
  }
  /** Sends `data` to `url` with POST and a JSON Content-Type header. */
  post(url, data) {
    return this.request({ method: 'POST', url, data, headers: { "Content-Type": "application/json" } });
  }
  /** Sends a DELETE request to `url`. */
  delete(url) {
    return this.request({ method: 'DELETE', url });
  }
  /** Sends `data` to `url` with PUT and a JSON Content-Type header. */
  put(url, data) {
    return this.request({ method: 'PUT', url, data, headers: { "Content-Type": "application/json" } });
  }
  /** Sends a GET request to `url`, passing `params` on as the request's `params` option. */
  get(url, params) {
    return this.request({ method: 'GET', url, params });
  }
}
// Shared client instance imported by the examples.
export const http = new Http();

7.4 错误处理 #

7.4.1 src\index.js #

src\index.js

import { http } from './fetch/http';
// Lines prefixed with `+` are diff markers highlighting what changed in this step.
// NOTE(review): Http.delete(url) in http.js declares only a `url` parameter, so the second
// argument passed below appears to be ignored.
+http.delete('http://localhost:8080/api/user/1', { name: 'zhangsan' })
    .then(response => {
        console.log(response.data)
    // Second argument of then(): rejection handler that logs the error.
+    },error=>console.error(error))

7.4.2 http.js #

src\fetch\http.js

import { lastValueFrom, share, mergeMap, filter, merge } from 'rxjs';
import { fromFetch } from 'rxjs/fetch';
import { getUrlFromOptions, getInitFromOptions } from './utils';
/**
 * HTTP client built on RxJS `fromFetch` with separate success and failure streams.
 * Lines prefixed with `+` are diff markers highlighting what this step of the tutorial adds.
 * NOTE(review): this version defines no `get` helper, although the previous step had one.
 */
export class Http {
  /**
   * Sends a request described by `options`.
   * Resolves with `{ data, status }` for OK responses and rejects with `{ error, status }`
   * for non-OK responses; both bodies are parsed with `response.json()`.
   */
  request(options) {
    const url = getUrlFromOptions(options);
    const init = getInitFromOptions(options);
    // share() lets the two branches below subscribe to the same fetch stream.
+   const fetchStream = fromFetch(url, init).pipe(share())
    // Branch for OK responses: emit the parsed body together with the status.
+   const successStream = fetchStream.pipe(
+     filter(response => response.ok),
+     mergeMap(async response => {
+       return { data: await response.json(), status: response.status };
+     }),
+   );
    // Branch for non-OK responses: fail the stream with the parsed error body.
+   const failureStream = fetchStream.pipe(
+     filter(response => !response.ok),
+     mergeMap(async response => {
+       return Promise.reject({ error: await response.json(), status: response.status })
+     }),
+   );
    // The filters are mutually exclusive, so only one branch produces an outcome.
+   const mergedStream = merge(successStream, failureStream)
+   return lastValueFrom(mergedStream);
  }
  /** Sends a DELETE request to `url`. */
  delete(url) {
    return this.request({ method: 'DELETE', url });
  }
  /** Sends `data` to `url` with PUT and a JSON Content-Type header. */
  put(url, data) {
    return this.request({ method: 'PUT', url, data, headers: { "Content-Type": "application/json" } });
  }
  /** Sends `data` to `url` with POST and a JSON Content-Type header. */
  post(url, data) {
    return this.request({ method: 'POST', url, data, headers: { "Content-Type": "application/json" } });
  }
}
// Shared client instance imported by the examples.
export const http = new Http();

7.5 取消任务 #

7.5.1 src\index.js #

src\index.js

import { http } from './fetch/http';
// Delete the user: log the parsed body on success, or the rejection reason on failure.
const logData = (response) => {
    console.log(response.data);
};
const logError = (error) => console.error(error);
http.delete('http://localhost:8080/api/user/1').then(logData, logError);

7.5.2 http.js #

src\fetch\http.js

+import { lastValueFrom, share, mergeMap, filter, merge,catchError,throwError,takeUntil,throwIfEmpty,Subject } from 'rxjs';
import { fromFetch } from 'rxjs/fetch';
import { getUrlFromOptions, getInitFromOptions } from './utils';
/**
 * Reads a fetch Response body defensively.
 *
 * @param {Response} response - Response whose body has not been consumed yet.
 * @returns {Promise<*>} Parsed JSON when the body is valid JSON, the raw text when it is
 *   not, and `null` for an empty body (e.g. 204 No Content).
 */
async function readBody(response) {
  const text = await response.text();
  if (text === '') {
    return null;
  }
  try {
    return JSON.parse(text);
  } catch {
    return text;
  }
}

/**
 * HTTP client built on RxJS `fromFetch` with response parsing, error normalisation and
 * cancellation by request id.
 */
export class Http {
  // Stream of request ids to cancel. The misspelt name is kept because it is a public field.
  cancleRequests = new Subject();

  /**
   * Cancels the pending request that was started with a matching `options.requestId`.
   *
   * @param {*} requestId - Id given to `request` (or a helper) when the request was started.
   */
  cancel(requestId) {
    this.cancleRequests.next(requestId);
  }

  /**
   * Sends a request described by `options`.
   *
   * @param {object} options - Request options (`url`, `method`, `params`, `data`, `headers`,
   *   `credentials`) plus an optional `requestId` that makes the request cancellable.
   * @returns {Promise<{data: *, status: number}>} Resolves for OK responses. Rejects with
   *   `{ error, status, url }` for non-OK responses, with `{ error, message, url }` when the
   *   request itself fails, and with a `{ type: 'cancel', cancelled: true, ... }` object when
   *   the request is cancelled before it produced a result.
   */
  request(options) {
    const url = getUrlFromOptions(options);
    const init = getInitFromOptions(options);
    // share() lets the success and failure branches reuse a single fetch.
    const fetchStream = fromFetch(url, init).pipe(share());
    const successStream = fetchStream.pipe(
      filter(response => response.ok),
      mergeMap(async response => {
        return { data: await readBody(response), status: response.status };
      }),
    );
    const failureStream = fetchStream.pipe(
      filter(response => !response.ok),
      mergeMap(async response => {
        return Promise.reject({ error: await readBody(response), status: response.status });
      }),
    );
    // Only an explicit, matching id cancels this request. Without the null check,
    // cancel(undefined) would cancel every request that was started without an id.
    const cancelled$ = this.cancleRequests.pipe(
      filter(requestId => requestId != null && requestId === options.requestId),
    );
    const mergedStream = merge(successStream, failureStream).pipe(
      // Attach the URL to every failure. Error instances (e.g. network failures) have no
      // enumerable properties, so spreading them would silently drop the message.
      catchError(error => throwError(() => (
        error instanceof Error
          ? { error, message: error.message, url }
          : { ...error, url }
      ))),
      takeUntil(cancelled$),
      // Completing without a value means the request was cancelled.
      throwIfEmpty(() => ({
        type: 'cancel',
        cancelled: true,
        data: null,
        status: -1,
        statusText: '请求被取消',
        config: options
      })),
    );
    return lastValueFrom(mergedStream);
  }

  /**
   * Sends a DELETE request.
   * @param {string} url
   * @param {object} [options] - Extra request options, e.g. `{ requestId }`.
   */
  delete(url, options = {}) {
    return this.request({ ...options, method: 'DELETE', url });
  }

  /**
   * Sends `data` as JSON with PUT.
   * @param {string} url
   * @param {*} data
   * @param {object} [options] - Extra request options, e.g. `{ requestId }`.
   */
  put(url, data, options = {}) {
    return this.request({ ...options, method: 'PUT', url, data, headers: { ...options.headers, "Content-Type": "application/json" } });
  }

  /**
   * Sends `data` as JSON with POST.
   * @param {string} url
   * @param {*} data
   * @param {object} [options] - Extra request options, e.g. `{ requestId }`.
   */
  post(url, data, options = {}) {
    return this.request({ ...options, method: 'POST', url, data, headers: { ...options.headers, "Content-Type": "application/json" } });
  }

  /**
   * Sends a GET request.
   * @param {string} url
   * @param {object} [params] - Query parameters.
   * @param {object} [options] - Extra request options, e.g. `{ requestId }`.
   */
  get(url, params, options = {}) {
    return this.request({ ...options, method: 'GET', url, params });
  }
}
// Shared client instance imported by the examples.
export const http = new Http();