Реактивное программирование
Концепции реактивного программирования: Push vs Pull — кто управляет данными
Сегодня разберём модели Push (отправку данных) и Pull (получение данных), почему одна из них идеальна для реактивного стиля, и как это решает проблемы из первого поста: от тяжёлых потоков до callback-ада.
Представьте: у вас есть конвейер на фабрике. В pull-модели рабочий сам подходит к предыдущему этапу и "берет" деталь, когда ему нужно. В push-модели предыдущий этап "отправляет" деталь дальше, как только она готова, не спрашивая.
Какая модель лучше? Зависит от сценария. В программировании это то же самое: pull подходит для предсказуемых, контролируемых потоков, а push — для динамичных, событийных, где данные приходят непредсказуемо (сеть, пользователи, сенсоры).
Реактивное программирование строится на push — и вот почему это революция.
Pull-модель: вы контролируете темп, но рискуете блокировками
В традиционном императивном программировании (когда код выполняется шаг за шагом, как рецепт) данные обычно "получаются" по требованию. Это pull-модель: потребитель (ваш код) сам запрашивает данные, когда готов их обработать.
Пример — чтение из итератора в Java:
Здесь вы контролируете процесс: hasNext() проверяет наличие, next() получает элемент. Это удобно для локальных, синхронных данных — вы знаете, сколько элементов, и ничего не ждёте. Но под капотом это синхронно: если данные в потоке (файл, сеть), next() может заблокироваться, как в старых потоках. Поток висит в ожидании, ресурсы тратятся впустую.
В асинхронных сценариях pull эволюционировал в Future или CompletableFuture: вы "запрашиваете" результат через get() или цепочки, но контроль остаётся у вас.
Проблема: если источник данных медленный (БД, API), ваш pull-запрос блокирует или создаёт callback-ад. Под нагрузкой — тысячи pull-запросов — система не масштабируется, потому что каждый требует ресурса (потока). Это как толпа в очереди: каждый тянет за своим, но если касса одна, все стоят.
Ещё минус: pull не справляется с "лишними" данными. Если источник генерирует 1 млн событий в секунду, а вы получаете по одному — либо перегрузка буфера, либо вы не успеваете. Нет встроенного механизма, чтобы сказать "замедлись".
#Java #middle #Reactor #Push #Flux
Концепции реактивного программирования: Push vs Pull — кто управляет данными
Сегодня разберём модели Push (отправку данных) и Pull (получение данных), почему одна из них идеальна для реактивного стиля, и как это решает проблемы из первого поста: от тяжёлых потоков до callback-ада.
Представьте: у вас есть конвейер на фабрике. В pull-модели рабочий сам подходит к предыдущему этапу и "берет" деталь, когда ему нужно. В push-модели предыдущий этап "отправляет" деталь дальше, как только она готова, не спрашивая.
Какая модель лучше? Зависит от сценария. В программировании это то же самое: pull подходит для предсказуемых, контролируемых потоков, а push — для динамичных, событийных, где данные приходят непредсказуемо (сеть, пользователи, сенсоры).
Реактивное программирование строится на push — и вот почему это революция.
Pull-модель: вы контролируете темп, но рискуете блокировками
В традиционном императивном программировании (когда код выполняется шаг за шагом, как рецепт) данные обычно "получаются" по требованию. Это pull-модель: потребитель (ваш код) сам запрашивает данные, когда готов их обработать.
Пример — чтение из итератора в Java:
List<String> fruits = Arrays.asList("яблоко", "банан", "вишня");
Iterator<String> iterator = fruits.iterator();
while (iterator.hasNext()) {
String fruit = iterator.next(); // "Вытягиваем" следующий элемент
System.out.println(fruit.toUpperCase());
}
Здесь вы контролируете процесс: hasNext() проверяет наличие, next() получает элемент. Это удобно для локальных, синхронных данных — вы знаете, сколько элементов, и ничего не ждёте. Но под капотом это синхронно: если данные в потоке (файл, сеть), next() может заблокироваться, как в старых потоках. Поток висит в ожидании, ресурсы тратятся впустую.
В асинхронных сценариях pull эволюционировал в Future или CompletableFuture: вы "запрашиваете" результат через get() или цепочки, но контроль остаётся у вас.
Проблема: если источник данных медленный (БД, API), ваш pull-запрос блокирует или создаёт callback-ад. Под нагрузкой — тысячи pull-запросов — система не масштабируется, потому что каждый требует ресурса (потока). Это как толпа в очереди: каждый тянет за своим, но если касса одна, все стоят.
Ещё минус: pull не справляется с "лишними" данными. Если источник генерирует 1 млн событий в секунду, а вы получаете по одному — либо перегрузка буфера, либо вы не успеваете. Нет встроенного механизма, чтобы сказать "замедлись".
#Java #middle #Reactor #Push #Flux
👍3
Push-модель: данные приходят сами, реактивно и эффективно
Теперь изменим: в push-модели источник "отправляет" данные потребителю, как только они готовы, без запросов. Потребитель пассивен — он подписывается и реагирует.
Это основа реактивного программирования: события push'атся асинхронно, без блокировок. Контроль переходит к источнику, но с обратным давлением — подписчик может сказать "хватит на время".
В Reactive Streams это реализовано через Publisher и Subscriber: издатель толкает onNext(элемент), подписчик реагирует сразу.
Пример с Flux в Project Reactor (push-стиль):
Здесь Flux — издатель, который отправляет элементы по мере готовности. subscribe() — подписка, и элементы приходят автоматически: "ЯБЛОКО", "БАНАН"... Нет next() — нет pull.
Если источник асинхронный, например, чтение из сети:
Данные push'атся по мере поступления от сервера — без блокировок. Reactor использует неблокирующий IO (на базе Netty), так что поток не висит: один event-loop-цикл (цикл обработки событий) обслуживает тысячи подписок.
Почему push лучше для реактивности?
Во-первых, эффективность: нет лишних проверок hasNext().
Во-вторых, естественность для событий: клик мыши, сообщение в чате — это push по природе, они приходят сами.
В-третьих, масштабируемость: тысячи подписчиков на один издатель — ок, потому что push идёт через события, а не потоки на каждого.
Гибридные сценарии: когда mix работает
На практике модели смешиваются. В Reactor Flux может имитировать pull через операторы вроде buffer() или take(), но основа — push.
Пример: pull из локального списка, но push в сеть:
Здесь локальный pull (fromIterable) переходит в push (flatMap для API).
Это гибкость: используйте pull для контроля, push для асинхронности. Но важно избегать блокировок: Reactor проверяет и предупреждает, если в лямбде блокирующий код (onBlock()).
Ещё пример из реальной жизни: стриминг видео в Netflix. Pull — когда пользователь сам получает фреймы, но под нагрузкой лагает. Push — сервер отдает фреймы по мере готовности, с буферизацией. Реактивные библиотеки (как RxJava) позволяют строить такие конвейеры.
Почему Push — ключ к новому подходу в реактивном программировании
Возвращаясь к проблемам: потоки тяжёлые, потому что pull требует ожидания; Future блокирует на get(), потому что это pull в асинхронной обёртке; CompletableFuture даёт цепочки, но push-подход в нём слаб (колбэки — это мини-push, но без полного контроля).
Реактивный push меняет всё: данные текут как река, вы реагируете без ожидания, ресурсы на минимуме. Системы становятся resilient (устойчивыми): если один поток сломается, другие продолжают. Под нагрузкой — горизонтальное масштабирование без боли.
#Java #middle #Reactor #Push #Flux
Теперь изменим: в push-модели источник "отправляет" данные потребителю, как только они готовы, без запросов. Потребитель пассивен — он подписывается и реагирует.
Это основа реактивного программирования: события push'атся асинхронно, без блокировок. Контроль переходит к источнику, но с обратным давлением — подписчик может сказать "хватит на время".
В Reactive Streams это реализовано через Publisher и Subscriber: издатель толкает onNext(элемент), подписчик реагирует сразу.
Пример с Flux в Project Reactor (push-стиль):
Flux<String> pushFlux = Flux.fromIterable(Arrays.asList("яблоко", "банан", "вишня"))
.map(String::toUpperCase); // Преобразование в потоке
pushFlux.subscribe(
fruit -> System.out.println("Получено из push: " + fruit), // Реакция на толкание
Throwable::printStackTrace, // Обработка ошибок
() -> System.out.println("Push завершён")
);
Здесь Flux — издатель, который отправляет элементы по мере готовности. subscribe() — подписка, и элементы приходят автоматически: "ЯБЛОКО", "БАНАН"... Нет next() — нет pull.
Если источник асинхронный, например, чтение из сети:
WebClient.create()
.get()
.uri("https://api.example.com/fruits")
.retrieve()
.bodyToFlux(String.class) // Flux толкает строки из ответа
.subscribe(fruit -> System.out.println("Push из API: " + fruit));
Данные push'атся по мере поступления от сервера — без блокировок. Reactor использует неблокирующий IO (на базе Netty), так что поток не висит: один event-loop-цикл (цикл обработки событий) обслуживает тысячи подписок.
Почему push лучше для реактивности?
Во-первых, эффективность: нет лишних проверок hasNext().
Во-вторых, естественность для событий: клик мыши, сообщение в чате — это push по природе, они приходят сами.
В-третьих, масштабируемость: тысячи подписчиков на один издатель — ок, потому что push идёт через события, а не потоки на каждого.
Гибридные сценарии: когда mix работает
На практике модели смешиваются. В Reactor Flux может имитировать pull через операторы вроде buffer() или take(), но основа — push.
Пример: pull из локального списка, но push в сеть:
Flux.fromIterable(fruits)
.map(String::toUpperCase)
.flatMap(fruit -> sendToApi(fruit)) // flatMap толкает в асинхронный API
.subscribe(result -> System.out.println("Ответ: " + result));
Здесь локальный pull (fromIterable) переходит в push (flatMap для API).
Это гибкость: используйте pull для контроля, push для асинхронности. Но важно избегать блокировок: Reactor проверяет и предупреждает, если в лямбде блокирующий код (onBlock()).
Ещё пример из реальной жизни: стриминг видео в Netflix. Pull — когда пользователь сам получает фреймы, но под нагрузкой лагает. Push — сервер отдает фреймы по мере готовности, с буферизацией. Реактивные библиотеки (как RxJava) позволяют строить такие конвейеры.
Почему Push — ключ к новому подходу в реактивном программировании
Возвращаясь к проблемам: потоки тяжёлые, потому что pull требует ожидания; Future блокирует на get(), потому что это pull в асинхронной обёртке; CompletableFuture даёт цепочки, но push-подход в нём слаб (колбэки — это мини-push, но без полного контроля).
Реактивный push меняет всё: данные текут как река, вы реагируете без ожидания, ресурсы на минимуме. Системы становятся resilient (устойчивыми): если один поток сломается, другие продолжают. Под нагрузкой — горизонтальное масштабирование без боли.
#Java #middle #Reactor #Push #Flux
👍3
Знакомство с Project Reactor: Mono и Flux
Project Reactor — это не просто обёртка, а полноценный фреймворк от команды Spring, оптимизированный для производительности. Он использует неблокирующий подход: всё работает на основе событий, с минимальным потреблением ресурсов. Ключевые типы — Mono и Flux, которые воплощают потоки данных. Mono для случаев с нуля или одним элементом (как одиночный запрос), Flux для последовательностей (как стриминг). Они реализуют Publisher из Reactive Streams, так что поддерживают подписку, реакции и обратное давление.
Что такое Project Reactor и как начать?
Project Reactor — библиотека для реактивного программирования, которая строит на Reactive Streams. Она предоставляет API для создания, трансформации и потребления асинхронных потоков. Под капотом — эффективный scheduler (планировщик задач), который распределяет работу по потокам без блокировок: использует event-loop для IO и параллельные пулы для вычислений.
Чтобы начать добавьте в pom.xml (Maven):
Импортируйте:
Reactor интегрируется с Spring (WebFlux), но работает standalone. Главное преимущество: код становится декларативным — вы описываете "что" (поток и реакции), а не "как" (ожидания и циклы). Это решает callback-ад: цепочки читаемы, как последовательный код, но асинхронны.
Mono: поток для нуля или одного элемента
Mono — это тип для сценариев, где ожидается максимум один результат: успех (элемент), ошибка или ничего. Идеален для HTTP-запросов, чтения записи из БД или вычислений с одиночным исходом. Mono реализует Publisher, так что вы можете подписаться и реагировать.
Создание Mono: используйте статические методы.
- Mono.just(value): из готового значения.
- Mono.empty(): пустой поток (завершится onComplete без onNext).
- Mono.fromCallable(() -> compute()): из синхронного вызова.
- Mono.defer(() -> createMono()): ленивое создание (выполняется при подписке).
Пример простого Mono:
Асинхронный пример: симулируем задержку.
Почему Mono лучше Future/CompletableFuture?
Нет блокирующего get(): результат приходит в onNext. Нет callback-ада: цепочки через операторы (map, flatMap — разберём в следующих постах). Под нагрузкой: тысячи Mono на одном потоке, без исчерпания пула.
#Java #middle #Reactor #Reactive_Streams_API #Mono #Flux
Project Reactor — это не просто обёртка, а полноценный фреймворк от команды Spring, оптимизированный для производительности. Он использует неблокирующий подход: всё работает на основе событий, с минимальным потреблением ресурсов. Ключевые типы — Mono и Flux, которые воплощают потоки данных. Mono для случаев с нуля или одним элементом (как одиночный запрос), Flux для последовательностей (как стриминг). Они реализуют Publisher из Reactive Streams, так что поддерживают подписку, реакции и обратное давление.
Что такое Project Reactor и как начать?
Project Reactor — библиотека для реактивного программирования, которая строит на Reactive Streams. Она предоставляет API для создания, трансформации и потребления асинхронных потоков. Под капотом — эффективный scheduler (планировщик задач), который распределяет работу по потокам без блокировок: использует event-loop для IO и параллельные пулы для вычислений.
Чтобы начать добавьте в pom.xml (Maven):
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.5.0</version> <!-- версия может быть неактуальной -->
</dependency>
Импортируйте:
import reactor.core.publisher.Mono; import reactor.core.publisher.Flux;
Reactor интегрируется с Spring (WebFlux), но работает standalone. Главное преимущество: код становится декларативным — вы описываете "что" (поток и реакции), а не "как" (ожидания и циклы). Это решает callback-ад: цепочки читаемы, как последовательный код, но асинхронны.
Mono: поток для нуля или одного элемента
Mono — это тип для сценариев, где ожидается максимум один результат: успех (элемент), ошибка или ничего. Идеален для HTTP-запросов, чтения записи из БД или вычислений с одиночным исходом. Mono реализует Publisher, так что вы можете подписаться и реагировать.
Создание Mono: используйте статические методы.
- Mono.just(value): из готового значения.
- Mono.empty(): пустой поток (завершится onComplete без onNext).
- Mono.fromCallable(() -> compute()): из синхронного вызова.
- Mono.defer(() -> createMono()): ленивое создание (выполняется при подписке).
Пример простого Mono:
Mono<String> simpleMono = Mono.just("Привет из Reactor");
Подписка: subscribe() вызывает реакции.
simpleMono.subscribe(
value -> System.out.println("Значение: " + value), // onNext: реакция на элемент
error -> System.err.println("Ошибка: " + error), // onError
() -> System.out.println("Завершено") // onComplete
);
Здесь: "Значение: Привет из Reactor" и "Завершено". Подписка асинхронна — код после subscribe() идёт сразу, без ожидания. Если ошибка (например, Mono.error(new RuntimeException("Бум"))), сработает onError.
Асинхронный пример: симулируем задержку.
Mono<String> asyncMono = Mono.delay(Duration.ofSeconds(1)).map(ignore -> "Готово после секунды");
asyncMono.subscribe(System.out::println); // Вывод после 1 сек
Почему Mono лучше Future/CompletableFuture?
Нет блокирующего get(): результат приходит в onNext. Нет callback-ада: цепочки через операторы (map, flatMap — разберём в следующих постах). Под нагрузкой: тысячи Mono на одном потоке, без исчерпания пула.
#Java #middle #Reactor #Reactive_Streams_API #Mono #Flux
👍4
Flux: поток для нуля, одного или множества элементов
Flux — для последовательностей: от конечных (список) до бесконечных (стриминг). Может быть пустым, с одним элементом (как Mono) или миллионами. Flux тоже Publisher, поддерживает backpressure.
Создание Flux:
- Flux.just(a, b, c): из значений.
- Flux.fromIterable(list): из коллекции.
- Flux.range(start, count): последовательность чисел.
- Flux.interval(Duration): бесконечный таймер.
- Flux.generate(sink -> { sink.next(value); sink.complete(); }): генератор с состоянием.
Пример базового Flux:
С backpressure используйте BaseSubscriber для контроля.
Асинхронный Flux: стриминг с задержкой.
Почему Flux лучше потоков или циклов?
Масштабируемость: обрабатывает миллионы элементов без лишних ресурсов. Реактивность: реагируйте на каждый элемент, без буферов в памяти. Интеграция с обратным давлением: если подписчик медленный, Flux замедляется.
Практические советы и подводные камни
- Ленивость: Mono/Flux не выполняются без subscribe(). Полезно для отложенного вычисления.
- Ошибки: всегда обрабатывайте onError, иначе они "проглатываются".
- Блокировки: избегайте Thread.sleep() в реакциях — используйте delayElements() для асинхронных пауз.
- Тестирование: используйте StepVerifier из reactor-test: StepVerifier.create(flux).expectNext(1,2).verifyComplete();
- Камень: бесконечный Flux без take() или limitRequest() — рискуете памятью. Добавляйте .onBackpressureBuffer() или drop.
В реальной практике: в WebFlux контроллер возвращает Mono<Response> для GET, Flux<Event> для SSE (сервер-сент события).
#Java #middle #Reactor #Reactive_Streams_API #Mono #Flux
Flux — для последовательностей: от конечных (список) до бесконечных (стриминг). Может быть пустым, с одним элементом (как Mono) или миллионами. Flux тоже Publisher, поддерживает backpressure.
Создание Flux:
- Flux.just(a, b, c): из значений.
- Flux.fromIterable(list): из коллекции.
- Flux.range(start, count): последовательность чисел.
- Flux.interval(Duration): бесконечный таймер.
- Flux.generate(sink -> { sink.next(value); sink.complete(); }): генератор с состоянием.
Пример базового Flux:
Flux<Integer> numbersFlux = Flux.range(1, 5);
numbersFlux.subscribe(
value -> System.out.println("Элемент: " + value), // onNext для каждого
error -> System.err.println("Ошибка: " + error),
() -> System.out.println("Завершено")
);
Вывод: 1, 2, 3, 4, 5 и "Завершено". Асинхронно: элементы "текут" по мере готовности.
С backpressure используйте BaseSubscriber для контроля.
numbersFlux.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
request(2); // Запрашиваем по 2 элемента
}
@Override
protected void hookOnNext(Integer value) {
System.out.println("Получено: " + value);
request(1); // После обработки — следующий
}
});
Это предотвращает перегрузку: Flux выдаёт элементы порциями.
Асинхронный Flux: стриминг с задержкой.
Flux<Long> tickingFlux = Flux.interval(Duration.ofMillis(500)).take(5); // 5 тиков каждые 0.5 сек
tickingFlux.subscribe(System.out::println); // 0, 1, 2, 3, 4 с паузами
Почему Flux лучше потоков или циклов?
Масштабируемость: обрабатывает миллионы элементов без лишних ресурсов. Реактивность: реагируйте на каждый элемент, без буферов в памяти. Интеграция с обратным давлением: если подписчик медленный, Flux замедляется.
Практические советы и подводные камни
- Ленивость: Mono/Flux не выполняются без subscribe(). Полезно для отложенного вычисления.
- Ошибки: всегда обрабатывайте onError, иначе они "проглатываются".
- Блокировки: избегайте Thread.sleep() в реакциях — используйте delayElements() для асинхронных пауз.
- Тестирование: используйте StepVerifier из reactor-test: StepVerifier.create(flux).expectNext(1,2).verifyComplete();
- Камень: бесконечный Flux без take() или limitRequest() — рискуете памятью. Добавляйте .onBackpressureBuffer() или drop.
В реальной практике: в WebFlux контроллер возвращает Mono<Response> для GET, Flux<Event> для SSE (сервер-сент события).
#Java #middle #Reactor #Reactive_Streams_API #Mono #Flux
👍6
Реактивное программирование
Простой REST-контроллер с Mono и Flux в Spring WebFlux
Spring WebFlux работает на Spring Boot — это упрощает запуск.
Создайте новый проект в IDE (IntelliJ, Eclipse) или через spring.io/initializr с зависимостями:
- Spring Reactive Web (для WebFlux).
- Spring Boot Starter WebFlux.
В pom.xml (Maven) это выглядит так:
Запустите приложение с @SpringBootApplication в главном классе. По умолчанию сервер — Reactor Netty (неблокирующий HTTP-сервер), порт 8080. Нет нужды в Tomcat — всё асинхронно. Если у вас уже есть Spring MVC, исключите starter-web, чтобы избежать конфликтов.
Теперь к контроллерам: в WebFlux они аннотированы @RestController (как в MVC), но методы возвращают Mono или Flux. Это значит: контроллер не блокирует поток — он возвращает "реактивный объект", а фреймворк сам подпишется и отправит ответ по мере готовности. Это решает боли блокировок: пока данные готовятся (например, запрос в БД), поток свободен для других запросов.
Mono в контроллере: для одиночных ответов
Mono — это поток для нуля или одного элемента, идеален для типичных REST-операций: GET одного ресурса, POST с созданием, DELETE с подтверждением. В контроллере метод возвращает Mono<T>, где T — ваш DTO или простая строка. Фреймворк автоматически сериализует его в JSON (с Jackson) и отправляет, когда элемент готов.
Простой пример: контроллер для приветствия.
Почему Mono лучше Object в MVC? В MVC метод ждёт выполнения (блокирует поток), здесь — нет. Под нагрузкой: тысячи /greet запросов — WebFlux использует event-loop (один поток на все), Reactor распределяет через Schedulers.
Если внутри Mono — запрос к сервису:
В контроллере:
Это цепочка: контроллер возвращает Mono от сервиса, фреймворк ждёт готовности без блокировки. Если ошибка в сервисе — onError преобразуется в HTTP 500, но вы можете настроить с @ExceptionHandler.
Расширим: обработка параметров запроса (@RequestParam) и тела (@RequestBody).
Для POST:
#Java #middle #Reactor #WebFlux #Mono #Flux
Простой REST-контроллер с Mono и Flux в Spring WebFlux
Spring WebFlux работает на Spring Boot — это упрощает запуск.
Создайте новый проект в IDE (IntelliJ, Eclipse) или через spring.io/initializr с зависимостями:
- Spring Reactive Web (для WebFlux).
- Spring Boot Starter WebFlux.
В pom.xml (Maven) это выглядит так:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
Для Gradle: implementation 'org.springframework.boot:spring-boot-starter-webflux'
Запустите приложение с @SpringBootApplication в главном классе. По умолчанию сервер — Reactor Netty (неблокирующий HTTP-сервер), порт 8080. Нет нужды в Tomcat — всё асинхронно. Если у вас уже есть Spring MVC, исключите starter-web, чтобы избежать конфликтов.
Теперь к контроллерам: в WebFlux они аннотированы @RestController (как в MVC), но методы возвращают Mono или Flux. Это значит: контроллер не блокирует поток — он возвращает "реактивный объект", а фреймворк сам подпишется и отправит ответ по мере готовности. Это решает боли блокировок: пока данные готовятся (например, запрос в БД), поток свободен для других запросов.
Mono в контроллере: для одиночных ответов
Mono — это поток для нуля или одного элемента, идеален для типичных REST-операций: GET одного ресурса, POST с созданием, DELETE с подтверждением. В контроллере метод возвращает Mono<T>, где T — ваш DTO или простая строка. Фреймворк автоматически сериализует его в JSON (с Jackson) и отправляет, когда элемент готов.
Простой пример: контроллер для приветствия.
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
@RestController
public class GreetingController {
@GetMapping("/greet/{name}")
public Mono<String> greet(@PathVariable String name) {
return Mono.just("Привет, " + name + "! Добро пожаловать в реактивный мир.");
}
}
Здесь @GetMapping — аннотация для маршрута, @PathVariable — параметр из URL. Метод возвращает Mono.just — простой синхронный элемент. Но под капотом: WebFlux подпишется на Mono, и когда значение готово (сразу), отправит HTTP 200 с телом. Если бы внутри был асинхронный вызов (например, Mono.delay(Duration.ofSeconds(2)).map(ignore -> "Задержанный привет")), сервер не заблокируется — запрос обработается асинхронно.
Почему Mono лучше Object в MVC? В MVC метод ждёт выполнения (блокирует поток), здесь — нет. Под нагрузкой: тысячи /greet запросов — WebFlux использует event-loop (один поток на все), Reactor распределяет через Schedulers.
Если внутри Mono — запрос к сервису:
@Service
public class UserService {
public Mono<User> findUserById(Long id) {
return Mono.fromCallable(() -> /* симуляция БД */ new User(id, "Имя")); // Асинхронно
}
}
В контроллере:
@GetMapping("/user/{id}")
public Mono<User> getUser(@PathVariable Long id) {
return userService.findUserById(id);
}
Это цепочка: контроллер возвращает Mono от сервиса, фреймворк ждёт готовности без блокировки. Если ошибка в сервисе — onError преобразуется в HTTP 500, но вы можете настроить с @ExceptionHandler.
Расширим: обработка параметров запроса (@RequestParam) и тела (@RequestBody).
Для POST:
@PostMapping("/create-user")
public Mono<User> createUser(@RequestBody Mono<UserRequest> requestMono) {
return requestMono.flatMap(req -> {
// Асинхронная логика: создать пользователя
return userService.createUser(req.getName()).map(saved -> new User(saved.getId(), saved.getName()));
});
}
Здесь @RequestBody — Mono, потому что тело может приходить асинхронно (большие данные). flatMap — для цепочки (из поста 8), чтобы "развернуть" подпоток.
#Java #middle #Reactor #WebFlux #Mono #Flux
👍1
Flux в контроллере: для коллекций и стриминга
Flux — для нуля, одного или многих элементов, подходит для GET списков, пагинации или реального времени (стриминг событий). Возвращайте Flux<T>, и WebFlux отправит ответ как JSON-массив (для конечного) или text/event-stream для SSE (сервер-сент событий).
Простой пример: список элементов.
Фреймворк соберёт Flux в JSON-массив и отправит одним ответом.
Но если Flux асинхронный:
Ответ будет стриминговым: клиенту придут данные по мере готовности (chunked transfer).
Для явного SSE добавьте produces = MediaType.TEXT_EVENT_STREAM_VALUE:
Расширим: пагинация с @RequestParam.
Для POST с Flux телом (batch-операции):
Обработка ошибок и валидация в контроллерах
Ошибки — часть жизни: в WebFlux используйте @ExceptionHandler в контроллере или глобально (@ControllerAdvice).
Пример:
Для валидации: @Validated на контроллере, @Valid на @RequestBody. Ошибки валидации — WebExchangeBindException, обработайте в handler.
Глобально: @ControllerAdvice с @ExceptionHandler для централизованной обработки. Это интегрируется с onErrorResume/retry: в сервисе добавьте, и контроллер получит восстановленный поток.
Практические советы и подводные камни
- Тестирование: Используйте WebTestClient — webTestClient.get().uri("/greet/Name").exchange().expectStatus().isOk().expectBody(String.class).isEqualTo("Привет, Name!");
- Schedulers: Если в сервисе blocking код, добавьте .publishOn(Schedulers.boundedElastic()) перед возвратом в контроллер.
- Проблема: Возврат Flux без produces = TEXT_EVENT_STREAM — соберётся в массив, но под большой нагрузкой рискуете буфером; для стриминга укажите media type.
- Оптимизация: Для больших ответов используйте Flux<DataBuffer> — стриминг байтов без сериализации в память.
- Интеграция: С Spring Security Reactive — фильтры асинхронны, не блокируют.
#Java #middle #Reactor #WebFlux #Mono #Flux
Flux — для нуля, одного или многих элементов, подходит для GET списков, пагинации или реального времени (стриминг событий). Возвращайте Flux<T>, и WebFlux отправит ответ как JSON-массив (для конечного) или text/event-stream для SSE (сервер-сент событий).
Простой пример: список элементов.
@GetMapping("/users")
public Flux<User> getAllUsers() {
return Flux.fromIterable(List.of(new User(1L, "Алиса"), new User(2L, "Боб")));
}
Фреймворк соберёт Flux в JSON-массив и отправит одним ответом.
Но если Flux асинхронный:
public Flux<User> findAllUsers() {
return Flux.range(1, 5)
.delayElements(Duration.ofSeconds(1)) // Симуляция задержки
.map(i -> new User((long) i, "Пользователь " + i));
}
Ответ будет стриминговым: клиенту придут данные по мере готовности (chunked transfer).
Для явного SSE добавьте produces = MediaType.TEXT_EVENT_STREAM_VALUE:
@GetMapping(value = "/stream-users", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<User> streamUsers() {
return findAllUsers();
}
Теперь ответ — SSE: каждый элемент как event: data: {"id":1,"name":"Пользователь 1"}\n\n. Клиент (браузер или WebClient) может реагировать на каждый по отдельности, без ожидания всего. Это решает боли callback-ада: вместо polling (опроса сервера), сервер толкает данные (push из поста 3).
Расширим: пагинация с @RequestParam.
@GetMapping("/users-paged")
public Flux<User> getPagedUsers(@RequestParam(defaultValue = "0") int page, @RequestParam(defaultValue = "10") int size) {
return userService.findPaged(page, size); // Сервис возвращает Flux<User>
}
Для POST с Flux телом (batch-операции):
@PostMapping("/batch-users")
public Flux<User> createBatch(@RequestBody Flux<UserRequest> requestsFlux) {
return requestsFlux.flatMap(req -> userService.createUser(req.getName()));
}
flatMap — для параллельной обработки, возвращает Flux сохранённых пользователей.
Обработка ошибок и валидация в контроллерах
Ошибки — часть жизни: в WebFlux используйте @ExceptionHandler в контроллере или глобально (@ControllerAdvice).
Пример:
@ExceptionHandler(UserNotFoundException.class)
public Mono<ResponseEntity<String>> handleNotFound(UserNotFoundException ex) {
return Mono.just(ResponseEntity.notFound().build());
}
Для валидации: @Validated на контроллере, @Valid на @RequestBody. Ошибки валидации — WebExchangeBindException, обработайте в handler.
Глобально: @ControllerAdvice с @ExceptionHandler для централизованной обработки. Это интегрируется с onErrorResume/retry: в сервисе добавьте, и контроллер получит восстановленный поток.
Практические советы и подводные камни
- Тестирование: Используйте WebTestClient — webTestClient.get().uri("/greet/Name").exchange().expectStatus().isOk().expectBody(String.class).isEqualTo("Привет, Name!");
- Schedulers: Если в сервисе blocking код, добавьте .publishOn(Schedulers.boundedElastic()) перед возвратом в контроллер.
- Проблема: Возврат Flux без produces = TEXT_EVENT_STREAM — соберётся в массив, но под большой нагрузкой рискуете буфером; для стриминга укажите media type.
- Оптимизация: Для больших ответов используйте Flux<DataBuffer> — стриминг байтов без сериализации в память.
- Интеграция: С Spring Security Reactive — фильтры асинхронны, не блокируют.
#Java #middle #Reactor #WebFlux #Mono #Flux
👍1