Java for Beginner
742 subscribers
714 photos
201 videos
12 files
1.16K links
Канал от новичков для новичков!
Изучайте Java вместе с нами!
Здесь мы обмениваемся опытом и постоянно изучаем что-то новое!

Наш YouTube канал - https://www.youtube.com/@Java_Beginner-Dev

Наш канал на RUTube - https://rutube.ru/channel/37896292/
Download Telegram
Реактивное программирование

Концепции реактивного программирования: Reactive Streams API — Publisher и Subscriber

Сегодня разберём Reactive Streams API — это спецификация, которая лежит в сердце реактивного программирования на Java. Она не просто набор интерфейсов, а рамка для построения асинхронных потоков данных с контролем над ними. Представьте это как правила дорожного движения для потока событий: без них — хаос и пробки, с ними — плавный поток.

Reactive Streams API решает ключевые проблемы из первого поста: вместо тяжёлых потоков и callback-ада, мы получаем унифицированный способ обмена данными между компонентами. Это не библиотека, а интерфейсы, которые реализуют фреймворки вроде Project Reactor или RxJava. Они обеспечивают совместимость: один компонент от Reactor может работать с другим от Akka Streams.


Что такое Reactive Streams API


Спецификация Reactive Streams появилась в 2015 году как ответ на хаос асинхронности. До неё каждый фреймворк изобретал велосипед: свои способы обработки потоков, ошибок и давления.
API стандартизирует это: четыре интерфейса (Publisher, Subscriber, Subscription, Processor), которые описывают, как данные текут асинхронно. Главная идея — неблокирующая коммуникация: ничего не ждём синхронно, всё через события.


Это подводит нас ближе к реактивному мышлению: вместо "запроси и жди" (как в Future.get()), мы "подпишись и реагируй". Под нагрузкой это экономит ресурсы: один поток (event-loop) может обслуживать тысячи подписок, без создания новых на каждую задачу. В итоге, системы становятся более отзывчивыми и масштабируемыми — идеально для микросервисов или реального времени.


Publisher: источник данных, который отправляет события

Publisher — это интерфейс для издателя, который генерирует поток данных.
Он как фабрика событий: производит элементы (любые объекты), ошибки или сигнал завершения. Publisher не знает, кто его слушает, — он просто готов отправлять (push) данные, когда они появятся.


Интерфейс прост: всего один метод subscribe(Subscriber s). Когда вы вызываете его, издатель регистрирует подписчика и начинает процесс. Но данные не льются сразу — всё под контролем (об этом ниже).

Пример простого издателя в Project Reactor (который реализует Reactive Streams):
import reactor.core.publisher.Flux; // Flux реализует Publisher

Flux<Integer> publisher = Flux.range(1, 5); // Издатель: поток от 1 до 5


Здесь publisher — источник.
Он может быть асинхронным: например, читать из сети или БД. Когда данные готовы, он отправляет их подписчику через onNext(элемент).
Если ошибка — onError(Throwable).
Если конец — onComplete().

Почему это лучше традиционных подходов? В отличие от потоков (где каждый запрос — отдельный тяжёлый объект), publisher лёгкий: он не выделяет ресурсы заранее, а реагирует на подписку. Нет блокировок: если данных нет, ничего не происходит.


#Java #middle #Reactor #Reactive_Streams_API #Processor #Subscription #Subscriber #Publisher
👍1
Subscriber: получатель, который реагирует на события

Subscriber — интерфейс для подписчика, который "слушает" издателя. Он как потребитель на конвейере: получает элементы и решает, что с ними делать.

Методы:
- onSubscribe(Subscription s): вызывается сразу после подписки. Здесь подписчик получает подписку — объект для контроля.
- onNext(T item): срабатывает на каждый элемент. Здесь обработка: логика, трансформация и т.д.
- onError(Throwable t): если ошибка — обработка исключения.
- onComplete(): поток завершён успешно.


Подписчик пассивен: не получает данные (pull), а ждёт push. Это решает callback-ад из CompletableFuture — реакции в одном месте, код чище.

Пример подписки:
publisher.subscribe(new Subscriber<Integer>() {
private Subscription subscription;

@Override
public void onSubscribe(Subscription s) {
this.subscription = s;
s.request(1); // Запрашиваем первый элемент
}

@Override
public void onNext(Integer item) {
System.out.println("Получено: " + item);
subscription.request(1); // Запрашиваем следующий
}

@Override
public void onError(Throwable t) {
System.err.println("Ошибка: " + t);
}

@Override
public void onComplete() {
System.out.println("Завершено");
}
});


Здесь subscribe() связывает publisher и subscriber. Данные отправляются по одному, по запросу (request(1)). Это асинхронно: код после subscribe() продолжается сразу, без ожидания.

