Реактивное программирование
Обработка ошибок в реактивных стримах
Представьте ошибки как пороги в потоке событий: без обработки они останавливают течение, но с правильными операторами — поток продолжается, минимизируя простои. Это решает проблемы из первого поста: вместо жёстких сбоев в потоках или Future, где ошибка рушит всё, реактивный подход даёт контроль и устойчивость.
Обработка ошибок в Reactor строится на жизненном цикле: когда исключение возникает в потоке (в map, flatMap или источнике), срабатывает onError, прерывая onNext и onComplete. Но вместо того чтобы "падать", вы можете трансформировать ошибку в данные, повторить попытку или логировать.
Операторы — декларативные: добавляете в цепочку, и Reactor управляет асинхронностью, backpressure и распространением ошибок. Это делает код resilient (устойчивым): приложение не крашится, а адаптируется.
Базовая реакция: doOnError и onErrorMap
Сначала — простые операторы для наблюдения и модификации ошибок, без изменения потока.
- doOnError: дополнительная реакция на ошибку, как "хук" (зацепка). Полезен для логирования или метрик, не влияет на основной onError.
- onErrorMap: преобразует исключение в другое, для кастомизации (например, оборачивает в бизнес-ошибку).
Пример на Flux:
На Mono: аналогично, но для одиночного элемента.
Эти операторы решают проблему традиционных try-catch: вместо разбросанных блоков, всё в конвейере, читаемо и централизовано.
Восстановление: onErrorReturn и onErrorResume
Когда ошибка — не конец света, используйте fallback.
- onErrorReturn: возвращает фиксированное значение вместо ошибки. Простой запасной вариант.
- onErrorResume: более гибкий — заменяет ошибку новым Publisher (Mono/Flux). Можно генерировать динамически, в зависимости от исключения.
Пример с onErrorReturn:
С onErrorResume — условно:
onErrorResume позволяет ветвление: проверь тип ошибки и верни альтернативный поток. Это асинхронно: если fallback — Mono.delay, оно подождёт без блокировки.
Почему лучше CompletableFuture.handle?
Нет вложенных колбэков — цепочка линейна, ошибки интегрированы в конвейер.
#Java #middle #Reactor #doOnError #onErrorMap
Обработка ошибок в реактивных стримах
Представьте ошибки как пороги в потоке событий: без обработки они останавливают течение, но с правильными операторами — поток продолжается, минимизируя простои. Это решает проблемы из первого поста: вместо жёстких сбоев в потоках или Future, где ошибка рушит всё, реактивный подход даёт контроль и устойчивость.
Обработка ошибок в Reactor строится на жизненном цикле: когда исключение возникает в потоке (в map, flatMap или источнике), срабатывает onError, прерывая onNext и onComplete. Но вместо того чтобы "падать", вы можете трансформировать ошибку в данные, повторить попытку или логировать.
Операторы — декларативные: добавляете в цепочку, и Reactor управляет асинхронностью, backpressure и распространением ошибок. Это делает код resilient (устойчивым): приложение не крашится, а адаптируется.
Базовая реакция: doOnError и onErrorMap
Сначала — простые операторы для наблюдения и модификации ошибок, без изменения потока.
- doOnError: дополнительная реакция на ошибку, как "хук" (зацепка). Полезен для логирования или метрик, не влияет на основной onError.
- onErrorMap: преобразует исключение в другое, для кастомизации (например, оборачивает в бизнес-ошибку).
Пример на Flux:
import reactor.core.publisher.Flux;
import java.io.IOException;
Flux<String> riskyFlux = Flux.just("данные1", "данные2").map(data -> {
if (data.equals("данные2")) throw new IOException("Сбой ввода-вывода");
return data.toUpperCase();
});
riskyFlux
.doOnError(e -> System.err.println("Лог: " + e.getMessage())) // Логируем
.onErrorMap(e -> new RuntimeException("Обёрнутая ошибка: " + e)) // Преобразуем
.subscribe(
System.out::println,
error -> System.err.println("Финальная ошибка: " + error) // onError в подписке
);
// Вывод: "ДАННЫЕ1", потом лог "Сбой ввода-вывода", и финальная "Обёрнутая ошибка: ..."
Здесь doOnError срабатывает перед onErrorMap, а подписка ловит модифицированную ошибку. Это асинхронно: если ошибка в асинхронном подпотоке (flatMap), Reactor передаёт её downstream (дальше по цепи) без блокировок.
На Mono: аналогично, но для одиночного элемента.
Эти операторы решают проблему традиционных try-catch: вместо разбросанных блоков, всё в конвейере, читаемо и централизовано.
Восстановление: onErrorReturn и onErrorResume
Когда ошибка — не конец света, используйте fallback.
- onErrorReturn: возвращает фиксированное значение вместо ошибки. Простой запасной вариант.
- onErrorResume: более гибкий — заменяет ошибку новым Publisher (Mono/Flux). Можно генерировать динамически, в зависимости от исключения.
Пример с onErrorReturn:
Mono<String> httpMono = Mono.fromCallable(() -> {
// Симулируем HTTP-запрос
throw new RuntimeException("Сервер не отвечает");
}).onErrorReturn("Кэшированные данные");
httpMono.subscribe(System.out::println); // Вывод: "Кэшированные данные", потом onComplete
Здесь ошибка преобразуется в значение, поток завершается успешно.
С onErrorResume — условно:
Flux<Integer> calcFlux = Flux.range(1, 5).map(i -> {
if (i == 3) throw new ArithmeticException("Деление на ноль");
return 10 / (i - 3); // Симуляция
}).onErrorResume(e -> {
if (e instanceof ArithmeticException) {
return Flux.just(0, 0); // Fallback на нули
} else {
return Flux.error(e); // Пропустить другие ошибки
}
});
calcFlux.subscribe(System.out::println); // Вывод: элементы до ошибки, потом 0, 0, onComplete
onErrorResume позволяет ветвление: проверь тип ошибки и верни альтернативный поток. Это асинхронно: если fallback — Mono.delay, оно подождёт без блокировки.
Почему лучше CompletableFuture.handle?
Нет вложенных колбэков — цепочка линейна, ошибки интегрированы в конвейер.
#Java #middle #Reactor #doOnError #onErrorMap
👍2
Retry: повтор попыток при ошибке
Retry — для transient (временных) ошибок: сеть, временный сбой. Повторяет upstream (источник) заданное число раз.
- retry(long times): простая повторка.
- retryWhen(Retry strategy): с кастомной логикой (backoff — задержка, условия).
Пример базовый:
С retryWhen для экспоненциальной задержки (backoff):
Это предотвращает "молоток" (hammering) сервера: ждёт перед retry. Асинхронно: задержки не блокируют поток.
Retry решает боли блокировок: вместо цикла с sleep в традиционном коде, декларативно и эффективно.
Практические советы и подводные камни
- Комбинируйте: riskyFlux.doOnError(log).onErrorResume(fallback).retry(2) — лог + retry + fallback.
- Условия: в onErrorResume используйте instanceof для типов ошибок, чтобы не catch всё подряд.
- Глобально: используйте Hooks.onErrorDropped для непойманных ошибок (редко).
- Камень: retry на бесконечных потоках — вечный цикл; добавьте timeout() или maxAttempts.
- Тестирование: StepVerifier.create(flux).expectErrorMatches(e -> e instanceof IOException).verify();
В практике: в WebFlux — контроллер возвращает Mono с retry для внешних API, onErrorReturn для кэша.
плавному восстановлению, экономя ресурсы и упрощая код.
#Java #middle #Reactor #doOnError #onErrorMap
Retry — для transient (временных) ошибок: сеть, временный сбой. Повторяет upstream (источник) заданное число раз.
- retry(long times): простая повторка.
- retryWhen(Retry strategy): с кастомной логикой (backoff — задержка, условия).
Пример базовый:
Mono<String> flakyMono = Mono.defer(() -> {
if (Math.random() > 0.3) throw new RuntimeException("Временный сбой");
return Mono.just("Успех");
}).retry(3); // Повторить 3 раза
flakyMono.subscribe(System.out::println, Throwable::printStackTrace);
Здесь retry повторяет весь Mono при ошибке, до успеха или исчерпания попыток. Если все попытки fail — финальный onError.
С retryWhen для экспоненциальной задержки (backoff):
import reactor.util.retry.Retry;
flakyMono.retryWhen(Retry.backoff(3, Duration.ofSeconds(1))); // 3 попытки с задержкой 1с, 2с, 4с
Это предотвращает "молоток" (hammering) сервера: ждёт перед retry. Асинхронно: задержки не блокируют поток.
Retry решает боли блокировок: вместо цикла с sleep в традиционном коде, декларативно и эффективно.
Практические советы и подводные камни
- Комбинируйте: riskyFlux.doOnError(log).onErrorResume(fallback).retry(2) — лог + retry + fallback.
- Условия: в onErrorResume используйте instanceof для типов ошибок, чтобы не catch всё подряд.
- Глобально: используйте Hooks.onErrorDropped для непойманных ошибок (редко).
- Камень: retry на бесконечных потоках — вечный цикл; добавьте timeout() или maxAttempts.
- Тестирование: StepVerifier.create(flux).expectErrorMatches(e -> e instanceof IOException).verify();
В практике: в WebFlux — контроллер возвращает Mono с retry для внешних API, onErrorReturn для кэша.
плавному восстановлению, экономя ресурсы и упрощая код.
#Java #middle #Reactor #doOnError #onErrorMap
👍2