Реактивное программирование
Концепции реактивного программирования: Push vs Pull — кто управляет данными
Сегодня разберём модели Push (отправку данных) и Pull (получение данных), почему одна из них идеальна для реактивного стиля, и как это решает проблемы из первого поста: от тяжёлых потоков до callback-ада.
Представьте: у вас есть конвейер на фабрике. В pull-модели рабочий сам подходит к предыдущему этапу и "берет" деталь, когда ему нужно. В push-модели предыдущий этап "отправляет" деталь дальше, как только она готова, не спрашивая.
Какая модель лучше? Зависит от сценария. В программировании это то же самое: pull подходит для предсказуемых, контролируемых потоков, а push — для динамичных, событийных, где данные приходят непредсказуемо (сеть, пользователи, сенсоры).
Реактивное программирование строится на push — и вот почему это революция.
Pull-модель: вы контролируете темп, но рискуете блокировками
В традиционном императивном программировании (когда код выполняется шаг за шагом, как рецепт) данные обычно "получаются" по требованию. Это pull-модель: потребитель (ваш код) сам запрашивает данные, когда готов их обработать.
Пример — чтение из итератора в Java:
Здесь вы контролируете процесс: hasNext() проверяет наличие, next() получает элемент. Это удобно для локальных, синхронных данных — вы знаете, сколько элементов, и ничего не ждёте. Но под капотом это синхронно: если данные в потоке (файл, сеть), next() может заблокироваться, как в старых потоках. Поток висит в ожидании, ресурсы тратятся впустую.
В асинхронных сценариях pull эволюционировал в Future или CompletableFuture: вы "запрашиваете" результат через get() или цепочки, но контроль остаётся у вас.
Проблема: если источник данных медленный (БД, API), ваш pull-запрос блокирует или создаёт callback-ад. Под нагрузкой — тысячи pull-запросов — система не масштабируется, потому что каждый требует ресурса (потока). Это как толпа в очереди: каждый тянет за своим, но если касса одна, все стоят.
Ещё минус: pull не справляется с "лишними" данными. Если источник генерирует 1 млн событий в секунду, а вы получаете по одному — либо перегрузка буфера, либо вы не успеваете. Нет встроенного механизма, чтобы сказать "замедлись".
#Java #middle #Reactor #Push #Flux
Концепции реактивного программирования: Push vs Pull — кто управляет данными
Сегодня разберём модели Push (отправку данных) и Pull (получение данных), почему одна из них идеальна для реактивного стиля, и как это решает проблемы из первого поста: от тяжёлых потоков до callback-ада.
Представьте: у вас есть конвейер на фабрике. В pull-модели рабочий сам подходит к предыдущему этапу и "берет" деталь, когда ему нужно. В push-модели предыдущий этап "отправляет" деталь дальше, как только она готова, не спрашивая.
Какая модель лучше? Зависит от сценария. В программировании это то же самое: pull подходит для предсказуемых, контролируемых потоков, а push — для динамичных, событийных, где данные приходят непредсказуемо (сеть, пользователи, сенсоры).
Реактивное программирование строится на push — и вот почему это революция.
Pull-модель: вы контролируете темп, но рискуете блокировками
В традиционном императивном программировании (когда код выполняется шаг за шагом, как рецепт) данные обычно "получаются" по требованию. Это pull-модель: потребитель (ваш код) сам запрашивает данные, когда готов их обработать.
Пример — чтение из итератора в Java:
List<String> fruits = Arrays.asList("яблоко", "банан", "вишня");
Iterator<String> iterator = fruits.iterator();
while (iterator.hasNext()) {
String fruit = iterator.next(); // "Вытягиваем" следующий элемент
System.out.println(fruit.toUpperCase());
}Здесь вы контролируете процесс: hasNext() проверяет наличие, next() получает элемент. Это удобно для локальных, синхронных данных — вы знаете, сколько элементов, и ничего не ждёте. Но под капотом это синхронно: если данные в потоке (файл, сеть), next() может заблокироваться, как в старых потоках. Поток висит в ожидании, ресурсы тратятся впустую.
В асинхронных сценариях pull эволюционировал в Future или CompletableFuture: вы "запрашиваете" результат через get() или цепочки, но контроль остаётся у вас.
Проблема: если источник данных медленный (БД, API), ваш pull-запрос блокирует или создаёт callback-ад. Под нагрузкой — тысячи pull-запросов — система не масштабируется, потому что каждый требует ресурса (потока). Это как толпа в очереди: каждый тянет за своим, но если касса одна, все стоят.
Ещё минус: pull не справляется с "лишними" данными. Если источник генерирует 1 млн событий в секунду, а вы получаете по одному — либо перегрузка буфера, либо вы не успеваете. Нет встроенного механизма, чтобы сказать "замедлись".
#Java #middle #Reactor #Push #Flux
👍3
Push-модель: данные приходят сами, реактивно и эффективно
Теперь изменим: в push-модели источник "отправляет" данные потребителю, как только они готовы, без запросов. Потребитель пассивен — он подписывается и реагирует.
Это основа реактивного программирования: события push'атся асинхронно, без блокировок. Контроль переходит к источнику, но с обратным давлением — подписчик может сказать "хватит на время".
В Reactive Streams это реализовано через Publisher и Subscriber: издатель толкает onNext(элемент), подписчик реагирует сразу.
Пример с Flux в Project Reactor (push-стиль):
Здесь Flux — издатель, который отправляет элементы по мере готовности. subscribe() — подписка, и элементы приходят автоматически: "ЯБЛОКО", "БАНАН"... Нет next() — нет pull.
Если источник асинхронный, например, чтение из сети:
Данные push'атся по мере поступления от сервера — без блокировок. Reactor использует неблокирующий IO (на базе Netty), так что поток не висит: один event-loop-цикл (цикл обработки событий) обслуживает тысячи подписок.
Почему push лучше для реактивности?
Во-первых, эффективность: нет лишних проверок hasNext().
Во-вторых, естественность для событий: клик мыши, сообщение в чате — это push по природе, они приходят сами.
В-третьих, масштабируемость: тысячи подписчиков на один издатель — ок, потому что push идёт через события, а не потоки на каждого.
Гибридные сценарии: когда mix работает
На практике модели смешиваются. В Reactor Flux может имитировать pull через операторы вроде buffer() или take(), но основа — push.
Пример: pull из локального списка, но push в сеть:
Здесь локальный pull (fromIterable) переходит в push (flatMap для API).
Это гибкость: используйте pull для контроля, push для асинхронности. Но важно избегать блокировок: Reactor проверяет и предупреждает, если в лямбде блокирующий код (onBlock()).
Ещё пример из реальной жизни: стриминг видео в Netflix. Pull — когда пользователь сам получает фреймы, но под нагрузкой лагает. Push — сервер отдает фреймы по мере готовности, с буферизацией. Реактивные библиотеки (как RxJava) позволяют строить такие конвейеры.
Почему Push — ключ к новому подходу в реактивном программировании
Возвращаясь к проблемам: потоки тяжёлые, потому что pull требует ожидания; Future блокирует на get(), потому что это pull в асинхронной обёртке; CompletableFuture даёт цепочки, но push-подход в нём слаб (колбэки — это мини-push, но без полного контроля).
Реактивный push меняет всё: данные текут как река, вы реагируете без ожидания, ресурсы на минимуме. Системы становятся resilient (устойчивыми): если один поток сломается, другие продолжают. Под нагрузкой — горизонтальное масштабирование без боли.
#Java #middle #Reactor #Push #Flux
Теперь изменим: в push-модели источник "отправляет" данные потребителю, как только они готовы, без запросов. Потребитель пассивен — он подписывается и реагирует.
Это основа реактивного программирования: события push'атся асинхронно, без блокировок. Контроль переходит к источнику, но с обратным давлением — подписчик может сказать "хватит на время".
В Reactive Streams это реализовано через Publisher и Subscriber: издатель толкает onNext(элемент), подписчик реагирует сразу.
Пример с Flux в Project Reactor (push-стиль):
Flux<String> pushFlux = Flux.fromIterable(Arrays.asList("яблоко", "банан", "вишня"))
.map(String::toUpperCase); // Преобразование в потоке
pushFlux.subscribe(
fruit -> System.out.println("Получено из push: " + fruit), // Реакция на толкание
Throwable::printStackTrace, // Обработка ошибок
() -> System.out.println("Push завершён")
);Здесь Flux — издатель, который отправляет элементы по мере готовности. subscribe() — подписка, и элементы приходят автоматически: "ЯБЛОКО", "БАНАН"... Нет next() — нет pull.
Если источник асинхронный, например, чтение из сети:
WebClient.create()
.get()
.uri("https://api.example.com/fruits")
.retrieve()
.bodyToFlux(String.class) // Flux толкает строки из ответа
.subscribe(fruit -> System.out.println("Push из API: " + fruit));
Данные push'атся по мере поступления от сервера — без блокировок. Reactor использует неблокирующий IO (на базе Netty), так что поток не висит: один event-loop-цикл (цикл обработки событий) обслуживает тысячи подписок.
Почему push лучше для реактивности?
Во-первых, эффективность: нет лишних проверок hasNext().
Во-вторых, естественность для событий: клик мыши, сообщение в чате — это push по природе, они приходят сами.
В-третьих, масштабируемость: тысячи подписчиков на один издатель — ок, потому что push идёт через события, а не потоки на каждого.
Гибридные сценарии: когда mix работает
На практике модели смешиваются. В Reactor Flux может имитировать pull через операторы вроде buffer() или take(), но основа — push.
Пример: pull из локального списка, но push в сеть:
Flux.fromIterable(fruits)
.map(String::toUpperCase)
.flatMap(fruit -> sendToApi(fruit)) // flatMap толкает в асинхронный API
.subscribe(result -> System.out.println("Ответ: " + result));
Здесь локальный pull (fromIterable) переходит в push (flatMap для API).
Это гибкость: используйте pull для контроля, push для асинхронности. Но важно избегать блокировок: Reactor проверяет и предупреждает, если в лямбде блокирующий код (onBlock()).
Ещё пример из реальной жизни: стриминг видео в Netflix. Pull — когда пользователь сам получает фреймы, но под нагрузкой лагает. Push — сервер отдает фреймы по мере готовности, с буферизацией. Реактивные библиотеки (как RxJava) позволяют строить такие конвейеры.
Почему Push — ключ к новому подходу в реактивном программировании
Возвращаясь к проблемам: потоки тяжёлые, потому что pull требует ожидания; Future блокирует на get(), потому что это pull в асинхронной обёртке; CompletableFuture даёт цепочки, но push-подход в нём слаб (колбэки — это мини-push, но без полного контроля).
Реактивный push меняет всё: данные текут как река, вы реагируете без ожидания, ресурсы на минимуме. Системы становятся resilient (устойчивыми): если один поток сломается, другие продолжают. Под нагрузкой — горизонтальное масштабирование без боли.
#Java #middle #Reactor #Push #Flux
👍3
Знакомство с Project Reactor: Mono и Flux
Project Reactor — это не просто обёртка, а полноценный фреймворк от команды Spring, оптимизированный для производительности. Он использует неблокирующий подход: всё работает на основе событий, с минимальным потреблением ресурсов. Ключевые типы — Mono и Flux, которые воплощают потоки данных. Mono для случаев с нуля или одним элементом (как одиночный запрос), Flux для последовательностей (как стриминг). Они реализуют Publisher из Reactive Streams, так что поддерживают подписку, реакции и обратное давление.
Что такое Project Reactor и как начать?
Project Reactor — библиотека для реактивного программирования, которая строит на Reactive Streams. Она предоставляет API для создания, трансформации и потребления асинхронных потоков. Под капотом — эффективный scheduler (планировщик задач), который распределяет работу по потокам без блокировок: использует event-loop для IO и параллельные пулы для вычислений.
Чтобы начать добавьте в pom.xml (Maven):
Импортируйте:
Reactor интегрируется с Spring (WebFlux), но работает standalone. Главное преимущество: код становится декларативным — вы описываете "что" (поток и реакции), а не "как" (ожидания и циклы). Это решает callback-ад: цепочки читаемы, как последовательный код, но асинхронны.
Mono: поток для нуля или одного элемента
Mono — это тип для сценариев, где ожидается максимум один результат: успех (элемент), ошибка или ничего. Идеален для HTTP-запросов, чтения записи из БД или вычислений с одиночным исходом. Mono реализует Publisher, так что вы можете подписаться и реагировать.
Создание Mono: используйте статические методы.
- Mono.just(value): из готового значения.
- Mono.empty(): пустой поток (завершится onComplete без onNext).
- Mono.fromCallable(() -> compute()): из синхронного вызова.
- Mono.defer(() -> createMono()): ленивое создание (выполняется при подписке).
Пример простого Mono:
Асинхронный пример: симулируем задержку.
Почему Mono лучше Future/CompletableFuture?
Нет блокирующего get(): результат приходит в onNext. Нет callback-ада: цепочки через операторы (map, flatMap — разберём в следующих постах). Под нагрузкой: тысячи Mono на одном потоке, без исчерпания пула.
#Java #middle #Reactor #Reactive_Streams_API #Mono #Flux
Project Reactor — это не просто обёртка, а полноценный фреймворк от команды Spring, оптимизированный для производительности. Он использует неблокирующий подход: всё работает на основе событий, с минимальным потреблением ресурсов. Ключевые типы — Mono и Flux, которые воплощают потоки данных. Mono для случаев с нуля или одним элементом (как одиночный запрос), Flux для последовательностей (как стриминг). Они реализуют Publisher из Reactive Streams, так что поддерживают подписку, реакции и обратное давление.
Что такое Project Reactor и как начать?
Project Reactor — библиотека для реактивного программирования, которая строит на Reactive Streams. Она предоставляет API для создания, трансформации и потребления асинхронных потоков. Под капотом — эффективный scheduler (планировщик задач), который распределяет работу по потокам без блокировок: использует event-loop для IO и параллельные пулы для вычислений.
Чтобы начать добавьте в pom.xml (Maven):
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.5.0</version> <!-- версия может быть неактуальной -->
</dependency>
Импортируйте:
import reactor.core.publisher.Mono; import reactor.core.publisher.Flux;
Reactor интегрируется с Spring (WebFlux), но работает standalone. Главное преимущество: код становится декларативным — вы описываете "что" (поток и реакции), а не "как" (ожидания и циклы). Это решает callback-ад: цепочки читаемы, как последовательный код, но асинхронны.
Mono: поток для нуля или одного элемента
Mono — это тип для сценариев, где ожидается максимум один результат: успех (элемент), ошибка или ничего. Идеален для HTTP-запросов, чтения записи из БД или вычислений с одиночным исходом. Mono реализует Publisher, так что вы можете подписаться и реагировать.
Создание Mono: используйте статические методы.
- Mono.just(value): из готового значения.
- Mono.empty(): пустой поток (завершится onComplete без onNext).
- Mono.fromCallable(() -> compute()): из синхронного вызова.
- Mono.defer(() -> createMono()): ленивое создание (выполняется при подписке).
Пример простого Mono:
Mono<String> simpleMono = Mono.just("Привет из Reactor");
Подписка: subscribe() вызывает реакции.
simpleMono.subscribe(
value -> System.out.println("Значение: " + value), // onNext: реакция на элемент
error -> System.err.println("Ошибка: " + error), // onError
() -> System.out.println("Завершено") // onComplete
);
Здесь: "Значение: Привет из Reactor" и "Завершено". Подписка асинхронна — код после subscribe() идёт сразу, без ожидания. Если ошибка (например, Mono.error(new RuntimeException("Бум"))), сработает onError.Асинхронный пример: симулируем задержку.
Mono<String> asyncMono = Mono.delay(Duration.ofSeconds(1)).map(ignore -> "Готово после секунды");
asyncMono.subscribe(System.out::println); // Вывод после 1 сек
Почему Mono лучше Future/CompletableFuture?
Нет блокирующего get(): результат приходит в onNext. Нет callback-ада: цепочки через операторы (map, flatMap — разберём в следующих постах). Под нагрузкой: тысячи Mono на одном потоке, без исчерпания пула.
#Java #middle #Reactor #Reactive_Streams_API #Mono #Flux
👍4
Flux: поток для нуля, одного или множества элементов
Flux — для последовательностей: от конечных (список) до бесконечных (стриминг). Может быть пустым, с одним элементом (как Mono) или миллионами. Flux тоже Publisher, поддерживает backpressure.
Создание Flux:
- Flux.just(a, b, c): из значений.
- Flux.fromIterable(list): из коллекции.
- Flux.range(start, count): последовательность чисел.
- Flux.interval(Duration): бесконечный таймер.
- Flux.generate(sink -> { sink.next(value); sink.complete(); }): генератор с состоянием.
Пример базового Flux:
С backpressure используйте BaseSubscriber для контроля.
Асинхронный Flux: стриминг с задержкой.
Почему Flux лучше потоков или циклов?
Масштабируемость: обрабатывает миллионы элементов без лишних ресурсов. Реактивность: реагируйте на каждый элемент, без буферов в памяти. Интеграция с обратным давлением: если подписчик медленный, Flux замедляется.
Практические советы и подводные камни
- Ленивость: Mono/Flux не выполняются без subscribe(). Полезно для отложенного вычисления.
- Ошибки: всегда обрабатывайте onError, иначе они "проглатываются".
- Блокировки: избегайте Thread.sleep() в реакциях — используйте delayElements() для асинхронных пауз.
- Тестирование: используйте StepVerifier из reactor-test: StepVerifier.create(flux).expectNext(1,2).verifyComplete();
- Камень: бесконечный Flux без take() или limitRequest() — рискуете памятью. Добавляйте .onBackpressureBuffer() или drop.
В реальной практике: в WebFlux контроллер возвращает Mono<Response> для GET, Flux<Event> для SSE (сервер-сент события).
#Java #middle #Reactor #Reactive_Streams_API #Mono #Flux
Flux — для последовательностей: от конечных (список) до бесконечных (стриминг). Может быть пустым, с одним элементом (как Mono) или миллионами. Flux тоже Publisher, поддерживает backpressure.
Создание Flux:
- Flux.just(a, b, c): из значений.
- Flux.fromIterable(list): из коллекции.
- Flux.range(start, count): последовательность чисел.
- Flux.interval(Duration): бесконечный таймер.
- Flux.generate(sink -> { sink.next(value); sink.complete(); }): генератор с состоянием.
Пример базового Flux:
Flux<Integer> numbersFlux = Flux.range(1, 5);
numbersFlux.subscribe(
value -> System.out.println("Элемент: " + value), // onNext для каждого
error -> System.err.println("Ошибка: " + error),
() -> System.out.println("Завершено")
);
Вывод: 1, 2, 3, 4, 5 и "Завершено". Асинхронно: элементы "текут" по мере готовности.
С backpressure используйте BaseSubscriber для контроля.
numbersFlux.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
request(2); // Запрашиваем по 2 элемента
}
@Override
protected void hookOnNext(Integer value) {
System.out.println("Получено: " + value);
request(1); // После обработки — следующий
}
});
Это предотвращает перегрузку: Flux выдаёт элементы порциями.Асинхронный Flux: стриминг с задержкой.
Flux<Long> tickingFlux = Flux.interval(Duration.ofMillis(500)).take(5); // 5 тиков каждые 0.5 сек
tickingFlux.subscribe(System.out::println); // 0, 1, 2, 3, 4 с паузами
Почему Flux лучше потоков или циклов?
Масштабируемость: обрабатывает миллионы элементов без лишних ресурсов. Реактивность: реагируйте на каждый элемент, без буферов в памяти. Интеграция с обратным давлением: если подписчик медленный, Flux замедляется.
Практические советы и подводные камни
- Ленивость: Mono/Flux не выполняются без subscribe(). Полезно для отложенного вычисления.
- Ошибки: всегда обрабатывайте onError, иначе они "проглатываются".
- Блокировки: избегайте Thread.sleep() в реакциях — используйте delayElements() для асинхронных пауз.
- Тестирование: используйте StepVerifier из reactor-test: StepVerifier.create(flux).expectNext(1,2).verifyComplete();
- Камень: бесконечный Flux без take() или limitRequest() — рискуете памятью. Добавляйте .onBackpressureBuffer() или drop.
В реальной практике: в WebFlux контроллер возвращает Mono<Response> для GET, Flux<Event> для SSE (сервер-сент события).
#Java #middle #Reactor #Reactive_Streams_API #Mono #Flux
👍6
Реактивное программирование
Простой REST-контроллер с Mono и Flux в Spring WebFlux
Spring WebFlux работает на Spring Boot — это упрощает запуск.
Создайте новый проект в IDE (IntelliJ, Eclipse) или через spring.io/initializr с зависимостями:
- Spring Reactive Web (для WebFlux).
- Spring Boot Starter WebFlux.
В pom.xml (Maven) это выглядит так:
Запустите приложение с @SpringBootApplication в главном классе. По умолчанию сервер — Reactor Netty (неблокирующий HTTP-сервер), порт 8080. Нет нужды в Tomcat — всё асинхронно. Если у вас уже есть Spring MVC, исключите starter-web, чтобы избежать конфликтов.
Теперь к контроллерам: в WebFlux они аннотированы @RestController (как в MVC), но методы возвращают Mono или Flux. Это значит: контроллер не блокирует поток — он возвращает "реактивный объект", а фреймворк сам подпишется и отправит ответ по мере готовности. Это решает боли блокировок: пока данные готовятся (например, запрос в БД), поток свободен для других запросов.
Mono в контроллере: для одиночных ответов
Mono — это поток для нуля или одного элемента, идеален для типичных REST-операций: GET одного ресурса, POST с созданием, DELETE с подтверждением. В контроллере метод возвращает Mono<T>, где T — ваш DTO или простая строка. Фреймворк автоматически сериализует его в JSON (с Jackson) и отправляет, когда элемент готов.
Простой пример: контроллер для приветствия.
Почему Mono лучше Object в MVC? В MVC метод ждёт выполнения (блокирует поток), здесь — нет. Под нагрузкой: тысячи /greet запросов — WebFlux использует event-loop (один поток на все), Reactor распределяет через Schedulers.
Если внутри Mono — запрос к сервису:
В контроллере:
Это цепочка: контроллер возвращает Mono от сервиса, фреймворк ждёт готовности без блокировки. Если ошибка в сервисе — onError преобразуется в HTTP 500, но вы можете настроить с @ExceptionHandler.
Расширим: обработка параметров запроса (@RequestParam) и тела (@RequestBody).
Для POST:
#Java #middle #Reactor #WebFlux #Mono #Flux
Простой REST-контроллер с Mono и Flux в Spring WebFlux
Spring WebFlux работает на Spring Boot — это упрощает запуск.
Создайте новый проект в IDE (IntelliJ, Eclipse) или через spring.io/initializr с зависимостями:
- Spring Reactive Web (для WebFlux).
- Spring Boot Starter WebFlux.
В pom.xml (Maven) это выглядит так:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
Для Gradle: implementation 'org.springframework.boot:spring-boot-starter-webflux'
Запустите приложение с @SpringBootApplication в главном классе. По умолчанию сервер — Reactor Netty (неблокирующий HTTP-сервер), порт 8080. Нет нужды в Tomcat — всё асинхронно. Если у вас уже есть Spring MVC, исключите starter-web, чтобы избежать конфликтов.
Теперь к контроллерам: в WebFlux они аннотированы @RestController (как в MVC), но методы возвращают Mono или Flux. Это значит: контроллер не блокирует поток — он возвращает "реактивный объект", а фреймворк сам подпишется и отправит ответ по мере готовности. Это решает боли блокировок: пока данные готовятся (например, запрос в БД), поток свободен для других запросов.
Mono в контроллере: для одиночных ответов
Mono — это поток для нуля или одного элемента, идеален для типичных REST-операций: GET одного ресурса, POST с созданием, DELETE с подтверждением. В контроллере метод возвращает Mono<T>, где T — ваш DTO или простая строка. Фреймворк автоматически сериализует его в JSON (с Jackson) и отправляет, когда элемент готов.
Простой пример: контроллер для приветствия.
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
@RestController
public class GreetingController {
@GetMapping("/greet/{name}")
public Mono<String> greet(@PathVariable String name) {
return Mono.just("Привет, " + name + "! Добро пожаловать в реактивный мир.");
}
}
Здесь @GetMapping — аннотация для маршрута, @PathVariable — параметр из URL. Метод возвращает Mono.just — простой синхронный элемент. Но под капотом: WebFlux подпишется на Mono, и когда значение готово (сразу), отправит HTTP 200 с телом. Если бы внутри был асинхронный вызов (например, Mono.delay(Duration.ofSeconds(2)).map(ignore -> "Задержанный привет")), сервер не заблокируется — запрос обработается асинхронно.
Почему Mono лучше Object в MVC? В MVC метод ждёт выполнения (блокирует поток), здесь — нет. Под нагрузкой: тысячи /greet запросов — WebFlux использует event-loop (один поток на все), Reactor распределяет через Schedulers.
Если внутри Mono — запрос к сервису:
@Service
public class UserService {
public Mono<User> findUserById(Long id) {
return Mono.fromCallable(() -> /* симуляция БД */ new User(id, "Имя")); // Асинхронно
}
}
В контроллере:
@GetMapping("/user/{id}")
public Mono<User> getUser(@PathVariable Long id) {
return userService.findUserById(id);
}Это цепочка: контроллер возвращает Mono от сервиса, фреймворк ждёт готовности без блокировки. Если ошибка в сервисе — onError преобразуется в HTTP 500, но вы можете настроить с @ExceptionHandler.
Расширим: обработка параметров запроса (@RequestParam) и тела (@RequestBody).
Для POST:
@PostMapping("/create-user")
public Mono<User> createUser(@RequestBody Mono<UserRequest> requestMono) {
return requestMono.flatMap(req -> {
// Асинхронная логика: создать пользователя
return userService.createUser(req.getName()).map(saved -> new User(saved.getId(), saved.getName()));
});
}
Здесь @RequestBody — Mono, потому что тело может приходить асинхронно (большие данные). flatMap — для цепочки (из поста 8), чтобы "развернуть" подпоток.#Java #middle #Reactor #WebFlux #Mono #Flux
👍2
Flux в контроллере: для коллекций и стриминга
Flux — для нуля, одного или многих элементов, подходит для GET списков, пагинации или реального времени (стриминг событий). Возвращайте Flux<T>, и WebFlux отправит ответ как JSON-массив (для конечного) или text/event-stream для SSE (сервер-сент событий).
Простой пример: список элементов.
Фреймворк соберёт Flux в JSON-массив и отправит одним ответом.
Но если Flux асинхронный:
Ответ будет стриминговым: клиенту придут данные по мере готовности (chunked transfer).
Для явного SSE добавьте produces = MediaType.TEXT_EVENT_STREAM_VALUE:
Расширим: пагинация с @RequestParam.
Для POST с Flux телом (batch-операции):
Обработка ошибок и валидация в контроллерах
Ошибки — часть жизни: в WebFlux используйте @ExceptionHandler в контроллере или глобально (@ControllerAdvice).
Пример:
Для валидации: @Validated на контроллере, @Valid на @RequestBody. Ошибки валидации — WebExchangeBindException, обработайте в handler.
Глобально: @ControllerAdvice с @ExceptionHandler для централизованной обработки. Это интегрируется с onErrorResume/retry: в сервисе добавьте, и контроллер получит восстановленный поток.
Практические советы и подводные камни
- Тестирование: Используйте WebTestClient — webTestClient.get().uri("/greet/Name").exchange().expectStatus().isOk().expectBody(String.class).isEqualTo("Привет, Name!");
- Schedulers: Если в сервисе blocking код, добавьте .publishOn(Schedulers.boundedElastic()) перед возвратом в контроллер.
- Проблема: Возврат Flux без produces = TEXT_EVENT_STREAM — соберётся в массив, но под большой нагрузкой рискуете буфером; для стриминга укажите media type.
- Оптимизация: Для больших ответов используйте Flux<DataBuffer> — стриминг байтов без сериализации в память.
- Интеграция: С Spring Security Reactive — фильтры асинхронны, не блокируют.
#Java #middle #Reactor #WebFlux #Mono #Flux
Flux — для нуля, одного или многих элементов, подходит для GET списков, пагинации или реального времени (стриминг событий). Возвращайте Flux<T>, и WebFlux отправит ответ как JSON-массив (для конечного) или text/event-stream для SSE (сервер-сент событий).
Простой пример: список элементов.
@GetMapping("/users")
public Flux<User> getAllUsers() {
return Flux.fromIterable(List.of(new User(1L, "Алиса"), new User(2L, "Боб")));
}Фреймворк соберёт Flux в JSON-массив и отправит одним ответом.
Но если Flux асинхронный:
public Flux<User> findAllUsers() {
return Flux.range(1, 5)
.delayElements(Duration.ofSeconds(1)) // Симуляция задержки
.map(i -> new User((long) i, "Пользователь " + i));
}Ответ будет стриминговым: клиенту придут данные по мере готовности (chunked transfer).
Для явного SSE добавьте produces = MediaType.TEXT_EVENT_STREAM_VALUE:
@GetMapping(value = "/stream-users", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<User> streamUsers() {
return findAllUsers();
}
Теперь ответ — SSE: каждый элемент как event: data: {"id":1,"name":"Пользователь 1"}\n\n. Клиент (браузер или WebClient) может реагировать на каждый по отдельности, без ожидания всего. Это решает боли callback-ада: вместо polling (опроса сервера), сервер толкает данные (push из поста 3).
Расширим: пагинация с @RequestParam.
@GetMapping("/users-paged")
public Flux<User> getPagedUsers(@RequestParam(defaultValue = "0") int page, @RequestParam(defaultValue = "10") int size) {
return userService.findPaged(page, size); // Сервис возвращает Flux<User>
}Для POST с Flux телом (batch-операции):
@PostMapping("/batch-users")
public Flux<User> createBatch(@RequestBody Flux<UserRequest> requestsFlux) {
return requestsFlux.flatMap(req -> userService.createUser(req.getName()));
}
flatMap — для параллельной обработки, возвращает Flux сохранённых пользователей.Обработка ошибок и валидация в контроллерах
Ошибки — часть жизни: в WebFlux используйте @ExceptionHandler в контроллере или глобально (@ControllerAdvice).
Пример:
@ExceptionHandler(UserNotFoundException.class)
public Mono<ResponseEntity<String>> handleNotFound(UserNotFoundException ex) {
return Mono.just(ResponseEntity.notFound().build());
}
Для валидации: @Validated на контроллере, @Valid на @RequestBody. Ошибки валидации — WebExchangeBindException, обработайте в handler.
Глобально: @ControllerAdvice с @ExceptionHandler для централизованной обработки. Это интегрируется с onErrorResume/retry: в сервисе добавьте, и контроллер получит восстановленный поток.
Практические советы и подводные камни
- Тестирование: Используйте WebTestClient — webTestClient.get().uri("/greet/Name").exchange().expectStatus().isOk().expectBody(String.class).isEqualTo("Привет, Name!");
- Schedulers: Если в сервисе blocking код, добавьте .publishOn(Schedulers.boundedElastic()) перед возвратом в контроллер.
- Проблема: Возврат Flux без produces = TEXT_EVENT_STREAM — соберётся в массив, но под большой нагрузкой рискуете буфером; для стриминга укажите media type.
- Оптимизация: Для больших ответов используйте Flux<DataBuffer> — стриминг байтов без сериализации в память.
- Интеграция: С Spring Security Reactive — фильтры асинхронны, не блокируют.
#Java #middle #Reactor #WebFlux #Mono #Flux
👍1
Реактивное программирование
R2DBC vs JDBC: реактивные базы данных
Исторический контекст: что такое JDBC и почему он доминировал десятилетиями
JDBC — это стандартный API Java для доступа к реляционным базам данных, появившийся ещё в JDK 1.1 (1997 год).
Он позволяет выполнять SQL-запросы, управлять соединениями и обрабатывать результаты через унифицированный интерфейс, независимо от конкретной БД (PostgreSQL, MySQL, Oracle и т.д.).
Ключевые компоненты JDBC:
DriverManager или DataSource: Для получения соединения (Connection).
Statement/PreparedStatement: Для выполнения SQL (executeQuery, executeUpdate).
ResultSet: Для чтения результатов (next(), getString() и т.д.).
Transaction management: commit(), rollback().
Пример простого JDBC-кода:
Это синхронно и блокирующе: executeQuery() "виснет" до ответа от БД, блокируя текущий поток.
В традиционных приложениях (как Spring MVC) это работало: каждый запрос — отдельный поток из пула (например, Tomcat с 200 потоками), и если БД отвечает быстро, проблем нет. Но под высокой нагрузкой или с медленными запросами (сетевые задержки, сложные джойны) пул исчерпывается: потоки "спят" в ожидании IO, CPU простаивает, а новые запросы ждут в очереди, вызывая таймауты и отказы. Это классическая проблема асинхронщины: JDBC не предназначен для non-blocking IO, он полагается на blocking calls операционной системы.
В реактивных приложениях (WebFlux) использование JDBC — антипаттерн: если контроллер возвращает Mono, но внутри — blocking JDBC, весь выигрыш теряется. Поток из event-loop (Netty) блокируется, снижая throughput (пропускную способность). Вот почему нужен новый подход.
Проблемы JDBC в реактивном контексте: почему старый стандарт не справляется
Давайте разберём проблемы JDBC подробно, чтобы понять мотивацию R2DBC:
Блокирующая природа: Все операции (connect, query, fetch) — синхронны. В асинхронном коде это требует обёрток вроде CompletableFuture или offload на отдельный пул (Schedulers.boundedElastic()), но это хак: теряется истинная реактивность, и под нагрузкой пулы переполняются.
Отсутствие backpressure: ResultSet — pull-модель (next() получает данные), но без контроля темпа. Если результат огромный (миллионы строк), буфер переполняется, рискуя OOM (OutOfMemoryError). В реактивном мире (push с backpressure) это несовместимо.
Управление соединениями: JDBC полагается на пулы (HikariCP), но они ориентированы на blocking: соединение "занято" весь запрос. В реактиве нужно multiplexing — одно соединение для многих операций асинхронно.
Транзакции: @Transactional в Spring работает, но в реактиве требует специальной поддержки (reactive transactions), иначе — блокировки.
Масштабируемость: Под 10k+ RPS (requests per second) с БД-запросами JDBC требует огромных пулов потоков (тысячи), что жрёт память (каждый поток ~1MB стека) и контекст-свичинг.
Интеграция с Reactor: Нет native Publisher — результаты не "текут" как Flux, требуя ручной конвертации, что добавляет boilerplate и риски.
В итоге, JDBC — отличный для legacy или низконагруженных приложений, но в микросервисах с WebFlux он "ломает" реактивный стек, возвращая к болям callback-ада и ожиданий.
Введение в R2DBC: реактивный стандарт для реляционных БД
R2DBC — это спецификация (с 2019 года, под эгидой Spring и Pivotal), определяющая API для доступа к реляционным БД в реактивном стиле. Это не замена JDBC, а параллельный стандарт, ориентированный на non-blocking IO.
#Java #middle #Reactor #WebFlux #Mono #Flux #R2DBC
R2DBC vs JDBC: реактивные базы данных
Исторический контекст: что такое JDBC и почему он доминировал десятилетиями
JDBC — это стандартный API Java для доступа к реляционным базам данных, появившийся ещё в JDK 1.1 (1997 год).
Он позволяет выполнять SQL-запросы, управлять соединениями и обрабатывать результаты через унифицированный интерфейс, независимо от конкретной БД (PostgreSQL, MySQL, Oracle и т.д.).
Ключевые компоненты JDBC:
DriverManager или DataSource: Для получения соединения (Connection).
Statement/PreparedStatement: Для выполнения SQL (executeQuery, executeUpdate).
ResultSet: Для чтения результатов (next(), getString() и т.д.).
Transaction management: commit(), rollback().
Пример простого JDBC-кода:
import java.sql.*;
public class JdbcExample {
public static void main(String[] args) {
try (Connection conn = DriverManager.getConnection("jdbc:postgresql://localhost:5432/db", "user", "pass");
PreparedStatement stmt = conn.prepareStatement("SELECT * FROM users WHERE id = ?")) {
stmt.setLong(1, 1L);
ResultSet rs = stmt.executeQuery();
while (rs.next()) {
System.out.println("User: " + rs.getString("name"));
}
} catch (SQLException e) {
e.printStackTrace();
}
}
}
Это синхронно и блокирующе: executeQuery() "виснет" до ответа от БД, блокируя текущий поток.
В традиционных приложениях (как Spring MVC) это работало: каждый запрос — отдельный поток из пула (например, Tomcat с 200 потоками), и если БД отвечает быстро, проблем нет. Но под высокой нагрузкой или с медленными запросами (сетевые задержки, сложные джойны) пул исчерпывается: потоки "спят" в ожидании IO, CPU простаивает, а новые запросы ждут в очереди, вызывая таймауты и отказы. Это классическая проблема асинхронщины: JDBC не предназначен для non-blocking IO, он полагается на blocking calls операционной системы.
В реактивных приложениях (WebFlux) использование JDBC — антипаттерн: если контроллер возвращает Mono, но внутри — blocking JDBC, весь выигрыш теряется. Поток из event-loop (Netty) блокируется, снижая throughput (пропускную способность). Вот почему нужен новый подход.
Проблемы JDBC в реактивном контексте: почему старый стандарт не справляется
Давайте разберём проблемы JDBC подробно, чтобы понять мотивацию R2DBC:
Блокирующая природа: Все операции (connect, query, fetch) — синхронны. В асинхронном коде это требует обёрток вроде CompletableFuture или offload на отдельный пул (Schedulers.boundedElastic()), но это хак: теряется истинная реактивность, и под нагрузкой пулы переполняются.
Отсутствие backpressure: ResultSet — pull-модель (next() получает данные), но без контроля темпа. Если результат огромный (миллионы строк), буфер переполняется, рискуя OOM (OutOfMemoryError). В реактивном мире (push с backpressure) это несовместимо.
Управление соединениями: JDBC полагается на пулы (HikariCP), но они ориентированы на blocking: соединение "занято" весь запрос. В реактиве нужно multiplexing — одно соединение для многих операций асинхронно.
Транзакции: @Transactional в Spring работает, но в реактиве требует специальной поддержки (reactive transactions), иначе — блокировки.
Масштабируемость: Под 10k+ RPS (requests per second) с БД-запросами JDBC требует огромных пулов потоков (тысячи), что жрёт память (каждый поток ~1MB стека) и контекст-свичинг.
Интеграция с Reactor: Нет native Publisher — результаты не "текут" как Flux, требуя ручной конвертации, что добавляет boilerplate и риски.
В итоге, JDBC — отличный для legacy или низконагруженных приложений, но в микросервисах с WebFlux он "ломает" реактивный стек, возвращая к болям callback-ада и ожиданий.
Введение в R2DBC: реактивный стандарт для реляционных БД
R2DBC — это спецификация (с 2019 года, под эгидой Spring и Pivotal), определяющая API для доступа к реляционным БД в реактивном стиле. Это не замена JDBC, а параллельный стандарт, ориентированный на non-blocking IO.
#Java #middle #Reactor #WebFlux #Mono #Flux #R2DBC
👍1
Ключевые идеи:
Publisher-based API: Все операции возвращают Publisher (Mono/Flux из Reactive Streams): Connection как Mono<Connection>, Statement.execute() как Flux<Row>.
Non-blocking от начала до конца: Использует асинхронные драйверы (для PostgreSQL, MySQL и т.д.), где соединения мультиплексируются — одно для многих запросов.
Backpressure встроено: Результаты (Flux<Row>) уважают request(n): если подписчик не готов, БД не шлёт данные, избегая перегрузки.
Транзакции реактивные: Поддержка @Transactional с Mono/Flux.
Интеграция с экосистемой: Spring Data R2DBC — аналог Spring Data JPA, с репозиториями, @Query и CRUD.
Драйверы: r2dbc-postgresql, r2dbc-mysql и т.д. — реализуют спецификацию, используя неблокирующие сокеты (Netty или аналог).
Пример базового R2DBC-кода (без Spring):
Здесь usingWhen — реактивный try-with-resources: создаёт соединение асинхронно, выполняет запрос как Flux<Result>, map извлекает данные. Нет блокировок: если БД медленная, поток свободен.
Spring Data R2DBC: упрощение с репозиториями и аннотациями
Spring Data R2DBC — модуль, который абстрагирует R2DBC, как Spring Data JPA для JDBC.
Добавьте зависимость:
Настройте в application.properties:
Репозитории:
Сущность:
В сервисе/контроллере:
В контроллере:
Это декларативно: repo.findAll() — Flux, который "течёт" из БД без блокировок. Транзакции: @Transactional на методе — reactive, rollback асинхронно.
Расширенный пример: пагинация с ReactiveSortingRepository и Pageable.
Практические советы и подводные камни
Выбор БД: PostgreSQL — лучший для R2DBC (полная поддержка async).
Тестирование: Embedded H2 с r2dbc-h2, ReactiveTest для StepVerifier.
Камень: Нет full ORM (как JPA entities с relations) — используйте ручные joins или Spring Data Projections.
Камень: Транзакции не поддерживают propagation в nested методах fully — будьте осторожны.
Совет: Для hybrid (JDBC + R2DBC) — используйте разные DataSource, но избегайте в одном приложении.
Совет: Мониторьте с Micrometer: метрики на запросы, соединения.
#Java #middle #Reactor #WebFlux #Mono #Flux #R2DBC
Publisher-based API: Все операции возвращают Publisher (Mono/Flux из Reactive Streams): Connection как Mono<Connection>, Statement.execute() как Flux<Row>.
Non-blocking от начала до конца: Использует асинхронные драйверы (для PostgreSQL, MySQL и т.д.), где соединения мультиплексируются — одно для многих запросов.
Backpressure встроено: Результаты (Flux<Row>) уважают request(n): если подписчик не готов, БД не шлёт данные, избегая перегрузки.
Транзакции реактивные: Поддержка @Transactional с Mono/Flux.
Интеграция с экосистемой: Spring Data R2DBC — аналог Spring Data JPA, с репозиториями, @Query и CRUD.
Драйверы: r2dbc-postgresql, r2dbc-mysql и т.д. — реализуют спецификацию, используя неблокирующие сокеты (Netty или аналог).
Пример базового R2DBC-кода (без Spring):
import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.ConnectionFactory;
import reactor.core.publisher.Flux;
public void createConnectionFactory () {
ConnectionFactory factory = ConnectionFactories.get("r2dbc:postgresql://localhost:5432/db?username=user&password=pass");
Flux<String> namesFlux = Flux.usingWhen(
factory.create(), // Асинхронно создать соединение
conn -> conn.createStatement("SELECT name FROM users").execute().flatMap(result -> result.map((row, metadata) -> row.get("name", String.class))),
conn -> conn.close() // Асинхронно закрыть
);
namesFlux.subscribe(System.out::println); // Строки приходят асинхронно
}
Здесь usingWhen — реактивный try-with-resources: создаёт соединение асинхронно, выполняет запрос как Flux<Result>, map извлекает данные. Нет блокировок: если БД медленная, поток свободен.
Spring Data R2DBC: упрощение с репозиториями и аннотациями
Spring Data R2DBC — модуль, который абстрагирует R2DBC, как Spring Data JPA для JDBC.
Добавьте зависимость:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-postgresql</artifactId> <!-- Для PostgreSQL -->
</dependency>
Настройте в application.properties:
spring.r2dbc.url=r2dbc:postgresql://localhost:5432/db
spring.r2dbc.username=user
spring.r2dbc.password=pass
Репозитории:
ReactiveRepository extends ReactiveCrudRepository<Entity, ID>.
Сущность:
@Entity
public class User {
@Id
private Long id;
private String name;
// Getters/setters
}
public interface UserRepository extends ReactiveCrudRepository<User, Long> {
@Query("SELECT * FROM users WHERE name LIKE :name")
Flux<User> findByNameLike(String name);
}
В сервисе/контроллере:
@Service
public class UserService {
private final UserRepository repo;
public UserService(UserRepository repo) {
this.repo = repo;
}
public Flux<User> findAll() {
return repo.findAll(); // Flux асинхронно
}
public Mono<User> save(User user) {
return repo.save(user);
}
}
В контроллере:
@GetMapping("/users")
public Flux<User> getAllUsers() {
return userService.findAll();
}Это декларативно: repo.findAll() — Flux, который "течёт" из БД без блокировок. Транзакции: @Transactional на методе — reactive, rollback асинхронно.
Расширенный пример: пагинация с ReactiveSortingRepository и Pageable.
public interface UserRepository extends ReactiveSortingRepository<User, Long> {}
Flux<User> paged = repo.findAll(Sort.by("name").ascending()).skip(10).take(20); // Простая пагинация
Для complex: используйте @Query с параметрами, или Criteria API.Практические советы и подводные камни
Выбор БД: PostgreSQL — лучший для R2DBC (полная поддержка async).
Тестирование: Embedded H2 с r2dbc-h2, ReactiveTest для StepVerifier.
Камень: Нет full ORM (как JPA entities с relations) — используйте ручные joins или Spring Data Projections.
Камень: Транзакции не поддерживают propagation в nested методах fully — будьте осторожны.
Совет: Для hybrid (JDBC + R2DBC) — используйте разные DataSource, но избегайте в одном приложении.
Совет: Мониторьте с Micrometer: метрики на запросы, соединения.
#Java #middle #Reactor #WebFlux #Mono #Flux #R2DBC
👍1
Реактивное программирование
Горячие и холодные Publisher’ы в реактивном программировании
Publisher — это источник данных в Reactive Streams, который "толкает" элементы подписчикам (Subscriber). Но не все Publisher’ы одинаковы по поведению при множественных подписках. Это зависит от того, генерирует ли он данные независимо от подписчиков (горячий) или заново для каждого (холодный).
Холодный Publisher (Cold Publisher): Данные генерируются лениво — только при подписке, и для каждого подписчика отдельно. Это как видео по запросу: каждый зритель получает свою копию потока. Плюс: свежие данные каждый раз. Минус: если источник дорогой (запрос в БД, вычисления), повторяется зря.
Горячий Publisher (Hot Publisher): Данные генерируются независимо от подписчиков — поток "вещает" непрерывно. Подписчики "подключаются" к существующему потоку и получают данные с момента подписки. Это как живой эфир: все слушают одно и то же, но опоздавшие пропускают начало. Плюс: экономия ресурсов (один источник). Минус: данные могут быть "старыми" или пропущенными.
В Project Reactor большинство конструкторов — холодные (just, fromIterable, range), но есть горячие (interval, push). Поведение можно менять операторами (share, cache).
Примеры холодных Publisher’ов: ленивость и независимость
Холодные — default в Reactor: подписка запускает генерацию заново.
Пример с Mono (одиночный элемент, но принцип тот же):
С Flux:
Примеры горячих Publisher’ов: общий поток и вещание
Горячие — генерируют данные один раз, подписчики "присоединяются".
Пример с Flux.push (горячий по дизайну):
Другой горячий:
#Java #middle #Reactor #WebFlux #Mono #Flux
Горячие и холодные Publisher’ы в реактивном программировании
Publisher — это источник данных в Reactive Streams, который "толкает" элементы подписчикам (Subscriber). Но не все Publisher’ы одинаковы по поведению при множественных подписках. Это зависит от того, генерирует ли он данные независимо от подписчиков (горячий) или заново для каждого (холодный).
Холодный Publisher (Cold Publisher): Данные генерируются лениво — только при подписке, и для каждого подписчика отдельно. Это как видео по запросу: каждый зритель получает свою копию потока. Плюс: свежие данные каждый раз. Минус: если источник дорогой (запрос в БД, вычисления), повторяется зря.
Горячий Publisher (Hot Publisher): Данные генерируются независимо от подписчиков — поток "вещает" непрерывно. Подписчики "подключаются" к существующему потоку и получают данные с момента подписки. Это как живой эфир: все слушают одно и то же, но опоздавшие пропускают начало. Плюс: экономия ресурсов (один источник). Минус: данные могут быть "старыми" или пропущенными.
В Project Reactor большинство конструкторов — холодные (just, fromIterable, range), но есть горячие (interval, push). Поведение можно менять операторами (share, cache).
Примеры холодных Publisher’ов: ленивость и независимость
Холодные — default в Reactor: подписка запускает генерацию заново.
Пример с Mono (одиночный элемент, но принцип тот же):
Mono<String> coldMono = Mono.fromCallable(() -> {
System.out.println("Генерация данных...");
return "Данные";
});
coldMono.subscribe(System.out::println); // Вывод: "Генерация данных..." и "Данные"
coldMono.subscribe(System.out::println); // Снова: "Генерация данных..." и "Данные"
Каждый subscribe() вызывает fromCallable заново — данные свежие, но если это запрос в API, будет два вызова.С Flux:
Flux<Integer> coldFlux = Flux.range(1, 3).doOnSubscribe(sub -> System.out.println("Новая подписка!"));
coldFlux.subscribe(val -> System.out.println("Подписчик 1: " + val));
coldFlux.subscribe(val -> System.out.println("Подписчик 2: " + val));
// Вывод: "Новая подписка!" + 1,2,3 для первого; "Новая подписка!" + 1,2,3 для второго
Каждый подписчик получает полный поток независимо. Полезно для idempotent операций (без side-effects), как чтение статичных данных.
Асинхронный пример: coldFlux = Flux.interval(Duration.ofSeconds(1)).take(3); // Каждый subscribe() запускает свой таймер.Примеры горячих Publisher’ов: общий поток и вещание
Горячие — генерируют данные один раз, подписчики "присоединяются".
Пример с Flux.push (горячий по дизайну):
ConnectableFlux<Integer> hotFlux = Flux.push(sink -> {
// Симулируем внешний источник
new Thread(() -> {
for (int i = 1; i <= 3; i++) {
try { Thread.sleep(1000); } catch (InterruptedException e) {}
sink.next(i);
}
sink.complete();
}).start();
});
hotFlux.subscribe(val -> System.out.println("Подписчик 1: " + val));
Thread.sleep(1500); // Ждём, чтобы пропустить начало
hotFlux.subscribe(val -> System.out.println("Подписчик 2: " + val));
hotFlux.connect(); // Запуск горячего
// Вывод примерно: Подписчик 1: 1 (1с), 2 (2с), 3 (3с); Подписчик 2: 2 (присоединился после 1), 3
Второй пропустил 1 — поток общий. Connect() — триггер для ConnectableFlux (обёртка для горячих).Другой горячий:
Flux.interval(Duration.ofSeconds(1)) — бесконечный таймер, вещает независимо.
Оператор share(): Делает холодный горячим.
Flux<Integer> shared = coldFlux.share();
shared.subscribe(...); // Запускает
shared.subscribe(...); // Присоединяется к существующему
#Java #middle #Reactor #WebFlux #Mono #Flux
👍2
Переключение типов: операторы для контроля
Из холодного в горячий: share() (для Flux), cache() (кэширует элементы для повторов), publish() (ConnectableFlux с backpressure).
Пример cache:
coldMono.cache() — первый subscribe генерирует, последующие — из кэша.
Из горячего в холодный: Редко нужно, но replay() на ConnectableFlux кэширует историю для новых подписчиков.
Сценарии:
Холодный: Запросы к БД (каждый клиент — свежие данные).
Горячий: Мониторинг (один сенсор — всем подписчикам), стриминг событий (Kafka topic).
Практические советы и подводные камни
Диагностика: doOnSubscribe(() -> log("Subscribe")) — увидите, сколько раз запускается.
Камень: Холодный с side-effects (мутации) — непредсказуемо при множественных подписках; используйте defer() для ленивости.
Камень: Горячий бесконечный без take() — утечки; добавьте refCount() на publish() для авто-отписки при 0 подписчиках.
Совет: В WebFlux — Flux из БД (R2DBC, пост 17) холодный по умолчанию; share() для кэширования результатов.
Тестирование: StepVerifier с .publish() для симуляции горячих.
#Java #middle #Reactor #WebFlux #Mono #Flux
Из холодного в горячий: share() (для Flux), cache() (кэширует элементы для повторов), publish() (ConnectableFlux с backpressure).
Пример cache:
coldMono.cache() — первый subscribe генерирует, последующие — из кэша.
Из горячего в холодный: Редко нужно, но replay() на ConnectableFlux кэширует историю для новых подписчиков.
Сценарии:
Холодный: Запросы к БД (каждый клиент — свежие данные).
Горячий: Мониторинг (один сенсор — всем подписчикам), стриминг событий (Kafka topic).
Практические советы и подводные камни
Диагностика: doOnSubscribe(() -> log("Subscribe")) — увидите, сколько раз запускается.
Камень: Холодный с side-effects (мутации) — непредсказуемо при множественных подписках; используйте defer() для ленивости.
Камень: Горячий бесконечный без take() — утечки; добавьте refCount() на publish() для авто-отписки при 0 подписчиках.
Совет: В WebFlux — Flux из БД (R2DBC, пост 17) холодный по умолчанию; share() для кэширования результатов.
Тестирование: StepVerifier с .publish() для симуляции горячих.
#Java #middle #Reactor #WebFlux #Mono #Flux
👍2