В Reactor есть BaseSubscriber для упрощения — переопределяйте только нужные методы.


Subscription: мост контроля с обратным давлением

Subscription — ключ к управлению: это объект, который подписчик получает в onSubscribe.

Методы:
- request(long n): "Дай мне n элементов". Это обратное давление (backpressure) — подписчик контролирует темп, чтобы не захлебнуться.
- cancel(): "Хватит, отпишись".


Без этого publisher мог бы зафлудить подписчика данными. Например, если источник — бесконечный поток (сенсоры), без request(n) — переполнение памяти.

В примере выше request(1) делает обработку последовательной: получил — обработал — запросил следующий. Для скорости — request(Long.MAX_VALUE) (неограниченно), но осторожно: рискуете буфером.

Это решает проблемы блокировок: вместо висящих потоков, всё в event-loop. Под нагрузкой — graceful degradation (грациозная деградация): если подписчик медленный, publisher замедляется, а не падает.


Processor: промежуточный звено для трансформаций

Processor — комбо: реализует и Publisher, и Subscriber. Он как фильтр в конвейере: принимает данные от одного издателя, обрабатывает и отправляет дальше.
Пример — в цепочках Flux: map() или filter() создают процессоры внутри.


В практике вы редко пишете свой Processor — библиотеки предоставляют готовые операторы. Но понимание помогает: весь конвейер — цепь publisher → processor → ... → subscriber.


Практические советы и подводные камни


- Всегда управляйте подпиской: без request() данные не потекут (по умолчанию unbounded — неограниченно, но лучше явно).
- Обрабатывайте ошибки: onError — ваш спасатель, чтобы не потерять исключения.
- Тестируйте с TestSubscriber (в
Reactor): симулируйте сценарии.
- Камень: если в onNext блокирующий код — сломаете асинхронность. Используйте Schedulers для offload (перенос на другой поток).


В реальном коде: в Spring WebFlux контроллеры возвращают Flux/Mono — publisher'ы, клиенты подписываются.


#Java #middle #Reactor #Reactive_Streams_API #Processor #Subscription #Subscriber
👍1
Реактивное программирование

Концепции реактивного программирования: Backpressure — что делать, если данных слишком много


Представьте: источник данных — как бурная река, а ваш подписчик — маленькая лодка. Если вода хлынет слишком быстро в лодку, она утонет.
Здесь на помощь приходит backpressure (обратное давление) — механизм, который позволяет подписчику контролировать скорость потока, говоря "дай мне столько, сколько я могу переварить". И это не просто тормоз, а умный регулятор, который предотвращает перегрузки и делает реактивные системы устойчивыми под любой нагрузкой.


Backpressure — ключевая фишка реактивного программирования, которая отличает его от традиционных подходов. В старых моделях (как потоки или Future) вы либо блокируете всё, либо тонете в очередях данных, рискуя исчерпать память или CPU. Здесь же контроль у потребителя: он решает, когда и сколько брать. Это решает проблемы callback-ада и блокировок, позволяя строить масштабируемые приложения — от мобильных до облачных кластеров.


Что такое backpressure и почему оно нужно?

Обратное давление — это способ, при котором получатель данных сигнализирует источнику: "замедлись, если я не успеваю". В реактивном мире данные передаются асинхронно, но без контроля это может привести к проблемам: если издатель генерирует 1 млн элементов в секунду, а подписчик обрабатывает только 100, буфер переполнится, и приложение упадёт с ошибкой "израсходована память" (OutOfMemoryError).

Backpressure вводит "обратную связь": подписчик запрашивает элементы порциями, а издатель выдаёт ровно столько.


Это встроено в Reactive Streams: в методе onSubscribe подписчик получает Subscription и использует request(long n) — "запроси n элементов". Если не запросить — данные не потекут. Это как шлюзы на реке: открываешь по мере нужды, избегая наводнения. В отличие от pull-модели (где вы тянете всё сразу), здесь баланс: push для динамики, но с контролем.

Пример базового использования в Flux (поток для множества элементов):
import reactor.core.publisher.Flux;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

Flux<Integer> fastPublisher = Flux.range(1, 1000); // Быстрый источник: 1000 элементов

fastPublisher.subscribe(new Subscriber<Integer>() {
private Subscription sub;

@Override
public void onSubscribe(Subscription s) {
sub = s;
sub.request(5); // Сначала запрашиваем 5 элементов
}

@Override
public void onNext(Integer item) {
System.out.println("Обработано: " + item);
// Симулируем медленную обработку: Thread.sleep(100); (но в реальности избегайте блокировок!)
sub.request(1); // После каждого — запрашиваем следующий
}

@Override
public void onError(Throwable t) { /* обработка */ }
@Override
public void onComplete() { System.out.println("Готово"); }
});

