Flux

Mono

Examples

println("start")
val list: List<Mono<Int>> = listOf(
    Mono.just(1),
    Mono.just(2),
    Mono.just(3),
    Mono.just(4),
    Mono.just(5),
    Mono.just(6)
)

list.forEach { el ->
    Thread.sleep(500)
    println(el.block())
}
println("end")
result
start
1
2
3
4
5
6
end
Tip
Mono.error

Mono.error가 반횐되었을 떄 block() 호출하면 안에서 에러를 던짐(throw)

  • blockingGet 이 호출되고 내부적으로 error 필드가 있으므로 에러를 throw함

Error Handling

  • otherwiseReturn: 3.0.7에서 deprecated 되었고, onErrorReturn 사용 권장

onErrorReturn

  • 에러 발생하면 기본 값 사용

.onErrorReturn(-1)

onErrorResume

  • 에러 발생하면 다른 시퀀스나 다른 에러로 변경

.onErrorResume { err ->
  if (true) {
    Flux.just(1, 2, 3)
  } else {
    Flux.error(errr)
  }
}

onErrorMap

  • 에러를 다른 에러로 변경하기

.onErrorMap(IllegalArgumentException(it))

onErrorContinue

  • 스트림 처리 중 에러 발생 시 에러 발생 시키고 다음 스트림을 이어서 처리함.

onErrorStop

subscribe vs. subscribeOn

  • subscribe 는 현재 스레드에서 시작됨.

  • subscribeOn 은 실행할 스레드를 지정함.

Note
Lettuce의 ReactiveRedisTemplate 사용시 비동기 처리

Redis를 Lettuce 라이브러리 사용하여 ReactiveRedisTemplate 로 통신시 subscribeOn 없이도 비동기로 실행되는 것을 볼 수 있다. 이는 해당 라이브러이에서 netty EventLoop에서 실행하도록 구현해두었기 때문이다. (참고링크)

Etc