Java for Beginner
745 subscribers
714 photos
201 videos
12 files
1.16K links
Канал от новичков для новичков!
Изучайте Java вместе с нами!
Здесь мы обмениваемся опытом и постоянно изучаем что-то новое!

Наш YouTube канал - https://www.youtube.com/@Java_Beginner-Dev

Наш канал на RUTube - https://rutube.ru/channel/37896292/
Download Telegram
Реактивное программирование

Обработка ошибок в реактивных стримах

Представьте ошибки как пороги в потоке событий: без обработки они останавливают течение, но с правильными операторами — поток продолжается, минимизируя простои. Это решает проблемы из первого поста: вместо жёстких сбоев в потоках или Future, где ошибка рушит всё, реактивный подход даёт контроль и устойчивость.

Обработка ошибок в Reactor строится на жизненном цикле: когда исключение возникает в потоке (в map, flatMap или источнике), срабатывает onError, прерывая onNext и onComplete. Но вместо того чтобы "падать", вы можете трансформировать ошибку в данные, повторить попытку или логировать.
Операторы — декларативные: добавляете в цепочку, и Reactor управляет асинхронностью, backpressure и распространением ошибок. Это делает код resilient (устойчивым): приложение не крашится, а адаптируется.


Базовая реакция: doOnError и onErrorMap

Сначала — простые операторы для наблюдения и модификации ошибок, без изменения потока.

- doOnError: дополнительная реакция на ошибку, как "хук" (зацепка). Полезен для логирования или метрик, не влияет на основной onError.
-
onErrorMap: преобразует исключение в другое, для кастомизации (например, оборачивает в бизнес-ошибку).

Пример на Flux:
import reactor.core.publisher.Flux;
import java.io.IOException;

Flux<String> riskyFlux = Flux.just("данные1", "данные2").map(data -> {
if (data.equals("данные2")) throw new IOException("Сбой ввода-вывода");
return data.toUpperCase();
});

riskyFlux
.doOnError(e -> System.err.println("Лог: " + e.getMessage())) // Логируем
.onErrorMap(e -> new RuntimeException("Обёрнутая ошибка: " + e)) // Преобразуем
.subscribe(
System.out::println,
error -> System.err.println("Финальная ошибка: " + error) // onError в подписке
);

// Вывод: "ДАННЫЕ1", потом лог "Сбой ввода-вывода", и финальная "Обёрнутая ошибка: ..."

Здесь doOnError срабатывает перед onErrorMap, а подписка ловит модифицированную ошибку. Это асинхронно: если ошибка в асинхронном подпотоке (flatMap), Reactor передаёт её downstream (дальше по цепи) без блокировок.


На Mono: аналогично, но для одиночного элемента.

Эти операторы решают проблему традиционных try-catch: вместо разбросанных блоков, всё в конвейере, читаемо и централизовано.


Восстановление: onErrorReturn и onErrorResume

Когда ошибка — не конец света, используйте fallback.

- onErrorReturn: возвращает фиксированное значение вместо ошибки. Простой запасной вариант.
- onErrorResume: более гибкий — заменяет ошибку новым Publisher (Mono/Flux). Можно генерировать динамически, в зависимости от исключения.


Пример с onErrorReturn:
Mono<String> httpMono = Mono.fromCallable(() -> {
// Симулируем HTTP-запрос
throw new RuntimeException("Сервер не отвечает");
}).onErrorReturn("Кэшированные данные");

httpMono.subscribe(System.out::println); // Вывод: "Кэшированные данные", потом onComplete

Здесь ошибка преобразуется в значение, поток завершается успешно.


С onErrorResume — условно:
Flux<Integer> calcFlux = Flux.range(1, 5).map(i -> {
if (i == 3) throw new ArithmeticException("Деление на ноль");
return 10 / (i - 3); // Симуляция
}).onErrorResume(e -> {
if (e instanceof ArithmeticException) {
return Flux.just(0, 0); // Fallback на нули
} else {
return Flux.error(e); // Пропустить другие ошибки
}
});

calcFlux.subscribe(System.out::println); // Вывод: элементы до ошибки, потом 0, 0, onComplete


onErrorResume позволяет ветвление: проверь тип ошибки и верни альтернативный поток. Это асинхронно: если fallback — Mono.delay, оно подождёт без блокировки.

Почему лучше CompletableFuture.handle?
Нет вложенных колбэков — цепочка линейна, ошибки интегрированы в конвейер.



#Java #middle #Reactor #doOnError #onErrorMap
👍1
Retry: повтор попыток при ошибке

Retry — для transient (временных) ошибок: сеть, временный сбой. Повторяет upstream (источник) заданное число раз.

- retry(long times): простая повторка.
- retryWhen(Retry strategy): с кастомной логикой (backoff — задержка, условия).


Пример базовый:

Mono<String> flakyMono = Mono.defer(() -> {
if (Math.random() > 0.3) throw new RuntimeException("Временный сбой");
return Mono.just("Успех");
}).retry(3); // Повторить 3 раза

flakyMono.subscribe(System.out::println, Throwable::printStackTrace);

Здесь retry повторяет весь Mono при ошибке, до успеха или исчерпания попыток. Если все попытки fail — финальный onError.


С retryWhen для экспоненциальной задержки (backoff):
import reactor.util.retry.Retry;

flakyMono.retryWhen(Retry.backoff(3, Duration.ofSeconds(1))); // 3 попытки с задержкой 1с, 2с, 4с


Это предотвращает "молоток" (hammering) сервера: ждёт перед retry. Асинхронно: задержки не блокируют поток.

Retry решает боли блокировок: вместо цикла с sleep в традиционном коде, декларативно и эффективно.



Практические советы и подводные камни

- Комбинируйте: riskyFlux.doOnError(log).onErrorResume(fallback).retry(2) — лог + retry + fallback.
- Условия: в onErrorResume используйте instanceof для типов ошибок, чтобы не catch всё подряд.
- Глобально: используйте Hooks.onErrorDropped для непойманных ошибок (редко).
- Камень: retry на бесконечных потоках — вечный цикл; добавьте timeout() или maxAttempts.
- Тестирование: StepVerifier.create(flux).expectErrorMatches(e -> e instanceof IOException).verify();

В практике: в WebFlux — контроллер возвращает Mono с retry для внешних API, onErrorReturn для кэша.
плавному восстановлению, экономя ресурсы и упрощая код.



#Java #middle #Reactor #doOnError #onErrorMap
👍1