Здесь подписчик контролирует темп: запросил 5 — получил 5, обработал — запросил ещё. Если не request() — поток остановится. Это асинхронно: издатель не блокируется, а ждёт сигналов.



#Java #middle #Reactor #Reactive_Streams_API #backpressure
👍2
Стратегии обработки backpressure: что если запросов мало?

Не всегда подписчик может обрабатывать всё. Reactive Streams не диктует, что делать при переполнении — это на усмотрение реализации (как Reactor).

Вот стратегии, чтобы не упасть:
- BUFFER (буферизация): Копирует лишние элементы в очередь (буфер). Полезно для временных пиков, но рискуете памятью при бесконечном потоке. В Reactor: .onBackpressureBuffer().
- DROP (сброс): Игнорирует лишние элементы, пока подписчик не запросит. Для сценариев, где свежие данные важнее старых (например, мониторинг). .onBackpressureDrop().
- LATEST (последний): Сохраняет только самый свежий элемент, сбрасывая предыдущие. Идеально для UI-обновлений (как курс валют). .onBackpressureLatest().
- ERROR (ошибка): Если буфер полон — кидает исключение (IllegalStateException). Для строгих систем, где потеря данных недопустима. .onBackpressureError().


Пример с DROP в
Reactor:
Flux.interval(Duration.ofMillis(1)) // Бесконечный поток: элемент каждую мс
.onBackpressureDrop(dropped -> System.out.println("Сброшено: " + dropped)) // Стратегия: сбрасываем лишнее
.subscribe(item -> {
try { Thread.sleep(100); } catch (InterruptedException e) {} // Медленный подписчик
System.out.println("Обработано: " + item);
});

Здесь быстрый Flux "замедляется" под подписчика: лишние элементы сбрасываются, система не падает.

Ещё опция — cancel(): подписчик может полностью отменить подписку, если данных слишком много или не нужно.



Практические советы и подводные камни

- Всегда вызывайте request() в onSubscribe, иначе ничего не потечёт. Для неограниченного — request(Long.MAX_VALUE), но только если уверены в памяти.
- Избегайте блокировок в onNext: если обработка медленная, offload на другой Scheduler (в
Reactor: .publishOn(Schedulers.parallel())).
- Тестируйте под нагрузкой: используйте TestPublisher/TestSubscriber для симуляции быстрых/медленных сценариев.
- Камень: бесконечные потоки без стратегии — рецепт OOM. Всегда добавляйте .onBackpressureBuffer(размер) или drop.


В реальной жизни: в Spring WebFlux backpressure работает сквозь стек — от БД (reactive драйверы) до клиента. Например, стриминг больших файлов: клиент запрашивает chunks, сервер толкает по мере.

#Java #middle #Reactor #Reactive_Streams_API #backpressure
👍3
Знакомство с Project Reactor: Mono и Flux

Project
Reactor — это не просто обёртка, а полноценный фреймворк от команды Spring, оптимизированный для производительности. Он использует неблокирующий подход: всё работает на основе событий, с минимальным потреблением ресурсов. Ключевые типы — Mono и Flux, которые воплощают потоки данных. Mono для случаев с нуля или одним элементом (как одиночный запрос), Flux для последовательностей (как стриминг). Они реализуют Publisher из Reactive Streams, так что поддерживают подписку, реакции и обратное давление.


Что такое Project Reactor и как начать?

Project Reactor — библиотека для реактивного программирования, которая строит на Reactive Streams. Она предоставляет API для создания, трансформации и потребления асинхронных потоков. Под капотом — эффективный scheduler (планировщик задач), который распределяет работу по потокам без блокировок: использует event-loop для IO и параллельные пулы для вычислений.

Чтобы начать добавьте в pom.xml (Maven):
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.5.0</version> <!-- версия может быть неактуальной -->
</dependency>


Импортируйте:
import reactor.core.publisher.Mono; import reactor.core.publisher.Flux;


Reactor интегрируется с Spring (WebFlux), но работает standalone. Главное преимущество: код становится декларативным — вы описываете "что" (поток и реакции), а не "как" (ожидания и циклы). Это решает callback-ад: цепочки читаемы, как последовательный код, но асинхронны.


Mono: поток для нуля или одного элемента

Mono — это тип для сценариев, где ожидается максимум один результат: успех (элемент), ошибка или ничего. Идеален для HTTP-запросов, чтения записи из БД или вычислений с одиночным исходом. Mono реализует Publisher, так что вы можете подписаться и реагировать.

Создание Mono: используйте статические методы.
- Mono.just(value): из готового значения.
- Mono.empty(): пустой поток (завершится onComplete без onNext).
- Mono.fromCallable(() -> compute()): из синхронного вызова.
- Mono.defer(() -> createMono()): ленивое создание (выполняется при подписке).


