Жизненный цикл: этапы от старта до финиша
Жизненный цикл в Reactive Streams — последовательность вызовов: onSubscribe → (onNext)* → (onError | onComplete). Это правило: после onError или onComplete ничего не будет, и поток считается завершённым.
onSubscribe(Subscription s): первый вызов после subscribe(). Здесь подписчик получает Subscription для контроля (request(n) для backpressure или cancel()). Без request() данные не потекут — это защита от перегрузки.
onNext(T item): для каждого элемента. Здесь основная логика: обработка, логирование, трансформация. Может вызываться много раз (в Flux) или раз/никогда (в Mono).
Важно: держите onNext быстрым и неблокирующим — если медленный, перенесите на отдельный планировщик (Schedulers).
onError(Throwable t): если ошибка (исключение). Поток прерывается, onComplete не сработает. Обработайте, чтобы не потерять: логируйте, retry (повторите) или fallback (запасной вариант).
onComplete(): успешное завершение. Нет элементов после, но сигнал, что всё ок.
Пример полного цикла с кастомным подписчиком (BaseSubscriber в Reactor упрощает):
Обработка ошибок и завершения: стратегии для устойчивости
Ошибки — часть жизни: сеть упала, БД не ответила. В Reactor onError — ваш щит. Не игнорируйте: используйте операторы для восстановления.
doOnError(Consumer<Throwable>): дополнительная реакция перед onError подписчика.
onErrorReturn(T value): fallback — верни значение вместо ошибки.
onErrorResume(Function<Throwable, Publisher<T>>): замени на другой поток.
retry(long times): повтори попытку.
Пример с восстановлением:
Практические советы и подводные камни
Всегда реализуйте все методы в Subscriber: иначе дефолтные могут "проглотить" ошибки (onError кидает RuntimeException, если не переопределён).
Используйте Hooks: doOnSubscribe, doOnNext для логирования без изменения потока.
Отмена: dispose() не гарантирует мгновенную остановку — upstream (источник) может продолжить, но данные не дойдут.
Камень: в onNext избегайте блокировок (sleep, IO) — используйте publishOn(Schedulers.boundedElastic()) для переноса.
Тестирование: StepVerifier.create(flux).expectSubscription().expectNext(1,2).expectError().verify(); — проверяет цикл.
В практике: в WebFlux сервис возвращает Flux, клиент subscribe() в контроллере — реакции на события в реальном времени.
#Java #middle #Reactor #Reactive_Streams_API #onNext #onError #onComplet
Жизненный цикл в Reactive Streams — последовательность вызовов: onSubscribe → (onNext)* → (onError | onComplete). Это правило: после onError или onComplete ничего не будет, и поток считается завершённым.
onSubscribe(Subscription s): первый вызов после subscribe(). Здесь подписчик получает Subscription для контроля (request(n) для backpressure или cancel()). Без request() данные не потекут — это защита от перегрузки.
onNext(T item): для каждого элемента. Здесь основная логика: обработка, логирование, трансформация. Может вызываться много раз (в Flux) или раз/никогда (в Mono).
Важно: держите onNext быстрым и неблокирующим — если медленный, перенесите на отдельный планировщик (Schedulers).
onError(Throwable t): если ошибка (исключение). Поток прерывается, onComplete не сработает. Обработайте, чтобы не потерять: логируйте, retry (повторите) или fallback (запасной вариант).
onComplete(): успешное завершение. Нет элементов после, но сигнал, что всё ок.
Пример полного цикла с кастомным подписчиком (BaseSubscriber в Reactor упрощает):
import reactor.core.publisher.BaseSubscriber;
Flux<Integer> numbersFlux = Flux.range(1, 10).map(i -> {
if (i == 5) throw new RuntimeException("Ошибка на 5"); // Симулируем ошибку
return i;
});
numbersFlux.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
System.out.println("Подписка готова");
request(3); // Запрашиваем первые 3
}
@Override
protected void hookOnNext(Integer value) {
System.out.println("Элемент: " + value);
request(1); // Запрашиваем по одному дальше
}
@Override
protected void hookOnError(Throwable throwable) {
System.err.println("Ошибка: " + throwable.getMessage());
}
@Override
protected void hookOnComplete() {
System.out.println("Цикл завершён");
}
});
Вывод: "Подписка готова", "Элемент: 1", "Элемент: 2", "Элемент: 3", "Элемент: 4", "Ошибка: Ошибка на 5". onComplete не сработает, потому что ошибка. Без ошибки — все 10 элементов и "Цикл завершён".
Это демонстрирует контроль: request() управляет темпом, как в backpressure (пост 5). Если не request() — только "Подписка готова".
Обработка ошибок и завершения: стратегии для устойчивости
Ошибки — часть жизни: сеть упала, БД не ответила. В Reactor onError — ваш щит. Не игнорируйте: используйте операторы для восстановления.
doOnError(Consumer<Throwable>): дополнительная реакция перед onError подписчика.
onErrorReturn(T value): fallback — верни значение вместо ошибки.
onErrorResume(Function<Throwable, Publisher<T>>): замени на другой поток.
retry(long times): повтори попытку.
Пример с восстановлением:
Mono<String> riskyMono = Mono.fromCallable(() -> {
if (Math.random() > 0.5) throw new RuntimeException("Сбой");
return "Успех";
}).onErrorReturn("Fallback").retry(2); // Retry 2 раза
riskyMono.subscribe(
System.out::println,
error -> System.out.println("Не удалось: " + error),
() -> System.out.println("Завершено")
);
Если сбой — retry, потом fallback. onComplete сработает только при успехе или fallback.
Для onComplete: используйте doFinally(Runnable) — сработает всегда, после onComplete или onError. Полезно для закрытия ресурсов: doFinally(() -> connection.close()).
Практические советы и подводные камни
Всегда реализуйте все методы в Subscriber: иначе дефолтные могут "проглотить" ошибки (onError кидает RuntimeException, если не переопределён).
Используйте Hooks: doOnSubscribe, doOnNext для логирования без изменения потока.
Отмена: dispose() не гарантирует мгновенную остановку — upstream (источник) может продолжить, но данные не дойдут.
Камень: в onNext избегайте блокировок (sleep, IO) — используйте publishOn(Schedulers.boundedElastic()) для переноса.
Тестирование: StepVerifier.create(flux).expectSubscription().expectNext(1,2).expectError().verify(); — проверяет цикл.
В практике: в WebFlux сервис возвращает Flux, клиент subscribe() в контроллере — реакции на события в реальном времени.
#Java #middle #Reactor #Reactive_Streams_API #onNext #onError #onComplet
👍3
Реактивное программирование
Базовые операторы в Reactor: map, filter, flatMap
Операторы — это методы на Mono/Flux, которые позволяют строить конвейеры: преобразовывать, фильтровать и комбинировать данные асинхронно. Представьте их как звенья в цепи: каждый берёт входной поток, меняет его и передаёт дальше. Сегодня разберём три фундаментальных: map (преобразование элементов), filter (фильтрация) и flatMap (плоское преобразование, для слияния подпотоков). Эти операторы — основа для сложных сценариев, они решают проблемы из первого поста, позволяя писать декларативный код вместо ручных циклов и ожиданий.
Операторы в Reactor — декларативные: вы описываете, что делать с данными, а библиотека заботится об асинхронности, backpressure и ошибках. Они не меняют исходный поток (иммутабельны), а создают новый. Это делает код читаемым и тестируемым.
Map: простое преобразование элементов
Map — оператор для изменения каждого элемента потока. Он берёт входной элемент, применяет функцию и выдаёт результат. Синхронный: функция должна быть быстрой и без блокировок. Идеален для конвертации типов, вычислений или форматирования.
Пример на Flux:
На Mono:
Почему map полезен? В традиционных подходах (как в CompletableFuture.thenApply) вы строите цепочки, но рискуете вложенностью. В Reactor map делает конвейер линейным: читается как последовательный код, но работает асинхронно. Поддерживает backpressure: если подписчик запрашивает n, map передаёт запрос upstream (источнику).
Filter: отбор элементов по условию
Filter — для пропуска только нужных элементов. Принимает предикат (функцию, возвращающую true/false) и пропускает те, для которых true. Остальные игнорируются — поток "сужается".
Пример на Flux:
На Mono:
Filter экономит ресурсы: ненужные элементы не обрабатываются дальше в цепи. В отличие от императивных циклов (где вы фильтруете в for с if), здесь всё асинхронно и с backpressure — запросы передаются источнику только для прошедших элементов.
Комбинация с map: numbers.filter(num -> num > 5).map(num -> num * 10).subscribe(); // 60, 70, 80, 90, 100
Это строит конвейер: фильтр → преобразование, без ручных переменных.
#Java #middle #Reactor #map #filter #flatMap
Базовые операторы в Reactor: map, filter, flatMap
Операторы — это методы на Mono/Flux, которые позволяют строить конвейеры: преобразовывать, фильтровать и комбинировать данные асинхронно. Представьте их как звенья в цепи: каждый берёт входной поток, меняет его и передаёт дальше. Сегодня разберём три фундаментальных: map (преобразование элементов), filter (фильтрация) и flatMap (плоское преобразование, для слияния подпотоков). Эти операторы — основа для сложных сценариев, они решают проблемы из первого поста, позволяя писать декларативный код вместо ручных циклов и ожиданий.
Операторы в Reactor — декларативные: вы описываете, что делать с данными, а библиотека заботится об асинхронности, backpressure и ошибках. Они не меняют исходный поток (иммутабельны), а создают новый. Это делает код читаемым и тестируемым.
Map: простое преобразование элементов
Map — оператор для изменения каждого элемента потока. Он берёт входной элемент, применяет функцию и выдаёт результат. Синхронный: функция должна быть быстрой и без блокировок. Идеален для конвертации типов, вычислений или форматирования.
Пример на Flux:
import reactor.core.publisher.Flux;
Flux<String> originalFlux = Flux.just("яблоко", "банан", "вишня");
Flux<String> transformed = originalFlux.map(fruit -> fruit.toUpperCase()); // Преобразование в верхний регистр
transformed.subscribe(System.out::println); // Вывод: "ЯБЛОКО", "БАНАН", "ВИШНЯ"
Здесь map применяет лямбду к каждому элементу последовательно. Если ошибка в функции — сработает onError.
На Mono:
Mono<Integer> num = Mono.just(5).map(x -> x * 2); // Результат: 10
Почему map полезен? В традиционных подходах (как в CompletableFuture.thenApply) вы строите цепочки, но рискуете вложенностью. В Reactor map делает конвейер линейным: читается как последовательный код, но работает асинхронно. Поддерживает backpressure: если подписчик запрашивает n, map передаёт запрос upstream (источнику).
Filter: отбор элементов по условию
Filter — для пропуска только нужных элементов. Принимает предикат (функцию, возвращающую true/false) и пропускает те, для которых true. Остальные игнорируются — поток "сужается".
Пример на Flux:
Flux<Integer> numbers = Flux.range(1, 10);
Flux<Integer> evenNumbers = numbers.filter(num -> num % 2 == 0); // Только чётные
evenNumbers.subscribe(System.out::println); // Вывод: 2, 4, 6, 8, 10
Если поток пустой или ничего не проходит — onComplete сработает без onNext.
На Mono:
Mono<String> word = Mono.just("привет").filter(w -> w.length() > 7); // Не пройдёт — пустой Mono
Filter экономит ресурсы: ненужные элементы не обрабатываются дальше в цепи. В отличие от императивных циклов (где вы фильтруете в for с if), здесь всё асинхронно и с backpressure — запросы передаются источнику только для прошедших элементов.
Комбинация с map: numbers.filter(num -> num > 5).map(num -> num * 10).subscribe(); // 60, 70, 80, 90, 100
Это строит конвейер: фильтр → преобразование, без ручных переменных.
#Java #middle #Reactor #map #filter #flatMap
👍3
FlatMap: плоское преобразование для асинхронных подпотоков
FlatMap — мощный оператор для случаев, когда из одного элемента нужно создать подпоток (Publisher), и слить их в плоский результат. Это как map, но для асинхронных или множественных выходов: он "разворачивает" вложенные потоки. Полезен для запросов в цикле: например, для каждого пользователя — асинхронно запросить данные.
Пример на Flux:
Асинхронный пример: симулируем API-запросы.
Почему flatMap решает проблемы? В традиционных подходах (циклы с Future) вы ждёте каждый запрос, блокируя. Здесь — асинхронное слияние, без ожиданий и callback-ада: цепочка читаема.
Практические советы и подводные камни
Читаемость: цепочки операторов пишите по строкам для ясности: flux.filter(...).map(...).flatMap(...);
Ошибки: если в map/flatMap исключение — onError. Используйте handle() для условной обработки.
Производительность: в flatMap устанавливайте concurrency (default 256) для контроля параллелизма: flatMap(func, 4) — max 4 подпотока одновременно.
Камень: блокирующий код в лямбдах — сломает асинхронность. Для IO — используйте flatMap с Mono.fromCallable и publishOn(Schedulers.boundedElastic()).
Тестирование: StepVerifier.create(flux.map(...)).expectNext("ЯБЛОКО").verifyComplete();
#Java #middle #Reactor #map #filter #flatMap
FlatMap — мощный оператор для случаев, когда из одного элемента нужно создать подпоток (Publisher), и слить их в плоский результат. Это как map, но для асинхронных или множественных выходов: он "разворачивает" вложенные потоки. Полезен для запросов в цикле: например, для каждого пользователя — асинхронно запросить данные.
Пример на Flux:
Flux<String> fruits = Flux.just("яблоко", "банан");
Flux<Character> letters = fruits.flatMap(fruit -> Flux.fromArray(fruit.toCharArray())); // Из строки — поток символов
letters.subscribe(System.out::println); // Вывод: я, б, л, о, к, о, б, а, н, а, н (в возможном перемешанном порядке, если асинхронно)
Здесь flatMap берёт строку, создаёт Flux из символов и сливает всё в один поток. В отличие от map (который вернул бы Flux<Flux<Character>> — вложенный), flatMap "сплющивает".
Асинхронный пример: симулируем API-запросы.
import java.time.Duration;
Flux<String> users = Flux.just("user1", "user2");
Flux<String> data = users.flatMap(user -> Mono.just("Данные для " + user).delayElement(Duration.ofSeconds(1))); // Асинхронный подпоток с задержкой
data.subscribe(System.out::println); // Вывод через секунды: "Данные для user1", "Данные для user2" (параллельно, если scheduler позволяет)
FlatMap уважает backpressure: запрашивает у подпотоков по мере нужды. Но осторожно: если подпотоки бесконечные — рискуете перегрузкой. Параметр concurrency (flatMap(func, concurrency)) ограничивает параллелизм.
Почему flatMap решает проблемы? В традиционных подходах (циклы с Future) вы ждёте каждый запрос, блокируя. Здесь — асинхронное слияние, без ожиданий и callback-ада: цепочка читаема.
Практические советы и подводные камни
Читаемость: цепочки операторов пишите по строкам для ясности: flux.filter(...).map(...).flatMap(...);
Ошибки: если в map/flatMap исключение — onError. Используйте handle() для условной обработки.
Производительность: в flatMap устанавливайте concurrency (default 256) для контроля параллелизма: flatMap(func, 4) — max 4 подпотока одновременно.
Камень: блокирующий код в лямбдах — сломает асинхронность. Для IO — используйте flatMap с Mono.fromCallable и publishOn(Schedulers.boundedElastic()).
Тестирование: StepVerifier.create(flux.map(...)).expectNext("ЯБЛОКО").verifyComplete();
#Java #middle #Reactor #map #filter #flatMap
👍3
Реактивное программирование
Комбинации потоков в Reactor: concat, merge и другие
Комбинации потоков — это как сборка пазла: вы берёте отдельные потоки событий и сливаете в один, управляя порядком, параллелизмом и обработкой. Операторы уважают жизненный цикл и обратное давление: если подписчик не успевает, запросы распределяются по источникам. Это делает системы масштабируемыми — под нагрузкой не тонут в очередях.
Concat: последовательное объединение потоков
Concat — оператор для слияния потоков по очереди: сначала все элементы первого, потом второго и так далее. Он ждёт завершения предыдущего (onComplete), прежде чем перейти к следующему. Идеален, когда порядок важен и параллелизм не нужен: например, загрузка данных по шагам.
Пример на Flux:
На Mono:
Почему concat полезен? В традиционных подходах (thenCompose в CompletableFuture) вы пишете цепочки вручную, рискуя callback-адом. Здесь — декларативно, с автоматическим backpressure: запросы идут к текущему потоку. Минус: медленный, если источники асинхронные — ждёт завершения.
Вариант: concatWith(other) на одном Flux для добавления.
Merge: параллельное слияние по готовности
Merge — для объединения потоков параллельно: элементы выдаются по мере готовности, без ожидания завершения. Порядок не гарантирован — зависит от скорости источников. Идеален для независимых событий: например, слияние логов из сервисов.
Пример:
На Mono: merge работает с Mono как с Flux'ом одного элемента.
Почему merge лучше потоков? В старых моделях (join в Executor) вы ждёте всех, блокируя. Здесь — асинхронно, с backpressure: merge распределяет запросы по источникам пропорционально. Параметр: merge(func, concurrency) для ограничения параллелизма.
Вариант: mergeWith(other) на одном Flux.
#Java #middle #Reactor #Concat #Merge #Zip #CombineLatest
Комбинации потоков в Reactor: concat, merge и другие
Комбинации потоков — это как сборка пазла: вы берёте отдельные потоки событий и сливаете в один, управляя порядком, параллелизмом и обработкой. Операторы уважают жизненный цикл и обратное давление: если подписчик не успевает, запросы распределяются по источникам. Это делает системы масштабируемыми — под нагрузкой не тонут в очередях.
Concat: последовательное объединение потоков
Concat — оператор для слияния потоков по очереди: сначала все элементы первого, потом второго и так далее. Он ждёт завершения предыдущего (onComplete), прежде чем перейти к следующему. Идеален, когда порядок важен и параллелизм не нужен: например, загрузка данных по шагам.
Пример на Flux:
import reactor.core.publisher.Flux;
import java.time.Duration;
Flux<String> first = Flux.just("Шаг 1a", "Шаг 1b").delayElements(Duration.ofSeconds(1)); // Задержка для симуляции
Flux<String> second = Flux.just("Шаг 2a", "Шаг 2b");
Flux<String> combined = Flux.concat(first, second);
combined.subscribe(System.out::println); // Вывод: "Шаг 1a" (через 1с), "Шаг 1b" (ещё 1с), "Шаг 2a", "Шаг 2b"
Здесь concat гарантирует последовательность: второй Flux стартует только после onComplete первого. Если ошибка в первом — весь поток прервётся onError.
На Mono:
Mono<String> m1 = Mono.just("A").delayElement(Duration.ofSeconds(1));
Mono<String> m2 = Mono.just("B");
Flux<String> seq = Flux.concat(m1, m2); // Mono как Flux с одним элементом
Почему concat полезен? В традиционных подходах (thenCompose в CompletableFuture) вы пишете цепочки вручную, рискуя callback-адом. Здесь — декларативно, с автоматическим backpressure: запросы идут к текущему потоку. Минус: медленный, если источники асинхронные — ждёт завершения.
Вариант: concatWith(other) на одном Flux для добавления.
Merge: параллельное слияние по готовности
Merge — для объединения потоков параллельно: элементы выдаются по мере готовности, без ожидания завершения. Порядок не гарантирован — зависит от скорости источников. Идеален для независимых событий: например, слияние логов из сервисов.
Пример:
Flux<String> slow = Flux.just("Медленный 1", "Медленный 2").delayElements(Duration.ofSeconds(2));
Flux<String> fast = Flux.just("Быстрый A", "Быстрый B").delayElements(Duration.ofMillis(500));
Flux<String> merged = Flux.merge(slow, fast);
merged.subscribe(System.out::println); // Возможный вывод: "Быстрый A" (0.5с), "Быстрый B" (ещё 0.5с), "Медленный 1" (2с), "Медленный 2" (ещё 2с)
Здесь merge отдает элементы, как только они готовы — параллельно. Если ошибка в одном — весь поток onError (по умолчанию), но можно настроить.
На Mono: merge работает с Mono как с Flux'ом одного элемента.
Почему merge лучше потоков? В старых моделях (join в Executor) вы ждёте всех, блокируя. Здесь — асинхронно, с backpressure: merge распределяет запросы по источникам пропорционально. Параметр: merge(func, concurrency) для ограничения параллелизма.
Вариант: mergeWith(other) на одном Flux.
#Java #middle #Reactor #Concat #Merge #Zip #CombineLatest
👍2
Zip: попарная комбинация элементов
Zip — объединяет элементы из потоков попарно: берёт первый от первого, первый от второго и т.д., применяя функцию для слияния. Завершается, когда любой поток исчерпан. Идеален для синхронизации: например, zip координат X и Y в точки.
Пример:
Почему zip решает проблемы? Вместо ручных семафоров или ожиданий в циклах, декларативно комбинируете асинхронные источники. Backpressure: запрашивает у всех равномерно.
CombineLatest: комбинация последних элементов
CombineLatest — выдаёт комбинацию последних элементов от каждого потока, как только любой обновляется. Не ждёт пар — всегда использует свежие. Идеален для реального времени: например, комбинация курсов валют.
Пример:
В отличие от zip (строгие пары), здесь — динамика. Backpressure: как в merge.
Другие комбинации: withLatestFrom и concatMap
WithLatestFrom: похож на combineLatest, но "master"-поток (основной) триггерит выдачу, беря последние из второстепенных. Пример: flux.withLatestFrom(other, (main, other) -> main + other).
ConcatMap: как flatMap, но последовательный (как concat внутри). Для orderly асинхронных подпотоков.
Эти дополняют: выбирайте по сценарию — последовательность (concat/concatMap), параллелизм (merge/flatMap) или синхронизация (zip/combineLatest).
Практические советы и подводные камни
Ошибки: по умолчанию onError останавливает всё — используйте onErrorResume для продолжения.
Параллелизм: в merge/flatMap устанавливайте prefetch (буфер) или concurrency для тюнинга.
Камень: бесконечные потоки в merge — рискуете OOM; добавьте take() или limitRate().
Тестирование: StepVerifier.create(Flux.merge(f1, f2)).expectNextMatches(...).verify();
#Java #middle #Reactor #Concat #Merge #Zip #CombineLatest
Zip — объединяет элементы из потоков попарно: берёт первый от первого, первый от второго и т.д., применяя функцию для слияния. Завершается, когда любой поток исчерпан. Идеален для синхронизации: например, zip координат X и Y в точки.
Пример:
Flux<Integer> xCoords = Flux.just(1, 2, 3);
Flux<Integer> yCoords = Flux.just(10, 20, 30, 40); // Лишний элемент игнорируется
Flux<String> points = Flux.zip(xCoords, yCoords, (x, y) -> "(" + x + ", " + y + ")");
points.subscribe(System.out::println); // Вывод: "(1, 10)", "(2, 20)", "(3, 30)"
Здесь zip ждёт пару: если один медленный — задерживает. Для >2 потоков: zip(tuple -> ..., flux1, flux2, flux3).
Почему zip решает проблемы? Вместо ручных семафоров или ожиданий в циклах, декларативно комбинируете асинхронные источники. Backpressure: запрашивает у всех равномерно.
CombineLatest: комбинация последних элементов
CombineLatest — выдаёт комбинацию последних элементов от каждого потока, как только любой обновляется. Не ждёт пар — всегда использует свежие. Идеален для реального времени: например, комбинация курсов валют.
Пример:
Flux<String> stockA = Flux.just("A:100", "A:110").delayElements(Duration.ofSeconds(1));
Flux<String> stockB = Flux.just("B:200").delayElements(Duration.ofSeconds(2));
Flux<String> latest = Flux.combineLatest(stockA, stockB, (a, b) -> a + " + " + b);
latest.subscribe(System.out::println); // Вывод примерно: "A:100 + B:200" (после 2с), "A:110 + B:200" (ещё 1с после)
Здесь combineLatest реагирует на изменения: при обновлении A использует последний B. Для >2: combineLatest(tuple -> ..., fluxes).
В отличие от zip (строгие пары), здесь — динамика. Backpressure: как в merge.
Другие комбинации: withLatestFrom и concatMap
WithLatestFrom: похож на combineLatest, но "master"-поток (основной) триггерит выдачу, беря последние из второстепенных. Пример: flux.withLatestFrom(other, (main, other) -> main + other).
ConcatMap: как flatMap, но последовательный (как concat внутри). Для orderly асинхронных подпотоков.
Эти дополняют: выбирайте по сценарию — последовательность (concat/concatMap), параллелизм (merge/flatMap) или синхронизация (zip/combineLatest).
Практические советы и подводные камни
Ошибки: по умолчанию onError останавливает всё — используйте onErrorResume для продолжения.
Параллелизм: в merge/flatMap устанавливайте prefetch (буфер) или concurrency для тюнинга.
Камень: бесконечные потоки в merge — рискуете OOM; добавьте take() или limitRate().
Тестирование: StepVerifier.create(Flux.merge(f1, f2)).expectNextMatches(...).verify();
#Java #middle #Reactor #Concat #Merge #Zip #CombineLatest
👍4
Реактивное программирование
Обработка ошибок в реактивных стримах
Представьте ошибки как пороги в потоке событий: без обработки они останавливают течение, но с правильными операторами — поток продолжается, минимизируя простои. Это решает проблемы из первого поста: вместо жёстких сбоев в потоках или Future, где ошибка рушит всё, реактивный подход даёт контроль и устойчивость.
Обработка ошибок в Reactor строится на жизненном цикле: когда исключение возникает в потоке (в map, flatMap или источнике), срабатывает onError, прерывая onNext и onComplete. Но вместо того чтобы "падать", вы можете трансформировать ошибку в данные, повторить попытку или логировать.
Операторы — декларативные: добавляете в цепочку, и Reactor управляет асинхронностью, backpressure и распространением ошибок. Это делает код resilient (устойчивым): приложение не крашится, а адаптируется.
Базовая реакция: doOnError и onErrorMap
Сначала — простые операторы для наблюдения и модификации ошибок, без изменения потока.
- doOnError: дополнительная реакция на ошибку, как "хук" (зацепка). Полезен для логирования или метрик, не влияет на основной onError.
- onErrorMap: преобразует исключение в другое, для кастомизации (например, оборачивает в бизнес-ошибку).
Пример на Flux:
На Mono: аналогично, но для одиночного элемента.
Эти операторы решают проблему традиционных try-catch: вместо разбросанных блоков, всё в конвейере, читаемо и централизовано.
Восстановление: onErrorReturn и onErrorResume
Когда ошибка — не конец света, используйте fallback.
- onErrorReturn: возвращает фиксированное значение вместо ошибки. Простой запасной вариант.
- onErrorResume: более гибкий — заменяет ошибку новым Publisher (Mono/Flux). Можно генерировать динамически, в зависимости от исключения.
Пример с onErrorReturn:
С onErrorResume — условно:
onErrorResume позволяет ветвление: проверь тип ошибки и верни альтернативный поток. Это асинхронно: если fallback — Mono.delay, оно подождёт без блокировки.
Почему лучше CompletableFuture.handle?
Нет вложенных колбэков — цепочка линейна, ошибки интегрированы в конвейер.
#Java #middle #Reactor #doOnError #onErrorMap
Обработка ошибок в реактивных стримах
Представьте ошибки как пороги в потоке событий: без обработки они останавливают течение, но с правильными операторами — поток продолжается, минимизируя простои. Это решает проблемы из первого поста: вместо жёстких сбоев в потоках или Future, где ошибка рушит всё, реактивный подход даёт контроль и устойчивость.
Обработка ошибок в Reactor строится на жизненном цикле: когда исключение возникает в потоке (в map, flatMap или источнике), срабатывает onError, прерывая onNext и onComplete. Но вместо того чтобы "падать", вы можете трансформировать ошибку в данные, повторить попытку или логировать.
Операторы — декларативные: добавляете в цепочку, и Reactor управляет асинхронностью, backpressure и распространением ошибок. Это делает код resilient (устойчивым): приложение не крашится, а адаптируется.
Базовая реакция: doOnError и onErrorMap
Сначала — простые операторы для наблюдения и модификации ошибок, без изменения потока.
- doOnError: дополнительная реакция на ошибку, как "хук" (зацепка). Полезен для логирования или метрик, не влияет на основной onError.
- onErrorMap: преобразует исключение в другое, для кастомизации (например, оборачивает в бизнес-ошибку).
Пример на Flux:
import reactor.core.publisher.Flux;
import java.io.IOException;
Flux<String> riskyFlux = Flux.just("данные1", "данные2").map(data -> {
if (data.equals("данные2")) throw new IOException("Сбой ввода-вывода");
return data.toUpperCase();
});
riskyFlux
.doOnError(e -> System.err.println("Лог: " + e.getMessage())) // Логируем
.onErrorMap(e -> new RuntimeException("Обёрнутая ошибка: " + e)) // Преобразуем
.subscribe(
System.out::println,
error -> System.err.println("Финальная ошибка: " + error) // onError в подписке
);
// Вывод: "ДАННЫЕ1", потом лог "Сбой ввода-вывода", и финальная "Обёрнутая ошибка: ..."
Здесь doOnError срабатывает перед onErrorMap, а подписка ловит модифицированную ошибку. Это асинхронно: если ошибка в асинхронном подпотоке (flatMap), Reactor передаёт её downstream (дальше по цепи) без блокировок.
На Mono: аналогично, но для одиночного элемента.
Эти операторы решают проблему традиционных try-catch: вместо разбросанных блоков, всё в конвейере, читаемо и централизовано.
Восстановление: onErrorReturn и onErrorResume
Когда ошибка — не конец света, используйте fallback.
- onErrorReturn: возвращает фиксированное значение вместо ошибки. Простой запасной вариант.
- onErrorResume: более гибкий — заменяет ошибку новым Publisher (Mono/Flux). Можно генерировать динамически, в зависимости от исключения.
Пример с onErrorReturn:
Mono<String> httpMono = Mono.fromCallable(() -> {
// Симулируем HTTP-запрос
throw new RuntimeException("Сервер не отвечает");
}).onErrorReturn("Кэшированные данные");
httpMono.subscribe(System.out::println); // Вывод: "Кэшированные данные", потом onComplete
Здесь ошибка преобразуется в значение, поток завершается успешно.
С onErrorResume — условно:
Flux<Integer> calcFlux = Flux.range(1, 5).map(i -> {
if (i == 3) throw new ArithmeticException("Деление на ноль");
return 10 / (i - 3); // Симуляция
}).onErrorResume(e -> {
if (e instanceof ArithmeticException) {
return Flux.just(0, 0); // Fallback на нули
} else {
return Flux.error(e); // Пропустить другие ошибки
}
});
calcFlux.subscribe(System.out::println); // Вывод: элементы до ошибки, потом 0, 0, onComplete
onErrorResume позволяет ветвление: проверь тип ошибки и верни альтернативный поток. Это асинхронно: если fallback — Mono.delay, оно подождёт без блокировки.
Почему лучше CompletableFuture.handle?
Нет вложенных колбэков — цепочка линейна, ошибки интегрированы в конвейер.
#Java #middle #Reactor #doOnError #onErrorMap
👍3
Retry: повтор попыток при ошибке
Retry — для transient (временных) ошибок: сеть, временный сбой. Повторяет upstream (источник) заданное число раз.
- retry(long times): простая повторка.
- retryWhen(Retry strategy): с кастомной логикой (backoff — задержка, условия).
Пример базовый:
С retryWhen для экспоненциальной задержки (backoff):
Это предотвращает "молоток" (hammering) сервера: ждёт перед retry. Асинхронно: задержки не блокируют поток.
Retry решает боли блокировок: вместо цикла с sleep в традиционном коде, декларативно и эффективно.
Практические советы и подводные камни
- Комбинируйте: riskyFlux.doOnError(log).onErrorResume(fallback).retry(2) — лог + retry + fallback.
- Условия: в onErrorResume используйте instanceof для типов ошибок, чтобы не catch всё подряд.
- Глобально: используйте Hooks.onErrorDropped для непойманных ошибок (редко).
- Камень: retry на бесконечных потоках — вечный цикл; добавьте timeout() или maxAttempts.
- Тестирование: StepVerifier.create(flux).expectErrorMatches(e -> e instanceof IOException).verify();
В практике: в WebFlux — контроллер возвращает Mono с retry для внешних API, onErrorReturn для кэша.
плавному восстановлению, экономя ресурсы и упрощая код.
#Java #middle #Reactor #doOnError #onErrorMap
Retry — для transient (временных) ошибок: сеть, временный сбой. Повторяет upstream (источник) заданное число раз.
- retry(long times): простая повторка.
- retryWhen(Retry strategy): с кастомной логикой (backoff — задержка, условия).
Пример базовый:
Mono<String> flakyMono = Mono.defer(() -> {
if (Math.random() > 0.3) throw new RuntimeException("Временный сбой");
return Mono.just("Успех");
}).retry(3); // Повторить 3 раза
flakyMono.subscribe(System.out::println, Throwable::printStackTrace);
Здесь retry повторяет весь Mono при ошибке, до успеха или исчерпания попыток. Если все попытки fail — финальный onError.
С retryWhen для экспоненциальной задержки (backoff):
import reactor.util.retry.Retry;
flakyMono.retryWhen(Retry.backoff(3, Duration.ofSeconds(1))); // 3 попытки с задержкой 1с, 2с, 4с
Это предотвращает "молоток" (hammering) сервера: ждёт перед retry. Асинхронно: задержки не блокируют поток.
Retry решает боли блокировок: вместо цикла с sleep в традиционном коде, декларативно и эффективно.
Практические советы и подводные камни
- Комбинируйте: riskyFlux.doOnError(log).onErrorResume(fallback).retry(2) — лог + retry + fallback.
- Условия: в onErrorResume используйте instanceof для типов ошибок, чтобы не catch всё подряд.
- Глобально: используйте Hooks.onErrorDropped для непойманных ошибок (редко).
- Камень: retry на бесконечных потоках — вечный цикл; добавьте timeout() или maxAttempts.
- Тестирование: StepVerifier.create(flux).expectErrorMatches(e -> e instanceof IOException).verify();
В практике: в WebFlux — контроллер возвращает Mono с retry для внешних API, onErrorReturn для кэша.
плавному восстановлению, экономя ресурсы и упрощая код.
#Java #middle #Reactor #doOnError #onErrorMap
👍2
Реактивное программирование
Управление потоками в Reactor: Schedulers
Schedulers — часть Reactor Core, они интегрируют с Java's ExecutorService, но абстрагируют детали: вы выбираете тип, а Reactor управляет жизненным циклом. По умолчанию Reactor использует Schedulers.parallel() для вычислений, но вы можете переключать с помощью операторов publishOn (для downstream) и subscribeOn (для upstream). Это даёт контроль: CPU-bound задачи на параллельных потоках, IO-bound — на эластичных. Давайте разберём типы schedulers, операторы и примеры — пробуйте в коде с reactor-core.
Типы Schedulers: выбор под задачу
Reactor предоставляет готовые планировщики для разных сценариев. Каждый — фабрика потоков с настройками (размер пула, демонизация).
- parallel(): Для CPU-bound задач (расчёты). Фиксированный пул = кол-во CPU ядер. Не демоны — ждёт завершения. Идеален для map с тяжёлыми вычислениями.
- boundedElastic(): Для IO-bound (сеть, файлы, БД) с блокирующими операциями. Эластичный пул: до 10*CPU, но переиспользует. Демоны — приложение не ждёт. Полезен, если в лямбде "грязный" код (sleep, blocking IO).
- single(): Один поток для всех задач. Для последовательных операций, где параллелизм не нужен (тесты, простые цепочки).
- immediate(): Выполняет на текущем потоке. Для синхронных тестов или когда асинхронность не требуется.
- fromExecutor(Executor): Кастомный из вашего пула.
Пример создания:
Операторы для переключения: publishOn и subscribeOn
- publishOn(Scheduler): Переключает downstream (операции после него) на указанный scheduler. Влияет на onNext, map, filter и т.д. после точки.
- subscribeOn(Scheduler): Переключает upstream (источник и подписку) на scheduler. Влияет на весь поток от subscribe().
Пример комбинации:
Почему важно? Без schedulers все на default (parallel), но IO может заблокировать CPU-потоки. PublishOn изолирует блокирующие части, subscribeOn — для источников (например, blocking чтение файла на отдельном потоке).
Управление контекстом и параллелизмом
Schedulers помогают с контекстом: в Reactor есть Context для хранения данных (как ThreadLocal, но реактивный). Операторы как withScheduler сохраняют его.
Для параллелизма: в flatMap добавьте concurrency: flatMap(func, concurrency, prefetch), где concurrency — max параллельных подпотоков, prefetch — буфер.
Пример с параллелизмом:
#Java #middle #Reactor #WebClient #Schedulers
Управление потоками в Reactor: Schedulers
Schedulers — часть Reactor Core, они интегрируют с Java's ExecutorService, но абстрагируют детали: вы выбираете тип, а Reactor управляет жизненным циклом. По умолчанию Reactor использует Schedulers.parallel() для вычислений, но вы можете переключать с помощью операторов publishOn (для downstream) и subscribeOn (для upstream). Это даёт контроль: CPU-bound задачи на параллельных потоках, IO-bound — на эластичных. Давайте разберём типы schedulers, операторы и примеры — пробуйте в коде с reactor-core.
Типы Schedulers: выбор под задачу
Reactor предоставляет готовые планировщики для разных сценариев. Каждый — фабрика потоков с настройками (размер пула, демонизация).
- parallel(): Для CPU-bound задач (расчёты). Фиксированный пул = кол-во CPU ядер. Не демоны — ждёт завершения. Идеален для map с тяжёлыми вычислениями.
- boundedElastic(): Для IO-bound (сеть, файлы, БД) с блокирующими операциями. Эластичный пул: до 10*CPU, но переиспользует. Демоны — приложение не ждёт. Полезен, если в лямбде "грязный" код (sleep, blocking IO).
- single(): Один поток для всех задач. Для последовательных операций, где параллелизм не нужен (тесты, простые цепочки).
- immediate(): Выполняет на текущем потоке. Для синхронных тестов или когда асинхронность не требуется.
- fromExecutor(Executor): Кастомный из вашего пула.
Пример создания:
import reactor.core.scheduler.Schedulers;
Schedulers.parallel(); // Готов к использованию
Schedulers не создают потоки заранее — ленивые, как Mono/Flux.
Операторы для переключения: publishOn и subscribeOn
- publishOn(Scheduler): Переключает downstream (операции после него) на указанный scheduler. Влияет на onNext, map, filter и т.д. после точки.
- subscribeOn(Scheduler): Переключает upstream (источник и подписку) на scheduler. Влияет на весь поток от subscribe().
Пример комбинации:
import reactor.core.publisher.Flux;
import java.time.Duration;
Flux<Integer> flux = Flux.range(1, 3)
.map(i -> { // На subscribeOn
System.out.println("Map1 на потоке: " + Thread.currentThread().getName());
return i * 10;
})
.publishOn(Schedulers.boundedElastic()) // Переключаем downstream
.delayElements(Duration.ofSeconds(1)) // На boundedElastic
.map(i -> { // На boundedElastic
System.out.println("Map2 на потоке: " + Thread.currentThread().getName());
return i + 1;
});
flux.subscribeOn(Schedulers.single()) // Upstream на single
.subscribe(System.out::println);
Вывод покажет: Map1 на "single-1", delay и Map2 на "boundedElastic-X". Это асинхронно: основной поток не блокируется.
Почему важно? Без schedulers все на default (parallel), но IO может заблокировать CPU-потоки. PublishOn изолирует блокирующие части, subscribeOn — для источников (например, blocking чтение файла на отдельном потоке).
Управление контекстом и параллелизмом
Schedulers помогают с контекстом: в Reactor есть Context для хранения данных (как ThreadLocal, но реактивный). Операторы как withScheduler сохраняют его.
Для параллелизма: в flatMap добавьте concurrency: flatMap(func, concurrency, prefetch), где concurrency — max параллельных подпотоков, prefetch — буфер.
Пример с параллелизмом:
Flux.range(1, 10)
.flatMap(i -> Mono.just(i).delayElement(Duration.ofMillis(100)), 2) // Max 2 параллельно
.subscribeOn(Schedulers.parallel())
.blockLast(); // Для теста, в prod не блокируйте!
Это ограничивает: не 10 задержек сразу, а по 2.
#Java #middle #Reactor #WebClient #Schedulers
👍2
Практические советы и подводные камни
- Диагностика: используйте doOnNext(() -> Thread.currentThread().getName()) для логирования потоков.
- Shutdown: Schedulers.shutdownNow() для cleanup, но в Spring — автоматом.
- проблема: blocking в parallel() — заблокирует все CPU; всегда publishOn(boundedElastic()) перед blocking.
- Оптимизация: для веб — WebFlux использует Schedulers.default() на Netty event-loop (для IO).
- Тестирование: VirtualTimeScheduler для симуляции времени в StepVerifier.
В практике: в микросервисах — subscribeOn(parallel()) для вычислений, publishOn(boundedElastic()) для БД-запросов.
Реактивные API-запросы с WebClient
WebClient — declarative: строите запрос, получаете Mono/Flux с ответом. Под капотом — Netty или HttpClient для IO, с Schedulers.boundedElastic() по умолчанию. Давайте разберём создание, методы (GET/POST), обработку и примеры — добавьте spring-webflux в зависимости и пробуйте.
Создание и базовая настройка WebClient
WebClient.builder() для кастомизации: baseUrl, headers, client (Netty/HttpClient).
Пример:
Реактивные запросы: GET, POST и обработка
- GET: retrieve() возвращает Mono<ResponseEntity> или Flux для стриминга.
- POST: bodyValue() для данных, exchange() для полного контроля.
Пример GET с Mono:
Пример POST с Flux:
Обработка ответов и ошибок
Retrieve() даёт bodyToMono/Flux, onStatus для ошибок (4xx/5xx).
Пример с обработкой:
Практические советы и подводные камни
- Таймауты: client.mutate().clientConnector(new ReactorClientHttpConnector(HttpClient.create().responseTimeout(Duration.ofSeconds(5)))).build();
- HTTPS: автоматом, но настройте trustStore если нужно.
- Тестирование: WebClient с mock (WireMock) и StepVerifier.
В практике: в Spring Boot контроллер вызывает WebClient для агрегации от сервисов — асинхронно, без ожидания.
#Java #middle #Reactor #WebClient #Schedulers
- Диагностика: используйте doOnNext(() -> Thread.currentThread().getName()) для логирования потоков.
- Shutdown: Schedulers.shutdownNow() для cleanup, но в Spring — автоматом.
- проблема: blocking в parallel() — заблокирует все CPU; всегда publishOn(boundedElastic()) перед blocking.
- Оптимизация: для веб — WebFlux использует Schedulers.default() на Netty event-loop (для IO).
- Тестирование: VirtualTimeScheduler для симуляции времени в StepVerifier.
В практике: в микросервисах — subscribeOn(parallel()) для вычислений, publishOn(boundedElastic()) для БД-запросов.
Реактивные API-запросы с WebClient
WebClient — declarative: строите запрос, получаете Mono/Flux с ответом. Под капотом — Netty или HttpClient для IO, с Schedulers.boundedElastic() по умолчанию. Давайте разберём создание, методы (GET/POST), обработку и примеры — добавьте spring-webflux в зависимости и пробуйте.
Создание и базовая настройка WebClient
WebClient.builder() для кастомизации: baseUrl, headers, client (Netty/HttpClient).
Пример:
import org.springframework.web.reactive.function.client.WebClient;
WebClient client = WebClient.builder()
.baseUrl("https://api.example.com")
.defaultHeader("Authorization", "Bearer token")
.build();
Это готово к запросам. Для кодеков (JSON) — добавьте Jackson или Gson.
Реактивные запросы: GET, POST и обработка
- GET: retrieve() возвращает Mono<ResponseEntity> или Flux для стриминга.
- POST: bodyValue() для данных, exchange() для полного контроля.
Пример GET с Mono:
Mono<String> responseMono = client.get()
.uri("/users/{id}", 1) // URI с параметрами
.retrieve()
.bodyToMono(String.class); // Декодируем в String
responseMono.subscribe(System.out::println); // Асинхронно: ответ придёт позже
Для JSON: bodyToMono(User.class) с Jackson.
Пример POST с Flux:
Flux<User> usersFlux = Flux.just(new User("Alice"), new User("Bob"));
Mono<Void> postMono = client.post()
.uri("/users/batch")
.body(usersFlux, User.class) // Тело как Flux
.retrieve()
.bodyToMono(Void.class);
postMono.subscribe(); // Отправка асинхронно
WebClient уважает backpressure: если сервер шлёт Flux (SSE), клиент запрашивает по мере обработки.
С Schedulers: по умолчанию на event-loop, но publishOn для кастом.
Обработка ответов и ошибок
Retrieve() даёт bodyToMono/Flux, onStatus для ошибок (4xx/5xx).
Пример с обработкой:
client.get()
.uri("/data")
.retrieve()
.onStatus(HttpStatusCode::is4xxClientError, resp -> Mono.error(new BadRequestException("Ошибка клиента")))
.bodyToFlux(Data.class)
.doOnError(e -> System.err.println("Сбой: " + e)) // Из поста 10
.retry(3) // Retry на сетевые ошибки
.subscribe(data -> process(data));
Это интегрирует с Reactor: ошибки как onError, retry для flaky API.
Для стриминга: bodyToFlux для больших данных, без загрузки всего в память.
Практические советы и подводные камни
- Таймауты: client.mutate().clientConnector(new ReactorClientHttpConnector(HttpClient.create().responseTimeout(Duration.ofSeconds(5)))).build();
- HTTPS: автоматом, но настройте trustStore если нужно.
- Тестирование: WebClient с mock (WireMock) и StepVerifier.
В практике: в Spring Boot контроллер вызывает WebClient для агрегации от сервисов — асинхронно, без ожидания.
#Java #middle #Reactor #WebClient #Schedulers
👍2
Реактивное программирование
Введение в Spring WebFlux
Spring WebFlux — это часть Spring Framework, предназначенная для создания неблокирующих, асинхронных веб-приложений. Он основан на Project Reactor и Reactive Streams API, что позволяет использовать Mono и Flux прямо в коде сервера. Это не замена Spring MVC (традиционному веб-фреймворку), а альтернатива для сценариев с высокой нагрузкой: микросервисы, реальное время, стриминг. Почему это новый подход? В традиционном веб (как Servlet с потоками на запрос) под пиковой нагрузкой сервер исчерпывает ресурсы — каждый запрос занимает поток, который висит в ожидании IO. WebFlux меняет это: использует event-loop (цикл обработки событий), где один поток обслуживает тысячи подключений, реагируя на события асинхронно. Это экономит CPU и память, делая приложения более отзывчивыми и масштабируемыми.
Почему Spring WebFlux: связь с реактивным мышлением
WebFlux воплощает принципы, которые мы разбирали: push-модель, где данные отправляются по готовности; обратное давление для контроля темпа; операторы для трансформаций. В экосистеме Spring он интегрируется с другими компонентами: Spring Boot для быстрого старта, Spring Data Reactive для баз данных (как R2DBC), Spring Security Reactive для безопасности. Ключевой выигрыш — для IO-bound задач: запросы к БД, API или файлам идут асинхронно, без блокировок. Если ваш сервис ждёт внешних ответов 90% времени, WebFlux высвобождает ресурсы, позволяя обрабатывать в 10-100 раз больше запросов на том же железе.
Чтобы начать: добавьте spring-boot-starter-webflux в зависимости (Maven/Gradle). Spring Boot автоматически настроит Reactor Netty как сервер (неблокирующий HTTP на базе Netty). Нет нужды в Tomcat — всё реактивно.
Ключевые компоненты Spring WebFlux
WebFlux предлагает два стиля разработки: аннотированный (похож на Spring MVC) и функциональный (роутеры как в Express.js). Оба используют Mono/Flux для ответов.
- Аннотированные контроллеры: @RestController с методами, возвращающими Mono или Flux.
Пример простого GET:
- Функциональные роутеры: Для маршрутизации без контроллеров.
Пример:
- WebClient: реактивный клиент для исходящих запросов, интегрируется seamlessly.
Ещё фишки: поддержка WebSockets для bidirectional стриминга, Server-Sent Events для push-уведомлений, интеграция с Schedulers для распределения задач.
Практические советы и подводные камни
- Миграция: Начните с аннотированных контроллеров — синтаксис похож на MVC, но возвращайте Mono/Flux.
- Тестирование: WebTestClient вместо MockMvc — асинхронные тесты с StepVerifier (из Reactor).
- Камень: Блокирующий код в контроллерах (JDBC, sleep) сломает асинхронность — используйте reactive драйверы (R2DBC) и publishOn(Schedulers.boundedElastic()).
- Производительность: Под нагрузкой мониторьте с Micrometer — WebFlux даёт метрики из коробки.
В реальной жизни: Netflix использует похожий стек для стриминга, где WebFlux обрабатывает миллионы подключений.
#Java #middle #Reactor #WebFlux
Введение в Spring WebFlux
Spring WebFlux — это часть Spring Framework, предназначенная для создания неблокирующих, асинхронных веб-приложений. Он основан на Project Reactor и Reactive Streams API, что позволяет использовать Mono и Flux прямо в коде сервера. Это не замена Spring MVC (традиционному веб-фреймворку), а альтернатива для сценариев с высокой нагрузкой: микросервисы, реальное время, стриминг. Почему это новый подход? В традиционном веб (как Servlet с потоками на запрос) под пиковой нагрузкой сервер исчерпывает ресурсы — каждый запрос занимает поток, который висит в ожидании IO. WebFlux меняет это: использует event-loop (цикл обработки событий), где один поток обслуживает тысячи подключений, реагируя на события асинхронно. Это экономит CPU и память, делая приложения более отзывчивыми и масштабируемыми.
Почему Spring WebFlux: связь с реактивным мышлением
WebFlux воплощает принципы, которые мы разбирали: push-модель, где данные отправляются по готовности; обратное давление для контроля темпа; операторы для трансформаций. В экосистеме Spring он интегрируется с другими компонентами: Spring Boot для быстрого старта, Spring Data Reactive для баз данных (как R2DBC), Spring Security Reactive для безопасности. Ключевой выигрыш — для IO-bound задач: запросы к БД, API или файлам идут асинхронно, без блокировок. Если ваш сервис ждёт внешних ответов 90% времени, WebFlux высвобождает ресурсы, позволяя обрабатывать в 10-100 раз больше запросов на том же железе.
Чтобы начать: добавьте spring-boot-starter-webflux в зависимости (Maven/Gradle). Spring Boot автоматически настроит Reactor Netty как сервер (неблокирующий HTTP на базе Netty). Нет нужды в Tomcat — всё реактивно.
Ключевые компоненты Spring WebFlux
WebFlux предлагает два стиля разработки: аннотированный (похож на Spring MVC) и функциональный (роутеры как в Express.js). Оба используют Mono/Flux для ответов.
- Аннотированные контроллеры: @RestController с методами, возвращающими Mono или Flux.
Пример простого GET:
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
@RestController
public class HelloController {
@GetMapping("/hello")
public Mono<String> hello() {
return Mono.just("Привет из WebFlux!"); // Асинхронный ответ
}
}
Здесь метод возвращает Mono — сервер не блокируется, ответ "течёт" асинхронно. Для Flux: стриминг данных, как SSE (сервер-сент события).
- Функциональные роутеры: Для маршрутизации без контроллеров.
Пример:
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;
@Configuration
public class RouterConfig {
@Bean
public RouterFunction<ServerResponse> routes() {
return RouterFunctions.route()
.GET("/hello", request -> ServerResponse.ok().bodyValue("Привет из роутера!"))
.build();
}
}
Это декларативно: опишите маршруты, обработчики возвращают ServerResponse с Mono/Flux.
- WebClient: реактивный клиент для исходящих запросов, интегрируется seamlessly.
Ещё фишки: поддержка WebSockets для bidirectional стриминга, Server-Sent Events для push-уведомлений, интеграция с Schedulers для распределения задач.
Практические советы и подводные камни
- Миграция: Начните с аннотированных контроллеров — синтаксис похож на MVC, но возвращайте Mono/Flux.
- Тестирование: WebTestClient вместо MockMvc — асинхронные тесты с StepVerifier (из Reactor).
- Камень: Блокирующий код в контроллерах (JDBC, sleep) сломает асинхронность — используйте reactive драйверы (R2DBC) и publishOn(Schedulers.boundedElastic()).
- Производительность: Под нагрузкой мониторьте с Micrometer — WebFlux даёт метрики из коробки.
В реальной жизни: Netflix использует похожий стек для стриминга, где WebFlux обрабатывает миллионы подключений.
#Java #middle #Reactor #WebFlux
👍3
Реактивное программирование
Простой REST-контроллер с Mono и Flux в Spring WebFlux
Spring WebFlux работает на Spring Boot — это упрощает запуск.
Создайте новый проект в IDE (IntelliJ, Eclipse) или через spring.io/initializr с зависимостями:
- Spring Reactive Web (для WebFlux).
- Spring Boot Starter WebFlux.
В pom.xml (Maven) это выглядит так:
Запустите приложение с @SpringBootApplication в главном классе. По умолчанию сервер — Reactor Netty (неблокирующий HTTP-сервер), порт 8080. Нет нужды в Tomcat — всё асинхронно. Если у вас уже есть Spring MVC, исключите starter-web, чтобы избежать конфликтов.
Теперь к контроллерам: в WebFlux они аннотированы @RestController (как в MVC), но методы возвращают Mono или Flux. Это значит: контроллер не блокирует поток — он возвращает "реактивный объект", а фреймворк сам подпишется и отправит ответ по мере готовности. Это решает боли блокировок: пока данные готовятся (например, запрос в БД), поток свободен для других запросов.
Mono в контроллере: для одиночных ответов
Mono — это поток для нуля или одного элемента, идеален для типичных REST-операций: GET одного ресурса, POST с созданием, DELETE с подтверждением. В контроллере метод возвращает Mono<T>, где T — ваш DTO или простая строка. Фреймворк автоматически сериализует его в JSON (с Jackson) и отправляет, когда элемент готов.
Простой пример: контроллер для приветствия.
Почему Mono лучше Object в MVC? В MVC метод ждёт выполнения (блокирует поток), здесь — нет. Под нагрузкой: тысячи /greet запросов — WebFlux использует event-loop (один поток на все), Reactor распределяет через Schedulers.
Если внутри Mono — запрос к сервису:
В контроллере:
Это цепочка: контроллер возвращает Mono от сервиса, фреймворк ждёт готовности без блокировки. Если ошибка в сервисе — onError преобразуется в HTTP 500, но вы можете настроить с @ExceptionHandler.
Расширим: обработка параметров запроса (@RequestParam) и тела (@RequestBody).
Для POST:
#Java #middle #Reactor #WebFlux #Mono #Flux
Простой REST-контроллер с Mono и Flux в Spring WebFlux
Spring WebFlux работает на Spring Boot — это упрощает запуск.
Создайте новый проект в IDE (IntelliJ, Eclipse) или через spring.io/initializr с зависимостями:
- Spring Reactive Web (для WebFlux).
- Spring Boot Starter WebFlux.
В pom.xml (Maven) это выглядит так:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
Для Gradle: implementation 'org.springframework.boot:spring-boot-starter-webflux'
Запустите приложение с @SpringBootApplication в главном классе. По умолчанию сервер — Reactor Netty (неблокирующий HTTP-сервер), порт 8080. Нет нужды в Tomcat — всё асинхронно. Если у вас уже есть Spring MVC, исключите starter-web, чтобы избежать конфликтов.
Теперь к контроллерам: в WebFlux они аннотированы @RestController (как в MVC), но методы возвращают Mono или Flux. Это значит: контроллер не блокирует поток — он возвращает "реактивный объект", а фреймворк сам подпишется и отправит ответ по мере готовности. Это решает боли блокировок: пока данные готовятся (например, запрос в БД), поток свободен для других запросов.
Mono в контроллере: для одиночных ответов
Mono — это поток для нуля или одного элемента, идеален для типичных REST-операций: GET одного ресурса, POST с созданием, DELETE с подтверждением. В контроллере метод возвращает Mono<T>, где T — ваш DTO или простая строка. Фреймворк автоматически сериализует его в JSON (с Jackson) и отправляет, когда элемент готов.
Простой пример: контроллер для приветствия.
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
@RestController
public class GreetingController {
@GetMapping("/greet/{name}")
public Mono<String> greet(@PathVariable String name) {
return Mono.just("Привет, " + name + "! Добро пожаловать в реактивный мир.");
}
}
Здесь @GetMapping — аннотация для маршрута, @PathVariable — параметр из URL. Метод возвращает Mono.just — простой синхронный элемент. Но под капотом: WebFlux подпишется на Mono, и когда значение готово (сразу), отправит HTTP 200 с телом. Если бы внутри был асинхронный вызов (например, Mono.delay(Duration.ofSeconds(2)).map(ignore -> "Задержанный привет")), сервер не заблокируется — запрос обработается асинхронно.
Почему Mono лучше Object в MVC? В MVC метод ждёт выполнения (блокирует поток), здесь — нет. Под нагрузкой: тысячи /greet запросов — WebFlux использует event-loop (один поток на все), Reactor распределяет через Schedulers.
Если внутри Mono — запрос к сервису:
@Service
public class UserService {
public Mono<User> findUserById(Long id) {
return Mono.fromCallable(() -> /* симуляция БД */ new User(id, "Имя")); // Асинхронно
}
}
В контроллере:
@GetMapping("/user/{id}")
public Mono<User> getUser(@PathVariable Long id) {
return userService.findUserById(id);
}
Это цепочка: контроллер возвращает Mono от сервиса, фреймворк ждёт готовности без блокировки. Если ошибка в сервисе — onError преобразуется в HTTP 500, но вы можете настроить с @ExceptionHandler.
Расширим: обработка параметров запроса (@RequestParam) и тела (@RequestBody).
Для POST:
@PostMapping("/create-user")
public Mono<User> createUser(@RequestBody Mono<UserRequest> requestMono) {
return requestMono.flatMap(req -> {
// Асинхронная логика: создать пользователя
return userService.createUser(req.getName()).map(saved -> new User(saved.getId(), saved.getName()));
});
}
Здесь @RequestBody — Mono, потому что тело может приходить асинхронно (большие данные). flatMap — для цепочки (из поста 8), чтобы "развернуть" подпоток.
#Java #middle #Reactor #WebFlux #Mono #Flux
👍1
Flux в контроллере: для коллекций и стриминга
Flux — для нуля, одного или многих элементов, подходит для GET списков, пагинации или реального времени (стриминг событий). Возвращайте Flux<T>, и WebFlux отправит ответ как JSON-массив (для конечного) или text/event-stream для SSE (сервер-сент событий).
Простой пример: список элементов.
Фреймворк соберёт Flux в JSON-массив и отправит одним ответом.
Но если Flux асинхронный:
Ответ будет стриминговым: клиенту придут данные по мере готовности (chunked transfer).
Для явного SSE добавьте produces = MediaType.TEXT_EVENT_STREAM_VALUE:
Расширим: пагинация с @RequestParam.
Для POST с Flux телом (batch-операции):
Обработка ошибок и валидация в контроллерах
Ошибки — часть жизни: в WebFlux используйте @ExceptionHandler в контроллере или глобально (@ControllerAdvice).
Пример:
Для валидации: @Validated на контроллере, @Valid на @RequestBody. Ошибки валидации — WebExchangeBindException, обработайте в handler.
Глобально: @ControllerAdvice с @ExceptionHandler для централизованной обработки. Это интегрируется с onErrorResume/retry: в сервисе добавьте, и контроллер получит восстановленный поток.
Практические советы и подводные камни
- Тестирование: Используйте WebTestClient — webTestClient.get().uri("/greet/Name").exchange().expectStatus().isOk().expectBody(String.class).isEqualTo("Привет, Name!");
- Schedulers: Если в сервисе blocking код, добавьте .publishOn(Schedulers.boundedElastic()) перед возвратом в контроллер.
- Проблема: Возврат Flux без produces = TEXT_EVENT_STREAM — соберётся в массив, но под большой нагрузкой рискуете буфером; для стриминга укажите media type.
- Оптимизация: Для больших ответов используйте Flux<DataBuffer> — стриминг байтов без сериализации в память.
- Интеграция: С Spring Security Reactive — фильтры асинхронны, не блокируют.
#Java #middle #Reactor #WebFlux #Mono #Flux
Flux — для нуля, одного или многих элементов, подходит для GET списков, пагинации или реального времени (стриминг событий). Возвращайте Flux<T>, и WebFlux отправит ответ как JSON-массив (для конечного) или text/event-stream для SSE (сервер-сент событий).
Простой пример: список элементов.
@GetMapping("/users")
public Flux<User> getAllUsers() {
return Flux.fromIterable(List.of(new User(1L, "Алиса"), new User(2L, "Боб")));
}
Фреймворк соберёт Flux в JSON-массив и отправит одним ответом.
Но если Flux асинхронный:
public Flux<User> findAllUsers() {
return Flux.range(1, 5)
.delayElements(Duration.ofSeconds(1)) // Симуляция задержки
.map(i -> new User((long) i, "Пользователь " + i));
}
Ответ будет стриминговым: клиенту придут данные по мере готовности (chunked transfer).
Для явного SSE добавьте produces = MediaType.TEXT_EVENT_STREAM_VALUE:
@GetMapping(value = "/stream-users", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<User> streamUsers() {
return findAllUsers();
}
Теперь ответ — SSE: каждый элемент как event: data: {"id":1,"name":"Пользователь 1"}\n\n. Клиент (браузер или WebClient) может реагировать на каждый по отдельности, без ожидания всего. Это решает боли callback-ада: вместо polling (опроса сервера), сервер толкает данные (push из поста 3).
Расширим: пагинация с @RequestParam.
@GetMapping("/users-paged")
public Flux<User> getPagedUsers(@RequestParam(defaultValue = "0") int page, @RequestParam(defaultValue = "10") int size) {
return userService.findPaged(page, size); // Сервис возвращает Flux<User>
}
Для POST с Flux телом (batch-операции):
@PostMapping("/batch-users")
public Flux<User> createBatch(@RequestBody Flux<UserRequest> requestsFlux) {
return requestsFlux.flatMap(req -> userService.createUser(req.getName()));
}
flatMap — для параллельной обработки, возвращает Flux сохранённых пользователей.
Обработка ошибок и валидация в контроллерах
Ошибки — часть жизни: в WebFlux используйте @ExceptionHandler в контроллере или глобально (@ControllerAdvice).
Пример:
@ExceptionHandler(UserNotFoundException.class)
public Mono<ResponseEntity<String>> handleNotFound(UserNotFoundException ex) {
return Mono.just(ResponseEntity.notFound().build());
}
Для валидации: @Validated на контроллере, @Valid на @RequestBody. Ошибки валидации — WebExchangeBindException, обработайте в handler.
Глобально: @ControllerAdvice с @ExceptionHandler для централизованной обработки. Это интегрируется с onErrorResume/retry: в сервисе добавьте, и контроллер получит восстановленный поток.
Практические советы и подводные камни
- Тестирование: Используйте WebTestClient — webTestClient.get().uri("/greet/Name").exchange().expectStatus().isOk().expectBody(String.class).isEqualTo("Привет, Name!");
- Schedulers: Если в сервисе blocking код, добавьте .publishOn(Schedulers.boundedElastic()) перед возвратом в контроллер.
- Проблема: Возврат Flux без produces = TEXT_EVENT_STREAM — соберётся в массив, но под большой нагрузкой рискуете буфером; для стриминга укажите media type.
- Оптимизация: Для больших ответов используйте Flux<DataBuffer> — стриминг байтов без сериализации в память.
- Интеграция: С Spring Security Reactive — фильтры асинхронны, не блокируют.
#Java #middle #Reactor #WebFlux #Mono #Flux
👍1