有几种方法可以被视为一种方便的异常引发方式:
使用处理元素Flux/Mono.handle
可以简化可能导致错误或空流的元素处理的方法之一是 运算符 。handle
下面的代码显示了我们如何使用它来解决我们的问题:
Mono.just(userId)
.map(repo::findById)
.handle((user, sink) -> {
if(!isValid(user)){
sink.error(new InvalidUserException());
} else if (isSendable(user))
sink.next(user);
}
else {
//just ignore element
}
})
正如我们所看到的,运算符需要传递才能处理元素。在这里,我们在双消费器中有两个参数。第一个是来自上游的元素,第二个是帮助我们同步向下游提供元素。这种技术扩展了提供元素处理的不同结果的能力。例如,如果元素无效,我们可以向相同的元素提供误差,这将抵消上游并向下游产生信号。反过来,我们可以使用相同的运算符“过滤”。一旦执行了句柄并且没有提供任何元素,Reactor就会将其视为一种过滤,并将为我们请求一个额外的元素。最后,如果元素有效,我们可以简单地调用并传播我们的元素下游,或者对其应用一些映射,因此我们将在这里作为运算符。此外,我们可以安全地使用该运算符,而不会影响性能,并提供复杂的元素验证,例如元素验证或错误发送到下游。.handle
BiConsumer<T, SynchronousSink<>
SynchronousSink
SycnchronousSync
onError
handle
BiConsumer
SynchronousSink#next
handle
map
投掷使用#concatMap
+ Mono.error
在映射期间引发异常的选项之一是替换为 。从本质上讲,几乎做同样的事情。唯一的区别是一次只允许一个子流。这种行为大大简化了内部实现,并且不会影响性能。因此,我们可以使用以下代码,以便以更实用的方式引发异常:map
concatMap
concatMap
flatMap
concatMap
Mono.just(userId)
.map(repo::findById)
.concatMap(user-> {
if(!isValid(user)){
return Mono.error(new InvalidUserException());
}
return Mono.just(user);
})
在上面的示例中,如果用户无效,我们使用 返回异常。我们可以使用以下方法对助焊剂执行相同的操作:Mono.error
Flux.error
Flux.just(userId1, userId2, userId3)
.map(repo::findById)
.concatMap(user-> {
if(!isValid(user)){
return Flux.error(new InvalidUserException());
}
return Mono.just(user);
})
请注意,在这两种情况下,我们都返回只有一个元素的冷流。在 Reactor 中,有一些优化可以提高性能,以防返回的流是冷标量流。因此,建议使用 Flux/Mono + , ,因此当我们需要更复杂的映射时,最终可能会得到 或 。concatMap
.just
empty
error
return null
throw new ...
注意力!永远不要检查传入元素的可空性。Reactor 项目永远不会为您发送 null
值,因为这违反了 Reactive Streams 规范(参见规则 2.13),因此,如果返回 null,Reactor 将为您抛出 NullPointerException。repo.findById
等等,为什么比更好?concatMap
flatMap
实质上,它被设计为合并一次执行的多个子流中的元素。这意味着flatMap下面应该有异步流,因此,它们可能会在多个线程上处理数据,或者可能是多个网络调用。随后,这种期望对实现的影响很大,因此应该能够处理来自多个流的数据(意味着并发数据结构的使用),如果从另一个流中耗尽元素,则对元素进行排队(意味着为每个子流分配额外的内存),并且不违反Reactive Streams规范规则(意味着非常复杂的实现)。计算所有这些事实以及我们将普通操作(同步)替换为更方便的方式来抛出异常(这不会改变执行的同步性)的事实导致这样一个事实,即我们不需要如此复杂的运算符,我们可以使用更简单的运算符,该运算符旨在一次异步处理单个流,并且具有一些优化以处理标量,冷流。flatMap
flatMap
Thread
Queue
map
Flux/Mono.error
concatMap
使用引发异常switchIfEmpty
因此,当结果为空时引发异常的另一种方法是运算符。下面的代码演示了如何使用这种方法:switchIfEmpty
Mono.just(userId)
.flatMap(repo::findById)
.switchIfEmpty(Mono.error(new UserNotFoundExeception()))
正如我们所看到的,在这种情况下应该有 的作为返回类型。因此,如果找不到实例,则结果流将为空。因此,反应器将调用一个替代 ,指定为参数。repo::findById
Mono
User
User
Mono
switchIfEmpty
按原样抛出异常(例如,在您的 、 和其他类似运算符中)map
filter
它可以被算作一个可读性较差的代码或糟糕的做法(我自己的观点),但你可以把你的异常(例如)扔给Project Reactor。即使,在某种程度上这样做可能会违反Reactive Streams规范(在这种情况下,从语义角度来看是违反的,因为你的运算符在引擎盖下是s链中的一个,因此 - 从语义上讲,在lambda中抛出异常可以映射到在违反规范规则2.13的方法中抛出异常).但是,由于 Reactor 会为您捕获抛出的异常,然后将其作为信号传播到下游,因此不禁止这样做。.map(v -> throw ...)
Subscriber
Subscriber
onNext
onError
要点
- 使用运算符以提供复杂的元素处理
.handle
- 当我们需要在映射过程中引发异常时使用 +,但这种技术最适合异步元素处理的情况。
concatMap
Mono.error
- 当我们已经到位时,使用+
flatMap
Mono.error
flatMap
-
Null
由于返回类型是被禁止的,所以而不是在你的下游,你会得到意想不到的null
map
onError
NullPointerException
- 在需要发送错误信号的所有情况下使用,如果调用某些特定函数的结果与空流一起完成
switchIfEmpty