Пример простого Mono:
Mono<String> simpleMono = Mono.just("Привет из Reactor");

Подписка: subscribe() вызывает реакции.

simpleMono.subscribe(
value -> System.out.println("Значение: " + value), // onNext: реакция на элемент
error -> System.err.println("Ошибка: " + error), // onError
() -> System.out.println("Завершено") // onComplete
);

Здесь: "Значение: Привет из Reactor" и "Завершено". Подписка асинхронна — код после subscribe() идёт сразу, без ожидания. Если ошибка (например, Mono.error(new RuntimeException("Бум"))), сработает onError.


Асинхронный пример: симулируем задержку.
Mono<String> asyncMono = Mono.delay(Duration.ofSeconds(1)).map(ignore -> "Готово после секунды");

asyncMono.subscribe(System.out::println); // Вывод после 1 сек


Почему Mono лучше Future/CompletableFuture?
Нет блокирующего get(): результат приходит в onNext. Нет callback-ада: цепочки через операторы (map, flatMap — разберём в следующих постах). Под нагрузкой: тысячи Mono на одном потоке, без исчерпания пула.



#Java #middle #Reactor #Reactive_Streams_API #Mono #Flux
👍4
Flux: поток для нуля, одного или множества элементов

Flux — для последовательностей: от конечных (список) до бесконечных (стриминг). Может быть пустым, с одним элементом (как Mono) или миллионами. Flux тоже Publisher, поддерживает backpressure.

Создание Flux:
- Flux.just(a, b, c): из значений.
- Flux.fromIterable(list): из коллекции.
- Flux.range(start, count): последовательность чисел.
- Flux.interval(Duration): бесконечный таймер.
- Flux.generate(sink -> {
sink.next(value); sink.complete(); }): генератор с состоянием.

Пример базового Flux:
Flux<Integer> numbersFlux = Flux.range(1, 5);

numbersFlux.subscribe(
value -> System.out.println("Элемент: " + value), // onNext для каждого
error -> System.err.println("Ошибка: " + error),
() -> System.out.println("Завершено")
);

Вывод: 1, 2, 3, 4, 5 и "Завершено". Асинхронно: элементы "текут" по мере готовности.


С backpressure используйте BaseSubscriber для контроля.

numbersFlux.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
request(2); // Запрашиваем по 2 элемента
}
@Override
protected void hookOnNext(Integer value) {
System.out.println("Получено: " + value);
request(1); // После обработки — следующий
}
});

Это предотвращает перегрузку: Flux выдаёт элементы порциями.


Асинхронный Flux: стриминг с задержкой.
Flux<Long> tickingFlux = Flux.interval(Duration.ofMillis(500)).take(5); // 5 тиков каждые 0.5 сек

tickingFlux.subscribe(System.out::println); // 0, 1, 2, 3, 4 с паузами


Почему Flux лучше потоков или циклов?
Масштабируемость: обрабатывает миллионы элементов без лишних ресурсов. Реактивность: реагируйте на каждый элемент, без буферов в памяти. Интеграция с обратным давлением: если подписчик медленный, Flux замедляется.



Практические советы и подводные камни

- Ленивость: Mono/Flux не выполняются без subscribe(). Полезно для отложенного вычисления.
- Ошибки: всегда обрабатывайте onError, иначе они "проглатываются".
- Блокировки: избегайте Thread.sleep() в реакциях — используйте delayElements() для асинхронных пауз.
- Тестирование: используйте StepVerifier из
reactor-test: StepVerifier.create(flux).expectNext(1,2).verifyComplete();
- Камень: бесконечный Flux без take() или limitRequest() — рискуете памятью. Добавляйте .onBackpressureBuffer() или drop.

В реальной практике: в WebFlux контроллер возвращает Mono<Response> для GET, Flux<Event> для SSE (сервер-сент события).



#Java #middle #Reactor #Reactive_Streams_API #Mono #Flux
👍5
Реактивное программирование

Подписка и жизненный цикл в Reactor: onNext, onError, onComplete

Reactor делает код декларативным — вы описываете поток событий, а не управляете ожиданиями вручную.
Но чтобы потоки "ожили", нужна подписка: это момент, когда издатель начинает передавать данные, а подписчик реагирует. Подписка и жизненный цикл в
Reactor - это фундамент: без понимания, как срабатывают onNext (реакция на элемент), onError (обработка ошибки) и onComplete (завершение), ваши реактивные конвейеры останутся мёртвыми.
Представьте жизненный цикл как этапы реки: подписка — запуск потока, onNext — течение воды, onError — буря, onComplete — устье.


