本系列文章索引《响应式Spring的道法术器》
前情提要 Reactor Operators
本节的内容来自我翻译的Reactor 3 参考文档——如何选择操作符。由于部分朋友打开github.io网速比较慢或上不去,贴出来方便大家查阅。
如果一个操作符是专属于
Flux或Mono的,那么会给它注明前缀。
公共的操作符没有前缀。如果一个具体的用例涉及多个操作符的组合,这里以方法调用的方式展现,
会以一个点(.)开头,并将参数置于圆括号内,比如:.methodCall(parameter)。
1)创建一个新序列,它...
T,我已经有了:just
Optional<T>:Mono#justOrEmpty(Optional<T>)null 的 T:Mono#justOrEmpty(T)T,且还是由 just 方法返回
Mono#fromSupplier 或用 defer 包装 justT,这些元素我可以明确列举出来:Flux#just(T...)Flux#fromArrayFlux#fromIterableFlux#rangeStream 提供给每一个订阅:Flux#fromStream(Supplier<Stream>)Supplier<T>:Mono#fromSupplierMono#fromCallable,Mono#fromRunnableCompletableFuture<T>:Mono#fromFutureemptyerror
Throwable:error(Supplier<Throwable>)neverdeferusingFlux#generateFlux#createMono#create 也是异步的,只不过只能发一个)2)对序列进行转化
我想转化一个序列:
mapcastFlux#indexflatMap + 使用一个工厂方法handleflatMap + 一个异步的返回类型为 Publisher 的方法Mono.empty()Flux#flatMapSequential(对每个元素的异步任务会立即执行,但会将结果按照原序列顺序排序)Mono#flatMapMany我想添加一些数据元素到一个现有的序列:
Flux#startWith(T...)Flux#concatWith(T...)我想将 Flux 转化为集合(一下都是针对 Flux 的)
collectList,collectSortedListcollectMap,collectMultiMapcollectcountreducescanallanyhasElementshasElement我想合并 publishers...
Flux#concat 或 .concatWith(other)Flux#concatDelayErrorFlux#mergeSequentialFlux#merge / .mergeWith(other)Flux#zip / Flux#zipWithTuple2:Mono#zipWithMono#zipMono<Void>:Mono#andMono<Void>:Mono#whenFlux#zipFlux#combineLatestFlux#first,Mono#first,mono.or<br/>(otherMono).or(thirdMono),`flux.or(otherFlux).or(thirdFlux)flatMap,不过“喜新厌旧”):switchMapswitchOnNext我想重复一个序列:repeat
Flux.interval(duration).flatMap(tick -> myExistingPublisher)我有一个空序列,但是...
defaultIfEmptyswitchIfEmpty我有一个序列,但是我对序列的元素值不感兴趣:ignoreElements
Mono 来表示序列已经结束:thenthenEmptyMono:Mono#then(mono)Mono#thenReturn(T)Flux:thenMany我有一个 Mono 但我想延迟完成...
Mono#delayUntilOtherMono#delayUntil(Function)expand(Function)expandDeep(Function)3)“窥视”(只读)序列
再不对序列造成改变的情况下,我想:
doOnNextFlux#doOnComplete,Mono#doOnSuccessdoOnErrordoOnCanceldoOnSubscribedoOnRequestdoOnTerminate(Mono的方法可能包含有结果)
doAfterTerminateSignal):Flux#doOnEachdoFinallylogsingle 对象:doOnEachsingle 对象:materialize
dematerializelog4)过滤序列
我想过滤一个序列
filterfilterWhenofTypeignoreElementsFlux#distinctFlux#distinctUntilChanged我只想要一部分序列:
Flux#take(long)
Flux#take(Duration)Mono 中返回:Flux#next()request(N) 而不是取消:Flux#limitRequest(long)Flux#takeLastFlux#takeUntil(基于判断条件),Flux#takeUntilOther(基于对 publisher 的比较)Flux#takeWhileFlux#elementAt.takeLast(1)
Flux#last()Flux#last(T)Flux#skip(long)
Flux#skip(Duration)Flux#skipLastFlux#skipUntil(基于判断条件),Flux#skipUntilOther (基于对 publisher 的比较)Flux#skipWhileFlux#sample(Duration)
sampleFirstFlux#sample(Publisher)Flux#sampleTimeout (每一个元素会触发一个 publisher,如果这个 publisher 不被下一个元素触发的 publisher 覆盖就发出这个元素)Flux#single()Flux#single(T)Flux#singleOrEmpty5)错误处理
我想创建一个错误序列:error...
Flux:.concat(Flux.error(e))Mono:.then(Mono.error(e))timeouterror(Supplier<Throwable>)我想要类似 try/catch 的表达方式:
erroronErrorReturnFlux 或 Mono:onErrorResume.onErrorMap(t -> new RuntimeException(t))doFinallyusing 工厂方法我想从错误中恢复...
onErrorReturnPublisher:Flux#onErrorResume 和 Mono#onErrorResumeretryretryWhenIllegalStateException:Flux#onBackpressureErrorFlux#onBackpressureDropFlux#onBackpressureLatestFlux#onBackpressureBufferFlux#onBackpressureBuffer 带有策略 BufferOverflowStrategy6) 基于时间的操作
我想将元素转换为带有时间信息的 Tuple2<Long, T>...
elapsedtimestamp如果元素间延迟过长则中止序列:timeout
以固定的周期发出元素:Flux#interval
在一个给定的延迟后发出 0:static Mono.delay.
Mono#delayElement,Flux#delayElementsdelaySubscription7)拆分 Flux
我想将一个 Flux<T> 拆分为一个 Flux<Flux<T>>:
window(int)window(int, int)window(Duration)window(Duration, Duration)windowTimeout(int, Duration)windowUntilcutBefore 变量):.windowUntil(predicate, true)windowWhile (不满足条件的元素会被丢弃)window(Publisher),windowWhen我想将一个 Flux<T> 的元素拆分到集合...
List:buffer(int)
buffer(int, int)buffer(Duration)
buffer(Duration, Duration)bufferTimeout(int, Duration)bufferUntil(Predicate)
.bufferUntil(predicate, true)bufferWhile(Predicate)buffer(Publisher),bufferWhenbuffer(int, Supplier<C>)Flux<T> 中具有共同特征的元素分组到子 Flux:groupBy(Function<T,K>)(注意返回值是 Flux<GroupedFlux<K, T>>,每一个 GroupedFlux 具有相同的 key 值 K,可以通过 key() 方法获取)。8)回到同步的世界
我有一个 Flux<T>,我想:
Flux#blockFirstFlux#blockFirst(Duration)Flux#blockLastFlux#blockLast(Duration)Iterable<T>:Flux#toIterableStream<T>:Flux#toStreamMono<T>,我想:
Mono#blockMono#block(Duration)CompletableFuture<T>:Mono#toFuture附2:Reactor 3 之选择合适的操作符——响应式Spring的道法术器
原文地址:http://blog.51cto.com/liukang/2094073