Rxjs
最近在看 Rxjs 时,一直在思考如此陡峭的学习曲线,究竟能给业务开发带来多大提升,通过这篇文章到底什么时候用得上rxjs,总结6个高频rxjs应用场景提供的场景发现 Rxjs 的能力还是很强的
学习参考:https://juejin.cn/post/7331676854913351730?searchId=202410120957427D63EE074118EF76217B
安装
pnpm i rxjs
基本概念
角色关系
import {map, Observable} from "rxjs";
let count=0
// 1、可观察对象 Observable,产生数据流 ⚠️ 回调函数observer才是可观察对象
const observable$=new Observable(observer => {
setInterval(() => {
observer.next(count++)
},1000)
})
// 2、pipe 管道处理数据流
observable$.pipe(
tap(()=>{ console.log('日志') }), // tap 不做任何数据处理,通常用来输出日志信息
map(val => val * 2), // map 是操作符,用于映射新的数据值
)
// 3、观察者 Observer ⚠️:subscribe的入参对象是才是观察者
observable$.subscribe({
next(val){console.log(val)},
complete(){ clearInterval(timer)},
error(err){console.log('error',err)}
})
总结
Observable
可通过 next 方法可主动发送值。rxjs 中提供了大量创建Observable 的方式
pipe
管道中可传入多个 Operator(操作符),数据流按Operator 的顺序处理数据
Observer
可通过next、complete、error 接收Observable 发出的数据、结束、错误信息
观察者类型
Cold Observable
⚠️:上例Observable 属于 code Observabel
除去下面提到的 Hot Observables
,其余都属于 Cold Observables
,即每次订阅时都会重新创建它的生产者,导致重新输出数据流
import { timer } from "rxjs";
import { tap } from "rxjs/operators";
const cold$ = timer(1000)
cold$.subscribe(val => console.log(`Observer 1: ${val}`));
cold$.subscribe(val => console.log(`Observer 2: ${val}`));
// 自这个 Cold Observable 启动,副作用就执行两次,一次是给 subscription
// 输出
// Observer 1: 0
// Observer 2: 0
Hot Observable
如果 hotObservable
不管我们订阅了多少次,副作用将只执行一次
在 rxjs 中
- 提供 Subject 类型创建Hot Observable
- 也可以在 pipe 中经过特定操作符将Cold Observable 转化为 Hot Observable
参考:https://juejin.cn/post/7174713617596547132?searchId=2024102515585998DCC4BEE8933D7A7766
可观察对象(Observable)
import {Observable} from "rxjs";
let timer=null
let count=0
// 创建 Observable 对象
const observer=new Observable((subscriber)=>{
timer=setInterval(()=>{
subscriber.next(count++)
},1000)
})
// 订阅 Observable 对象
const subscription=observer.subscribe({
next(val){console.log(val)},
complete(){ clearInterval(timer)},
error(err){console.log('error',err)}
})
setTimeout(()=>{
subscription.unsubscribe()
clearTimeout(timer)
},10*1000)
⚠️ 在 RxJS 中,Observable 是惰性的(lazy)。这意味着在没有订阅的情况下,Observable 不会开始发出数据。只有当某个订阅者调用 subscribe
方法时,Observable 才会开始执行其内部的逻辑并发出值。
管道(pipe)
pipe
是Observable的实例方法,入参是 1 个或多个操作符,可以用于操作数据流
import {map, Observable} from "rxjs";
let count=0
const observer=new Observable(observer => {
setInterval(() => {
observer.next(count++)
},1000)
}).pipe(
map(val => val * 2), // map 是操作符,用于映射新的数据值
)
observer.subscribe({
next(val){
console.log('next -->',val)} // 0 2 6 ...
})
实现原理
function pipe(...operators){
return function(source){
return operators.reduce((pre,operator)=>operator(pre),source)
}
}
操作符(Operator)
自定义操作符
实现一个自动去除首位空格的操作符
import {Observable} from "rxjs";
function customOperator(input){
return new Observable(observer => {
observer.next(input.replace(/^\s|\s$/,''))
})
}
customOperator(' hello ').subscribe({
next(val){
console.log('next -->',val)}
})
创建流
返回值是 Observable 类型
时间
interval
创建的 Observable 每个时间间隔都发出 1 个自增数(从 0 开始)
import {interval} from "rxjs";
const subscription=interval(2000).subscribe({ // ⚠️ 与 JS 一致,先等 2s 才输出第一个元素0
next(val){console.log(val)},
complete(){console.log('complete')},
error(err){console.log('error',err)}
})
setTimeout(()=>{
subscription.unsubscribe()
},10*1000)
// 主动取消订阅不会执行 complete
timer
创建的 Observable 根据入参不同有些差异
import {timer} from "rxjs";
// 1 个参数是时间间隔。等待时间间隔后输出 0
timer(3000).subscribe({
next(val){console.log(val)},
complete(){console.log('complete')},
error(err){console.log('error',err)}
}) // 3s 后输出 0 complete
// 参数 1 是起始时间、参数 2 是时间间隔。 Observable 会立即输出 0,后续每隔固定时间发出值(自增 1)
defer
cold observable只有被订阅时才返回数据流
但是普通 observable 都是单播的,即每个订阅者都是独立的接收值,互不影响
import { of, Observable } from 'rxjs';
let counter = 0;
const source = new Observable(subscriber => {
counter++;
subscriber.next(`Subscription ${counter}`);
});
source.subscribe(value => {
console.log(value); // 输出 "Subscription 1"
});
source.subscribe(value => {
console.log(value); // 输出 "Subscription 1"
});
虽然 defer 创建的 Observable 还是单播,但是因为其会返回新的 observable,这就使得每次订阅的都是上一次返回的流
import { defer } from 'rxjs';
let counter = 0;
const deferredSource = defer(() => {
counter++;
return of(`Subscription ${counter}`);
});
deferredSource.subscribe(value => {
console.log(value); // 输出 "Subscription 1"
});
deferredSource.subscribe(value => {
console.log(value); // 输出 "Subscription 2"
});
静态数据
⚠️:创建的流按顺序输出数据,且即使数据是异步任务也不会等待完成输出
of
import {of} from "rxjs";
const of$=of(10,20,30)
of$.subscribe({
next(val){console.log(val)},
complete(){console.log('complete')},
error(err){console.log('error',err)}
})
// 输出 10 20 30 complete
range
import {range} from "rxjs";
const range$=range(0,3)
range$.subscribe({
next(val){console.log(val)},
complete(){console.log('complete')},
error(err){console.log('error',err)}
}) // 输出 0 1 2 complete
generate
import {generate} from "rxjs";
const result$=generate({
initialState:0,
condition(val){
return val<3
},
iterate(val){
return val+1
},
resultSelector(val){
return val*1000
}
})
result$.subscribe({
next(val){console.log(val)},
complete(){console.log('complete')},
error(err){console.log('error',err)}
}) // 输出 0 1000 2000 complete
from
注意:form 支持将数组、Promise、Iterable 转为 Observable
import {from} from "rxjs";
const result$=from([1,2,3]) // from(fetch('xxx').then(res=>res.json())) 支持把请求转化为 Observable
result$.subscribe({
next(val){console.log(val)},
complete(){console.log('complete')},
error(err){console.log('error',err)}
}) // 输出 1 2 3 complete
事件
fromEvent
import {fromEvent} from "rxjs";
import {EventEmitter} from 'emitter' // 浏览器端没有emitter,可以安装 npm i emitter
// Node端可以使用自带的event库 ,import EventEmitter from 'events'
const emitter=new EventEmitter()
const result$=fromEvent(emitter, 'some-event')
result$.subscribe({
next(val){console.log(val)},
complete(){console.log('complete')},
error(err){console.log('error',err)}
}) // 输出 1 2 3 complete
emitter.emit('some-event', 1)
fromEventPattern(待补充)
请求
ajax
只能在浏览器里用
import {ajax} from "rxjs/ajax";
ajax.getJSON('https://api.github.com/users?per_page=1').subscribe({
next(val){console.log(val)},
complete(){console.log('complete')},
error(err){console.log('error',err)}
})
// 打印接口返回值 ( 没有http 请求接口,仅仅是接口数据)
回调
bindCallback
将回调函数转变为 Observable
import {bindCallback} from "rxjs";
function someFunc(n,callback){
console.log('传入数据n -->',n)
setTimeout(()=>{
callback('回调数据')
},2000)
}
const boundFunc=bindCallback(someFunc)
const funcObservable=boundFunc(100)// 传入回调函数参数
funcObservable .subscribe({
next(val){console.log(val)},
complete(){console.log('complete')},
error(err){console.log('error',err)}
})
// 输出 传入数据n --> 100 回调数据 complete
bindNodeCallback
将 Node 的回调 API 转化为Observable
import {bindNodeCallback} from 'rxjs'
import fs from 'fs'
const boundReadFile=bindNodeCallback(fs.readFile)
const readFileAsObservable=boundReadFile('./tmp.txt','utf-8')
readFileAsObservable.subscribe({
next(val){console.log(val)},
complete(){console.log('complete')},
error(err){console.log('error',err)}
})
// 输出 文件内容 complete
empty
import {EMPTY} from 'rxjs'
EMPTY.subscribe({
next(val){console.log(val)},
complete(){console.log('complete')},
error(err){console.log('error',err)}
})
// 输出 complete
throwError
import {throwError} from 'rxjs'
const errorObservable = throwError(()=>{
throw new Error('error')
})
errorObservable.subscribe({
next(val){console.log(val)},
complete(){console.log('complete')},
error(err){console.log('error',err)}
})
// 输出 error Error 信息
iff
通过第一个函数返回值判断走哪个分支,类似 if
import {iif,of} from 'rxjs'
let controller
const businessProcess = iif(
()=>controller,
of('分支1'),
of(' 分支2')
)
// 如果这里订阅就是走 分支1
controller = true
businessProcess.subscribe({
next(val){console.log(val)},
complete(){console.log('complete')},
error(err){console.log('error',err)}
})
// 输出 分支1 complete
过滤流数据
下面这些操作符会对每一个observable发出的值就行处理
audit
包括:audit、auditTime
从输出的数据流中指定采样频率
// 通过其他Observable函数
interval(1000).pipe(
audit(()=>interval(2000)),
).subscribe((value)=>{
console.log(value)
})
// 按时间周期取样
interval(1000).pipe(
auditTime(2000),
).subscribe((value)=>{
console.log(value)
})
delay
为 Observable 发出的每个一个值增加延时
import { of } from 'rxjs';
import { delay } from 'rxjs/operators';
const source = of('Hello', 'World');
source.pipe(
delay(1000) // 延迟 1000 毫秒
).subscribe(value => {
console.log(value); // 1秒后输出 "Hello", 再过1秒输出 "World"
});
debounce、throttle
防抖包括:debounce入参是函数返回Observable控制防抖、debounceTime支持传入时间
import {debounce, debounceTime, fromEvent} from "rxjs";
import EventEmitter from 'events' // 浏览器端没有emitter,可以安装 npm i emitter
const emitter=new EventEmitter()
//.pipe(debounce(()=>timer(300)))
fromEvent(emitter, 'some-event').pipe(debounceTime(300)).subscribe((value)=>{
console.log(value)
}) // 输出 3
emitter.emit('some-event', 1)
emitter.emit('some-event', 2)
emitter.emit('some-event', 3)
distinct
数据去重,包括distinct、distinctUntilChanged、distinctUntilKeyChanged
// ----- distinct 保证整体唯一性,id 为 1 的只出现一次 -----
from([{id:1,name:'jack'},{id:1,name:'tom'},{id:2,name:'hdd'},{id:1,name:'mars'}]).pipe(
distinct(item=>item.id),
).subscribe(val=>console.log(val))
// 输出:{ id: 1, name: 'jack' } { id: 2, name: 'hdd' }
// ---- distinctUntilChanged 只和前一个对比,id 为 1 的出现了多次 -----
from([{id:1,name:'jack'},{id:1,name:'tom'},{id:2,name:'hdd'},{id:1,name:'mars'}]).pipe(
distinctUntilChanged((pre,cur)=>pre.id===cur.id), // 函数返回 true 则过滤
).subscribe(val=>console.log(val))
// 输出:{ id: 1, name: 'jack' } { id: 2, name: 'hdd' } { id: 1, name: 'mars' }
// distinctUntilKeyChanged 也是对比前一个,只不过入参可以直接指定key
from([{id:1,name:'jack'},{id:1,name:'tom'},{id:2,name:'hdd'},{id:1,name:'mars'}]).pipe(
distinctUntilKeyChanged('id'),
).subscribe(val=>console.log(val))
sample
用来控制事件流的频率
只要notifier
Observable 发出一个值, sample
会去源 Observable 取出最新的一个值
interval(1000).pipe(
// 0-2s 之间是第一个采样区间,区间内第一个值就是采样点
sample(timer(0,2000)), // timer 从 0s 触发后,每隔 2s 触发一次。
).subscribe(val=>console.log(val)) // 输出 0 2 4
take、skip
take系列用于控制输出流的元素数量。包括:控制take、takeLast、takeUtil、takeWhile
⚠️ 取出对应数据后 Observable 流就结束了,会触发 complete 事件
js// take 取出前两个数据 interval(1000).pipe(take(2)).subscribe((value)=> console.log(value)); // takeLast 取出后两个数据 (如果从 interval 取 后两个数据呢? 会一直阻塞因为定时器会一直运行) from([1,2,3,4]).pipe(takeLast(2)).subscribe((value)=> console.log(value)); // takeUntil 一直取值,直到 notify Observable 流输出时结束 interval(1000).pipe( takeUntil(timer(3000)) ).subscribe((value)=>console.log(value)) // 传入函数,根据条件取值 interval(1000).pipe( takeWhile(val=>val<5) ).subscribe((value)=>console.log(value))
skip 跳过与 take 类似,也包括 skip、skipLast、skipUtil、skipWhile
first、last
选取第一个/最后一个元素后结束数据流
from([1,2,3]).pipe(
first(),
).subscribe({
next:val=>console.log(val),
complete:()=>console.log('complete')
})
// 输出 1 complete
处理流(高级用法)
buffer类
包括:buffer 、bufferWhen、bufferCount、bufferTime、bufferToggle
指定缓冲区输出时机,将数据存储在缓冲区(数组),输出后 buffer 可重现装载数据
import {buffer,interval} from "rxjs";
// 主流。应该是 1s 输出一次
const mainObservable=interval(1000)
//----------- buffer-----------
// 由其他流通知buffer收集数据结束,输出数据 (closeNotifyObservable每次输出就是一次通知)
const closeNotifyObservable=interval(2500) // 通知结束流
mainObservable.pipe(buffer(closeNotifyObservable)).subscribe((value)=>{
console.log(value)
})
// 输出 [ 0, 1 ] [ 2, 3 ] [ 4, 5, 6 ] ...
//----------- bufferWhen-----------
// 支持通过函数通知缓存结束
const closeNotifyObservableFun=()=>timer(3000)
mainObservable.pipe(bufferWhen(closeNotifyObservableFun)).subscribe((value)=>{
console.log('value -->',value)
})
//----------- bufferCount-----------
// 指定 buffer 缓存数据容量,满了就输出
mainObservable.pipe(bufferCount(3)).subscribe((value)=>{
console.log(value)
}) // 输出 [ 0, 1, 2 ] [ 3, 4, 5 ] ...
//----------- bufferTime-----------
// 由时间控制 buffer 输出
mainObservable.pipe(bufferTime(1500)).subscribe((value)=>{
console.log(value)
})
//----------- bufferToggle-----------
// 指定buffer通知开始、通知结束的流。这期间数据组成的数组
// 这是唯一可以指定开始缓存 buffer位置的操作符
const openNotifyObservable=timer(1000)
const closeNotifyObservable=()=>timer(3000)
mainObservable.pipe(bufferToggle(openNotifyObservable,closeNotifyObservable)).subscribe((value)=>{
console.log(value)
}) // 输出 [0,1,2]
window 类
包括:window 、windowWhen、windowCount、windowTime、windowToggle
其函数形式与 buffer 一致,区别是 buffer 是将数据放在数组中输出,而 window 将数据放在新 Observable 中输出
map 类(最常用)
switchMap
每个值映射为一个新流,只要有一个新的流就立即订阅,并取消上一个流
jsfromEvent(inputElement, 'input') .pipe( debounceTime(300), // 延迟300毫秒后才发射值,防止用户快速打字导致频繁请求 map(event => event.target.value), switchMap(searchText => searchText ? axios.get(`https://api.example.com/search?q=${searchText}`) : Observable.empty() ) ) .subscribe(response => { console.log('Search results:', response.data); });
mergeMap 并发
每个值映射为一个新流,并发处理流,最后会合并为一个流
js// 适合并发处理,最后收集结果 const MAX_CONCURRENT_REQUESTS=10 // 并发量 from([1,2,3]).pipe( mergeMap(val=>of(val).pipe(delay(1000)),MAX_CONCURRENT_REQUESTS), ).subscribe(val=>console.log(val)) // 等待 1s,同时输出 1 2 3
concatMap 串行
与mergeMap 一样都是将每个值映射为一个新流分别做处理后最后合并为一个流,但是 mergeMap 是并发处理每个流,而concat是按照输入流的元素顺序执行完一个再进行下一个
⚠️串行调用一旦报错会立即传播到最终的订阅者,并且整个链路会被中断,后续的 Observables 将不会被执行
jsfrom([1,2,3]).pipe( concatMap(val=>of(val).pipe(delay(1000))), // delay 是延迟操作符 ).subscribe(val=>console.log(val)) // 每隔 1s 输出一个值
exhaustMap
jsmap、mapTo
js
其他
expand、groupBy、pairwise、partition、pluck、scan
Join操作符
流错误处理
retry、retryWhen
retry用于在Observable 抛出错误时进行重试。retry会重新订阅原始Observable
处理多个流
combineLatest
合并多个Observable 的流。⚠️只有最后一个 Observable 结束才触发complete
import {combineLatest,of} from 'rxjs'
const observable1 = of(1)
const observable2 = of(2)
combineLatest([observable1, observable2]).subscribe({
next(val){console.log(val)},
complete(){console.log('complete')},
error(err){console.log('error',err)}
})
// 输出 [ 1, 2 ] complete
// 合并接口
import {combineLatest,from} from 'rxjs'
const url='https://api.github.com/users?per_page=1'
const observable1=from(fetch(url).then(res=> res.json()))
const observable2=from(fetch(url).then(res=> res.json()))
combineLatest([observable1, observable2]).subscribe({
next(val){console.log(val)},
complete(){console.log('complete')},
error(err){console.log('error',err)}
})
// 输出 [ 接口返回值, 接口返回值 ] complete
concat
串行合并 Observable,只有前一个 Observable 完成才会触发下一个 Obsverable
import {concat, from, interval, take} from 'rxjs'
const url='https://api.github.com/users?per_page=1'
const observable1=interval(1000).pipe(take(3)) // 输出 3 个值后终止
const observable2=from(fetch(url).then(res=> res.json()))
concat(observable1, observable2).subscribe({
next(val){console.log(val)},
complete(){console.log('complete')},
error(err){console.log('error',err)}
})
// 输出 0 1 2 接口返回值 complete (输出 3 个数花费 3s 后,才请求接口)
forkJoin、race
forkJoin 类似 Promise.all,所有 Observable 并发执行,全部结束才输出
race 类似 Promise.race,返回 最先输出数据的Observable
import { forkJoin , interval, take} from 'rxjs'
const url='https://api.github.com/users?per_page=1'
const observable1=interval(1000).pipe(take(1))
const observable2=interval(1000).pipe(take(2))
forkJoin([observable1, observable2]).subscribe({
next(val){console.log(val)},
complete(){console.log('complete')},
error(err){console.log('error',err)}
})
// 2s后输出 [ 0, 1 ] complete
merge
将多个 Observable 流合并为一个流,按照数据输出的顺序,合并后的 Observable依次输出
import {merge, interval, take, map} from 'rxjs'
const observable1=interval(700).pipe(map(x=>`observable1 返回值${x}`),take(2))
const observable2=interval(1000).pipe(map(x=>`observable2 返回值${x}`),take(2))
const subscription=merge(observable1,observable2).subscribe({
next(val){console.log(val)},
complete(){console.log('complete')},
error(err){console.log('error',err)}
})
// 输出
// observable1 返回值0
// observable2 返回值0
// observable1 返回值1
// observable2 返回值1
// complete
partition
用于将 Observable 拆分为两个 Observable,一个满足特定条件,另一个不满足该条件。
返回数组,第一个Observable是挑出来的数据;剩下来的数据放到第二个 Observable。
数组 filter
import {from, partition} from 'rxjs'
const observable=from([1,2,3,4,5])
const [even$,odd$]=partition(observable,val=>val%2===0)
even$.subscribe({
next(val){console.log(val)},
complete(){console.log('complete')},
error(err){console.log('error',err)}
})
odd$.subscribe({
next(val){console.log(val)},
complete(){console.log('complete')},
error(err){console.log('error',err)}
})
// 输出 2 4 complete 1 3 5 complete
接口按成功、失败分流
import {from, mergeMap, partition} from 'rxjs'
const url='https://api.github.com/users?per_page=1'
const observable=from(fetch(url))
const [success$,fail$]=partition(observable,response=>response.status>=200 && response.status<300)
success$.pipe(
mergeMap(res=>res.json()),
).subscribe({
next(val){console.log('success',val)},
complete(){console.log('success complete')},
error(err){console.log('success error',err)}
})
fail$.pipe(
mergeMap(res=>res.json()),
).subscribe({
next(val){console.log('fail',val)},
complete(){console.log('fail complete')},
error(err){console.log('fail error',err)}
})
// 输出 fail complete success 接口返回值 success complete
zip
将多个 Observables 的数据配对合并成一个新的 Observable,从数据角度,等待每个输入 Observable 相应位置的输出数据组成一个数组就先返回。直到其中一个结束了,整体就结束
看起来可以用来同步多个输出流的数据
import {from, of, timer, zip} from 'rxjs'
const url='https://api.github.com/users?per_page=1'
const observable1=timer(3000)
const observable2=from(['A','B'])
zip(observable1,observable2).subscribe({
next(val){console.log(val)},
complete(){console.log('complete')},
error(err){console.log('error',err)}
})
// 3s后输出 [ 0, 'A' ] complete
// observable1只输出了一个数据 0,所以observable2 只有 A 被组装成一个数组
总结
mergeMap、 concatMap 是将 Observable 输出数据分解成多个Observable 后再合并,其回调函数返回 Observable
map 是将Observable 的数据映射为新的数据,其回调函数返回 值
如果通过throwError 手动抛出错误只能用mergeMap、 concatMap
通过 throw new Error 手动抛出错误可以用 map 等其他操作符
from([1,2,3]).pipe(
concatMap(val=>{
return val>2?throwError(()=>new Error('大于2,重试')):of(val)
}),
retry(3)
).subscribe({
next:val=>console.log(val),
complete:()=>console.log('complete'),
error:err=>console.log(err)
})
// 输出 1 2 1 2 1 2 Error: 大于2,重试 报错栈信息
// 第1次执行+2次重试
实践
取消订阅
如果 Observable 自动走到 complelte 就不用取消订阅,但是为了避免内存泄露,最佳实践还是手动取消订阅
observable$.unsubscribe()
并发及重试
import {delay, from, lastValueFrom, mergeMap, of, retry, tap} from "rxjs";
function makeLimitedConcurrentRequests(filePathList) {
// 并发
const MAX_CONCURRENT_REQUESTS =2 ;
// 重试5次
const RETRY_COUNT = 5;
const makeUpload = (file) => of(file).pipe(
tap(() => console.log(`开始上传 ${file.path}`)),
delay(file.delay), // 加了延迟模拟耗时
tap(() => console.log(`完成上传 ${file.path}`)),
retry(RETRY_COUNT) // 重试过程是吧上传重新执行一遍,不会有任何提示,直到重试超过次数到 error、或者成功到 complete
);
const concurrentUpload = from(filePathList).pipe(
mergeMap(url => makeUpload(url), MAX_CONCURRENT_REQUESTS) // 并发请求
);
return lastValueFrom(concurrentUpload,{ defaultValue: 'No value emitted' });
}
makeLimitedConcurrentRequests([
{ path: 'pathA', delay: 1000},
{ path: 'pathB', delay: 4000},
{ path: 'pathC', delay: 2500},
]).then((res) => {
console.log(res);
}).catch((err) => {console.log(err)})
// 测并发。pathA、pathB执行,pathA先执行结束,这个坑位执行pathC,最后pathB 结束
开始上传 pathA
开始上传 pathB
完成上传 pathA
开始上传 pathC
完成上传 pathC
完成上传 pathB
{ path: 'pathB', delay: 4000 }
lastValueFrom
// 将 Observable 转换为一个 Promise,并返回最后一个发出的值
// 如何没有发出值,可以通过defaultValue指定
// 注意:如果 Observable 在完成前抛出了一个错误 或者 Observable 没有发出任何值且没有提供默认值,返回 rejected Promise
// return lastValueFrom(concurrentUpload,{ defaultValue: 'No value emitted' });
缓存
场景:页面需要定位信息、定位改变页面需要能收到通知
以前想的是一个单例+订阅发布,页面订阅单例,如果定位变化就能通知页面了
但是订阅发布存在一个弊端:订阅必须在发布之前,假设冷启动获取定位后立即通知页面,这是正常的,如果再进入下一个页面呢?下一个页面的定位信息从哪里来?
订阅发布存在时效性,rxjs 有能力缓存上一次的状态 (replayPublish 操作符)
参考:https://juejin.cn/post/7174713617596547132?searchId=2024102515585998DCC4BEE8933D7A7766 参考:https://juejin.cn/post/7114106871249633311?searchId=2024111515204767EF933AEB1ECABFAEDA#heading-1
限位窗口
场景:请求 A 拿到页面刷新数据,如果返回数据数据耗时 requestTime 在 500ms - 1000ms 时间区间返回立即停止刷新样式 ; requestTime < 500ms 则在 500ms 时停止刷新样式;requestTime > 1000ms 则在 1000ms时停止刷新样式,且数据即使返回也直接丢弃 场景:loading 页加载数据也是这种逻辑,有上限超时时间,但又为了用户能感知 loading 页所以设置下限时间
function mockRequest(delay){
return timer(delay??Math.random()*10*1000).pipe(map(val=> ({name:'tom'})))
}
from([
timer(3000),
mockRequest(5000),
timer(7000),
]).pipe(
mergeMap((observable)=>observable),
bufferCount(2),
take(1),
filter(value => value!==0)
).subscribe((value)=>{
// [ 0, 0 ] 超时
// [ { name: 'tom' }, 0 ] 3s之前返回数据
// [ 0, { name: 'tom' } ] 3 - 7s 之内返回数据
console.log(value)
// 如果是做 mockRequest 是其他耗时操作,无需返回值就不用处理
// 如果需要返回值,就需要提在这里提取数据了
})
竞争态问题
switchMap 操作符非常适合处理一些特定的场景,例如:
搜索框输入:用户快速输入时,只需要处理最新的输入并发起请求,而忽略之前的输入发起的请求。(之前的并不是取消了,而是忽略了之前请求的结果)
fromEvent(inputElement, 'input')
.pipe(
debounceTime(300), // 延迟300毫秒后才发射值,防止用户快速打字导致频繁请求
map(event => event.target.value),
switchMap(searchText =>
searchText ? axios.get(`https://api.example.com/search?q=${searchText}`) : Observable.empty()
)
)
.subscribe(response => {
console.log('Search results:', response.data);
});
远程协作:本地修改、远程修改以最新的作基准
const merged$ = merge(localInput$, remoteInput$);
// 处理合并后的事件流
merged$.pipe(
switchMap(data => updateUI(data)) // 更新页面
).subscribe(value => {
console.log(value)
});