Подписка — это связь между издателем (Publisher, как Mono/Flux) и подписчиком (Subscriber). Она запускает поток данных асинхронно, без блокировок. Жизненный цикл определяет порядок событий: от старта до конца или ошибки. Это решает проблемы из первого поста — вместо callback-ада и ручных get(), вы получаете структурированные реакции. Всё построено на Reactive Streams API, с обратным давлением.


Подписка: запуск потока данных

В Reactor Mono или Flux — ленивые: они ничего не делают, пока не подпишетесь. Метод subscribe() — это триггер: он регистрирует подписчика и начинает передавать события (push, из поста 3).
Подписка асинхронна: после subscribe() код продолжается сразу, без ожидания завершения.


Базовые варианты subscribe():
subscribe(): без параметров — данные "проглатываются", но поток запускается. Полезно для fire-and-forget (запустил и забыл).
subscribe(Consumer<T> onNext): только реакция на элементы.
subscribe(Consumer<T> onNext, Consumer<Throwable> onError): плюс обработка ошибок.
subscribe(Consumer<T> onNext, Consumer<Throwable> onError, Runnable onComplete): полный цикл.
subscribe(Subscriber<T> subscriber): кастомный подписчик для полного контроля (с onSubscribe для Subscription).


Пример простейшей подписки на Flux:
import reactor.core.publisher.Flux;

Flux<String> fruitsFlux = Flux.just("яблоко", "банан", "вишня");
fruitsFlux.subscribe(
fruit -> System.out.println("Съедено: " + fruit), // onNext: реакция на каждый элемент
error -> System.err.println("Проблема: " + error.getMessage()), // onError: если ошибка
() -> System.out.println("Фрукты закончились") // onComplete: завершение
);

Вывод: "Съедено: яблоко", "Съедено: банан", "Съедено: вишня", "Фрукты закончились". Если добавить ошибку — Flux.just("яблоко").concatWith(Flux.error(new RuntimeException("Гнилой фрукт"))) — сработает onError, и onComplete не вызовется.


Почему это лучше традиционных? В CompletableFuture вы цепляете thenApply/thenAccept, но рискуете вложенностью. В Reactor subscribe() — точка входа, а реакции — в одном месте. Плюс, подписка возвращает Disposable: объект для отмены (dispose()) в любой момент.

Пример:
Disposable disposable = fruitsFlux.subscribe(...);
disposable.dispose(); // Отмена: поток остановится, onComplete не сработает.
Это полезно для UI или долгоживущих потоков: отпишись, когда компонент уничтожен, чтобы избежать утечек памяти.



#Java #middle #Reactor #Reactive_Streams_API #onNext #onError #onComplet
👍3
Жизненный цикл: этапы от старта до финиша

Жизненный цикл в Reactive Streams — последовательность вызовов: onSubscribe → (onNext)* → (onError | onComplete). Это правило: после onError или onComplete ничего не будет, и поток считается завершённым.

onSubscribe(Subscription s): первый вызов после subscribe(). Здесь подписчик получает Subscription для контроля (request(n) для backpressure или cancel()). Без request() данные не потекут — это защита от перегрузки.
onNext(T item): для каждого элемента. Здесь основная логика: обработка, логирование, трансформация. Может вызываться много раз (в Flux) или раз/никогда (в Mono).
Важно: держите onNext быстрым и неблокирующим — если медленный, перенесите на отдельный планировщик (Schedulers).
onError(Throwable t): если ошибка (исключение). Поток прерывается, onComplete не сработает. Обработайте, чтобы не потерять: логируйте, retry (повторите) или fallback (запасной вариант).
onComplete(): успешное завершение. Нет элементов после, но сигнал, что всё ок.


Пример полного цикла с кастомным подписчиком (BaseSubscriber в
Reactor упрощает):
import reactor.core.publisher.BaseSubscriber;

Flux<Integer> numbersFlux = Flux.range(1, 10).map(i -> {
if (i == 5) throw new RuntimeException("Ошибка на 5"); // Симулируем ошибку
return i;
});

numbersFlux.subscribe(new BaseSubscriber<Integer>() {

@Override
protected void hookOnSubscribe(Subscription subscription) {
System.out.println("Подписка готова");
request(3); // Запрашиваем первые 3
}

@Override
protected void hookOnNext(Integer value) {
System.out.println("Элемент: " + value);
request(1); // Запрашиваем по одному дальше
}

@Override
protected void hookOnError(Throwable throwable) {
System.err.println("Ошибка: " + throwable.getMessage());
}

@Override
protected void hookOnComplete() {
System.out.println("Цикл завершён");
}
});

Вывод: "Подписка готова", "Элемент: 1", "Элемент: 2", "Элемент: 3", "Элемент: 4", "Ошибка: Ошибка на 5". onComplete не сработает, потому что ошибка. Без ошибки — все 10 элементов и "Цикл завершён".

Это демонстрирует контроль: request() управляет темпом, как в backpressure (пост 5). Если не request() — только "Подписка готова".


