Реактивное программирование
Комбинации потоков в Reactor: concat, merge и другие
Комбинации потоков — это как сборка пазла: вы берёте отдельные потоки событий и сливаете в один, управляя порядком, параллелизмом и обработкой. Операторы уважают жизненный цикл и обратное давление: если подписчик не успевает, запросы распределяются по источникам. Это делает системы масштабируемыми — под нагрузкой не тонут в очередях.
Concat: последовательное объединение потоков
Concat — оператор для слияния потоков по очереди: сначала все элементы первого, потом второго и так далее. Он ждёт завершения предыдущего (onComplete), прежде чем перейти к следующему. Идеален, когда порядок важен и параллелизм не нужен: например, загрузка данных по шагам.
Пример на Flux:
На Mono:
Почему concat полезен? В традиционных подходах (thenCompose в CompletableFuture) вы пишете цепочки вручную, рискуя callback-адом. Здесь — декларативно, с автоматическим backpressure: запросы идут к текущему потоку. Минус: медленный, если источники асинхронные — ждёт завершения.
Вариант: concatWith(other) на одном Flux для добавления.
Merge: параллельное слияние по готовности
Merge — для объединения потоков параллельно: элементы выдаются по мере готовности, без ожидания завершения. Порядок не гарантирован — зависит от скорости источников. Идеален для независимых событий: например, слияние логов из сервисов.
Пример:
На Mono: merge работает с Mono как с Flux'ом одного элемента.
Почему merge лучше потоков? В старых моделях (join в Executor) вы ждёте всех, блокируя. Здесь — асинхронно, с backpressure: merge распределяет запросы по источникам пропорционально. Параметр: merge(func, concurrency) для ограничения параллелизма.
Вариант: mergeWith(other) на одном Flux.
#Java #middle #Reactor #Concat #Merge #Zip #CombineLatest
Комбинации потоков в Reactor: concat, merge и другие
Комбинации потоков — это как сборка пазла: вы берёте отдельные потоки событий и сливаете в один, управляя порядком, параллелизмом и обработкой. Операторы уважают жизненный цикл и обратное давление: если подписчик не успевает, запросы распределяются по источникам. Это делает системы масштабируемыми — под нагрузкой не тонут в очередях.
Concat: последовательное объединение потоков
Concat — оператор для слияния потоков по очереди: сначала все элементы первого, потом второго и так далее. Он ждёт завершения предыдущего (onComplete), прежде чем перейти к следующему. Идеален, когда порядок важен и параллелизм не нужен: например, загрузка данных по шагам.
Пример на Flux:
import reactor.core.publisher.Flux;
import java.time.Duration;
Flux<String> first = Flux.just("Шаг 1a", "Шаг 1b").delayElements(Duration.ofSeconds(1)); // Задержка для симуляции
Flux<String> second = Flux.just("Шаг 2a", "Шаг 2b");
Flux<String> combined = Flux.concat(first, second);
combined.subscribe(System.out::println); // Вывод: "Шаг 1a" (через 1с), "Шаг 1b" (ещё 1с), "Шаг 2a", "Шаг 2b"
Здесь concat гарантирует последовательность: второй Flux стартует только после onComplete первого. Если ошибка в первом — весь поток прервётся onError.
На Mono:
Mono<String> m1 = Mono.just("A").delayElement(Duration.ofSeconds(1));
Mono<String> m2 = Mono.just("B");
Flux<String> seq = Flux.concat(m1, m2); // Mono как Flux с одним элементом
Почему concat полезен? В традиционных подходах (thenCompose в CompletableFuture) вы пишете цепочки вручную, рискуя callback-адом. Здесь — декларативно, с автоматическим backpressure: запросы идут к текущему потоку. Минус: медленный, если источники асинхронные — ждёт завершения.
Вариант: concatWith(other) на одном Flux для добавления.
Merge: параллельное слияние по готовности
Merge — для объединения потоков параллельно: элементы выдаются по мере готовности, без ожидания завершения. Порядок не гарантирован — зависит от скорости источников. Идеален для независимых событий: например, слияние логов из сервисов.
Пример:
Flux<String> slow = Flux.just("Медленный 1", "Медленный 2").delayElements(Duration.ofSeconds(2));
Flux<String> fast = Flux.just("Быстрый A", "Быстрый B").delayElements(Duration.ofMillis(500));
Flux<String> merged = Flux.merge(slow, fast);
merged.subscribe(System.out::println); // Возможный вывод: "Быстрый A" (0.5с), "Быстрый B" (ещё 0.5с), "Медленный 1" (2с), "Медленный 2" (ещё 2с)
Здесь merge отдает элементы, как только они готовы — параллельно. Если ошибка в одном — весь поток onError (по умолчанию), но можно настроить.
На Mono: merge работает с Mono как с Flux'ом одного элемента.
Почему merge лучше потоков? В старых моделях (join в Executor) вы ждёте всех, блокируя. Здесь — асинхронно, с backpressure: merge распределяет запросы по источникам пропорционально. Параметр: merge(func, concurrency) для ограничения параллелизма.
Вариант: mergeWith(other) на одном Flux.
#Java #middle #Reactor #Concat #Merge #Zip #CombineLatest
👍1
Zip: попарная комбинация элементов
Zip — объединяет элементы из потоков попарно: берёт первый от первого, первый от второго и т.д., применяя функцию для слияния. Завершается, когда любой поток исчерпан. Идеален для синхронизации: например, zip координат X и Y в точки.
Пример:
Почему zip решает проблемы? Вместо ручных семафоров или ожиданий в циклах, декларативно комбинируете асинхронные источники. Backpressure: запрашивает у всех равномерно.
CombineLatest: комбинация последних элементов
CombineLatest — выдаёт комбинацию последних элементов от каждого потока, как только любой обновляется. Не ждёт пар — всегда использует свежие. Идеален для реального времени: например, комбинация курсов валют.
Пример:
В отличие от zip (строгие пары), здесь — динамика. Backpressure: как в merge.
Другие комбинации: withLatestFrom и concatMap
WithLatestFrom: похож на combineLatest, но "master"-поток (основной) триггерит выдачу, беря последние из второстепенных. Пример: flux.withLatestFrom(other, (main, other) -> main + other).
ConcatMap: как flatMap, но последовательный (как concat внутри). Для orderly асинхронных подпотоков.
Эти дополняют: выбирайте по сценарию — последовательность (concat/concatMap), параллелизм (merge/flatMap) или синхронизация (zip/combineLatest).
Практические советы и подводные камни
Ошибки: по умолчанию onError останавливает всё — используйте onErrorResume для продолжения.
Параллелизм: в merge/flatMap устанавливайте prefetch (буфер) или concurrency для тюнинга.
Камень: бесконечные потоки в merge — рискуете OOM; добавьте take() или limitRate().
Тестирование: StepVerifier.create(Flux.merge(f1, f2)).expectNextMatches(...).verify();
#Java #middle #Reactor #Concat #Merge #Zip #CombineLatest
Zip — объединяет элементы из потоков попарно: берёт первый от первого, первый от второго и т.д., применяя функцию для слияния. Завершается, когда любой поток исчерпан. Идеален для синхронизации: например, zip координат X и Y в точки.
Пример:
Flux<Integer> xCoords = Flux.just(1, 2, 3);
Flux<Integer> yCoords = Flux.just(10, 20, 30, 40); // Лишний элемент игнорируется
Flux<String> points = Flux.zip(xCoords, yCoords, (x, y) -> "(" + x + ", " + y + ")");
points.subscribe(System.out::println); // Вывод: "(1, 10)", "(2, 20)", "(3, 30)"
Здесь zip ждёт пару: если один медленный — задерживает. Для >2 потоков: zip(tuple -> ..., flux1, flux2, flux3).
Почему zip решает проблемы? Вместо ручных семафоров или ожиданий в циклах, декларативно комбинируете асинхронные источники. Backpressure: запрашивает у всех равномерно.
CombineLatest: комбинация последних элементов
CombineLatest — выдаёт комбинацию последних элементов от каждого потока, как только любой обновляется. Не ждёт пар — всегда использует свежие. Идеален для реального времени: например, комбинация курсов валют.
Пример:
Flux<String> stockA = Flux.just("A:100", "A:110").delayElements(Duration.ofSeconds(1));
Flux<String> stockB = Flux.just("B:200").delayElements(Duration.ofSeconds(2));
Flux<String> latest = Flux.combineLatest(stockA, stockB, (a, b) -> a + " + " + b);
latest.subscribe(System.out::println); // Вывод примерно: "A:100 + B:200" (после 2с), "A:110 + B:200" (ещё 1с после)
Здесь combineLatest реагирует на изменения: при обновлении A использует последний B. Для >2: combineLatest(tuple -> ..., fluxes).
В отличие от zip (строгие пары), здесь — динамика. Backpressure: как в merge.
Другие комбинации: withLatestFrom и concatMap
WithLatestFrom: похож на combineLatest, но "master"-поток (основной) триггерит выдачу, беря последние из второстепенных. Пример: flux.withLatestFrom(other, (main, other) -> main + other).
ConcatMap: как flatMap, но последовательный (как concat внутри). Для orderly асинхронных подпотоков.
Эти дополняют: выбирайте по сценарию — последовательность (concat/concatMap), параллелизм (merge/flatMap) или синхронизация (zip/combineLatest).
Практические советы и подводные камни
Ошибки: по умолчанию onError останавливает всё — используйте onErrorResume для продолжения.
Параллелизм: в merge/flatMap устанавливайте prefetch (буфер) или concurrency для тюнинга.
Камень: бесконечные потоки в merge — рискуете OOM; добавьте take() или limitRate().
Тестирование: StepVerifier.create(Flux.merge(f1, f2)).expectNextMatches(...).verify();
#Java #middle #Reactor #Concat #Merge #Zip #CombineLatest
👍2