Обработка ошибок и завершения: стратегии для устойчивости

Ошибки — часть жизни: сеть упала, БД не ответила. В Reactor onError — ваш щит. Не игнорируйте: используйте операторы для восстановления.

doOnError(Consumer<Throwable>): дополнительная реакция перед onError подписчика.
onErrorReturn(T value): fallback — верни значение вместо ошибки.
onErrorResume(Function<Throwable, Publisher<T>>): замени на другой поток.
retry(long times): повтори попытку.


Пример с восстановлением:
Mono<String> riskyMono = Mono.fromCallable(() -> {
if (Math.random() > 0.5) throw new RuntimeException("Сбой");
return "Успех";
}).onErrorReturn("Fallback").retry(2); // Retry 2 раза

riskyMono.subscribe(
System.out::println,
error -> System.out.println("Не удалось: " + error),
() -> System.out.println("Завершено")
);

Если сбой — retry, потом fallback. onComplete сработает только при успехе или fallback.

Для onComplete: используйте doFinally(Runnable) — сработает всегда, после onComplete или onError. Полезно для закрытия ресурсов: doFinally(() -> connection.close()).


Практические советы и подводные камни

Всегда реализуйте все методы в Subscriber: иначе дефолтные могут "проглотить" ошибки (onError кидает RuntimeException, если не переопределён).
Используйте Hooks: doOnSubscribe, doOnNext для логирования без изменения потока.
Отмена: dispose() не гарантирует мгновенную остановку — upstream (источник) может продолжить, но данные не дойдут.
Камень: в onNext избегайте блокировок (sleep, IO) — используйте publishOn(Schedulers.boundedElastic()) для переноса.
Тестирование: StepVerifier.create(flux).expectSubscription().expectNext(1,2).expectError().verify(); — проверяет цикл.


В практике: в WebFlux сервис возвращает Flux, клиент subscribe() в контроллере — реакции на события в реальном времени.


#Java #middle #Reactor #Reactive_Streams_API #onNext #onError #onComplet
👍3
Реактивное программирование

Базовые операторы в
Reactor: map, filter, flatMap

Операторы — это методы на Mono/Flux, которые позволяют строить конвейеры: преобразовывать, фильтровать и комбинировать данные асинхронно. Представьте их как звенья в цепи: каждый берёт входной поток, меняет его и передаёт дальше. Сегодня разберём три фундаментальных: map (преобразование элементов), filter (фильтрация) и flatMap (плоское преобразование, для слияния подпотоков). Эти операторы — основа для сложных сценариев, они решают проблемы из первого поста, позволяя писать декларативный код вместо ручных циклов и ожиданий.


Операторы в
Reactor — декларативные: вы описываете, что делать с данными, а библиотека заботится об асинхронности, backpressure и ошибках. Они не меняют исходный поток (иммутабельны), а создают новый. Это делает код читаемым и тестируемым.


Map: простое преобразование элементов

Map — оператор для изменения каждого элемента потока. Он берёт входной элемент, применяет функцию и выдаёт результат. Синхронный: функция должна быть быстрой и без блокировок. Идеален для конвертации типов, вычислений или форматирования.

Пример на Flux:
import reactor.core.publisher.Flux;
Flux<String> originalFlux = Flux.just("яблоко", "банан", "вишня");
Flux<String> transformed = originalFlux.map(fruit -> fruit.toUpperCase()); // Преобразование в верхний регистр
transformed.subscribe(System.out::println); // Вывод: "ЯБЛОКО", "БАНАН", "ВИШНЯ"

Здесь map применяет лямбду к каждому элементу последовательно. Если ошибка в функции — сработает onError.


На Mono:
Mono<Integer> num = Mono.just(5).map(x -> x * 2); // Результат: 10


Почему map полезен? В традиционных подходах (как в CompletableFuture.thenApply) вы строите цепочки, но рискуете вложенностью. В Reactor map делает конвейер линейным: читается как последовательный код, но работает асинхронно. Поддерживает backpressure: если подписчик запрашивает n, map передаёт запрос upstream (источнику).


Filter: отбор элементов по условию

Filter — для пропуска только нужных элементов. Принимает предикат (функцию, возвращающую true/false) и пропускает те, для которых true. Остальные игнорируются — поток "сужается".

Пример на Flux:
Flux<Integer> numbers = Flux.range(1, 10);
Flux<Integer> evenNumbers = numbers.filter(num -> num % 2 == 0); // Только чётные
evenNumbers.subscribe(System.out::println); // Вывод: 2, 4, 6, 8, 10

Если поток пустой или ничего не проходит — onComplete сработает без onNext.


На Mono:
Mono<String> word = Mono.just("привет").filter(w -> w.length() > 7); // Не пройдёт — пустой Mono


Filter экономит ресурсы: ненужные элементы не обрабатываются дальше в цепи. В отличие от императивных циклов (где вы фильтруете в for с if), здесь всё асинхронно и с backpressure — запросы передаются источнику только для прошедших элементов.

Комбинация с map: numbers.filter(num -> num > 5).map(num -> num * 10).subscribe(); // 60, 70, 80, 90, 100
Это строит конвейер: фильтр → преобразование, без ручных переменных.



#Java #middle #Reactor #map #filter #flatMap
👍1
FlatMap: плоское преобразование для асинхронных подпотоков

FlatMap — мощный оператор для случаев, когда из одного элемента нужно создать подпоток (Publisher), и слить их в плоский результат. Это как map, но для асинхронных или множественных выходов: он "разворачивает" вложенные потоки. Полезен для запросов в цикле: например, для каждого пользователя — асинхронно запросить данные.


Пример на Flux:
Flux<String> fruits = Flux.just("яблоко", "банан");
Flux<Character> letters = fruits.flatMap(fruit -> Flux.fromArray(fruit.toCharArray())); // Из строки — поток символов
letters.subscribe(System.out::println); // Вывод: я, б, л, о, к, о, б, а, н, а, н (в возможном перемешанном порядке, если асинхронно)

Здесь flatMap берёт строку, создаёт Flux из символов и сливает всё в один поток. В отличие от map (который вернул бы Flux<Flux<Character>> — вложенный), flatMap "сплющивает".



Асинхронный пример: симулируем API-запросы.

import java.time.Duration;
Flux<String> users = Flux.just("user1", "user2");
Flux<String> data = users.flatMap(user -> Mono.just("Данные для " + user).delayElement(Duration.ofSeconds(1))); // Асинхронный подпоток с задержкой
data.subscribe(System.out::println); // Вывод через секунды: "Данные для user1", "Данные для user2" (параллельно, если scheduler позволяет)

FlatMap уважает backpressure: запрашивает у подпотоков по мере нужды. Но осторожно: если подпотоки бесконечные — рискуете перегрузкой. Параметр concurrency (flatMap(func, concurrency)) ограничивает параллелизм.


Почему flatMap решает проблемы? В традиционных подходах (циклы с Future) вы ждёте каждый запрос, блокируя. Здесь — асинхронное слияние, без ожиданий и callback-ада: цепочка читаема.



Практические советы и подводные камни

Читаемость: цепочки операторов пишите по строкам для ясности: flux.filter(...).map(...).flatMap(...);
Ошибки: если в map/flatMap исключение — onError. Используйте handle() для условной обработки.
Производительность: в flatMap устанавливайте concurrency (default 256) для контроля параллелизма: flatMap(func, 4) — max 4 подпотока одновременно.
Камень: блокирующий код в лямбдах — сломает асинхронность. Для IO — используйте flatMap с Mono.fromCallable и publishOn(Schedulers.boundedElastic()).
Тестирование: StepVerifier.create(
flux.map(...)).expectNext("ЯБЛОКО").verifyComplete();


#Java #middle #Reactor #map #filter #flatMap
👍2
Реактивное программирование

Комбинации потоков в Reactor: concat, merge и другие

Комбинации потоков — это как сборка пазла: вы берёте отдельные потоки событий и сливаете в один, управляя порядком, параллелизмом и обработкой. Операторы уважают жизненный цикл и обратное давление: если подписчик не успевает, запросы распределяются по источникам. Это делает системы масштабируемыми — под нагрузкой не тонут в очередях.


Concat: последовательное объединение потоков


Concat — оператор для слияния потоков по очереди: сначала все элементы первого, потом второго и так далее. Он ждёт завершения предыдущего (onComplete), прежде чем перейти к следующему. Идеален, когда порядок важен и параллелизм не нужен: например, загрузка данных по шагам.


Пример на Flux:
import reactor.core.publisher.Flux;
import java.time.Duration;
Flux<String> first = Flux.just("Шаг 1a", "Шаг 1b").delayElements(Duration.ofSeconds(1)); // Задержка для симуляции
Flux<String> second = Flux.just("Шаг 2a", "Шаг 2b");
Flux<String> combined = Flux.concat(first, second);

combined.subscribe(System.out::println); // Вывод: "Шаг 1a" (через 1с), "Шаг 1b" (ещё 1с), "Шаг 2a", "Шаг 2b"

Здесь concat гарантирует последовательность: второй Flux стартует только после onComplete первого. Если ошибка в первом — весь поток прервётся onError.


На Mono:
Mono<String> m1 = Mono.just("A").delayElement(Duration.ofSeconds(1));
Mono<String> m2 = Mono.just("B");
Flux<String> seq = Flux.concat(m1, m2); // Mono как Flux с одним элементом


Почему concat полезен? В традиционных подходах (thenCompose в CompletableFuture) вы пишете цепочки вручную, рискуя callback-адом. Здесь — декларативно, с автоматическим backpressure: запросы идут к текущему потоку. Минус: медленный, если источники асинхронные — ждёт завершения.
Вариант: concatWith(other) на одном Flux для добавления.



Merge: параллельное слияние по готовности


Merge — для объединения потоков параллельно: элементы выдаются по мере готовности, без ожидания завершения. Порядок не гарантирован — зависит от скорости источников. Идеален для независимых событий: например, слияние логов из сервисов.

Пример:

Flux<String> slow = Flux.just("Медленный 1", "Медленный 2").delayElements(Duration.ofSeconds(2));
Flux<String> fast = Flux.just("Быстрый A", "Быстрый B").delayElements(Duration.ofMillis(500));
Flux<String> merged = Flux.merge(slow, fast);

merged.subscribe(System.out::println); // Возможный вывод: "Быстрый A" (0.5с), "Быстрый B" (ещё 0.5с), "Медленный 1" (2с), "Медленный 2" (ещё 2с)

Здесь merge отдает элементы, как только они готовы — параллельно. Если ошибка в одном — весь поток onError (по умолчанию), но можно настроить.


На Mono: merge работает с Mono как с Flux'ом одного элемента.

Почему merge лучше потоков? В старых моделях (join в Executor) вы ждёте всех, блокируя. Здесь — асинхронно, с backpressure: merge распределяет запросы по источникам пропорционально. Параметр: merge(func, concurrency) для ограничения параллелизма.
Вариант: mergeWith(other) на одном Flux.



#Java #middle #Reactor #Concat #Merge #Zip #CombineLatest
👍1
Zip: попарная комбинация элементов

Zip — объединяет элементы из потоков попарно: берёт первый от первого, первый от второго и т.д., применяя функцию для слияния. Завершается, когда любой поток исчерпан. Идеален для синхронизации: например, zip координат X и Y в точки.

Пример:
Flux<Integer> xCoords = Flux.just(1, 2, 3);
Flux<Integer> yCoords = Flux.just(10, 20, 30, 40); // Лишний элемент игнорируется
Flux<String> points = Flux.zip(xCoords, yCoords, (x, y) -> "(" + x + ", " + y + ")");

points.subscribe(System.out::println); // Вывод: "(1, 10)", "(2, 20)", "(3, 30)"

Здесь zip ждёт пару: если один медленный — задерживает. Для >2 потоков: zip(tuple -> ..., flux1, flux2, flux3).


Почему zip решает проблемы? Вместо ручных семафоров или ожиданий в циклах, декларативно комбинируете асинхронные источники. Backpressure: запрашивает у всех равномерно.

CombineLatest: комбинация последних элементов


CombineLatest — выдаёт комбинацию последних элементов от каждого потока, как только любой обновляется. Не ждёт пар — всегда использует свежие. Идеален для реального времени: например, комбинация курсов валют.


Пример:
Flux<String> stockA = Flux.just("A:100", "A:110").delayElements(Duration.ofSeconds(1));
Flux<String> stockB = Flux.just("B:200").delayElements(Duration.ofSeconds(2));
Flux<String> latest = Flux.combineLatest(stockA, stockB, (a, b) -> a + " + " + b);

latest.subscribe(System.out::println); // Вывод примерно: "A:100 + B:200" (после 2с), "A:110 + B:200" (ещё 1с после)

Здесь combineLatest реагирует на изменения: при обновлении A использует последний B. Для >2: combineLatest(tuple -> ..., fluxes).


В отличие от zip (строгие пары), здесь — динамика. Backpressure: как в merge.


Другие комбинации: withLatestFrom и concatMap

WithLatestFrom: похож на combineLatest, но "master"-поток (основной) триггерит выдачу, беря последние из второстепенных. Пример: flux.withLatestFrom(other, (main, other) -> main + other).
ConcatMap: как flatMap, но последовательный (как concat внутри). Для orderly асинхронных подпотоков.

Эти дополняют: выбирайте по сценарию — последовательность (concat/concatMap), параллелизм (merge/flatMap) или синхронизация (zip/combineLatest).



Практические советы и подводные камни

Ошибки: по умолчанию onError останавливает всё — используйте onErrorResume для продолжения.
Параллелизм: в merge/flatMap устанавливайте prefetch (буфер) или concurrency для тюнинга.
Камень: бесконечные потоки в merge — рискуете OOM; добавьте take() или limitRate().
Тестирование: StepVerifier.create(Flux.merge(f1, f2)).expectNextMatches(...).verify();



#Java #middle #Reactor #Concat #Merge #Zip #CombineLatest
👍1