Пример: Аспект для логирования
Создадим сервис — класс с бизнес-логикой:
Теперь аспект для логирования:
Более сложный пример: Аспект вокруг метода
Для обработки ошибок или измерения времени используй @Around — он оборачивает метод.
Продвинутые советы для опытных разработчиков
Производительность: Прокси в Spring добавляют overhead (небольшую задержку). Для критичных мест используй compile-time weaving из AspectJ.
Порядок аспектов: Если несколько аспектов на одном методе, используй @Order(1) для приоритета (меньше число — выше приоритет).
Обработка исключений: В @Around лови Throwable, логируй и перебрасывай, чтобы не глотать ошибки.
Тестирование: Используй @EnableAspectJAutoProxy в тестах, моки (заменители) для аспектов с Mockito.
Интеграция с другими модулями: Spring Security или Spring Cache часто используют АОП внутри — изучи их исходники для идей.
Ограничения: Spring AOP работает только на методах бинов (объектов, управляемых Spring). Для статических методов или конструкторов нужен AspectJ.
#Java #middle #on_request #AOP
Создадим сервис — класс с бизнес-логикой:
javaimport org.springframework.stereotype.Service;
@Service // Обозначает, что это сервис, Spring создаст экземпляр
public class MyService {
public String doSomething(String input) {
return "Результат: " + input.toUpperCase(); // Простая логика
}
}
Теперь аспект для логирования:
javaimport org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.springframework.stereotype.Component;
@Aspect // Это аспект
@Component // Spring зарегистрирует его
public class LoggingAspect {
@Before("execution(* com.example.service.*.*(..))") // Pointcut: все методы в пакете service
public void logBefore(JoinPoint joinPoint) { // JoinPoint — информация о точке
System.out.println("Вызов метода: " + joinPoint.getSignature().getName());
System.out.println("Аргументы: " + Arrays.toString(joinPoint.getArgs()));
}
}
Здесь @Before значит "выполни перед методом". execution — выражение для pointcut: * значит любой возврат, com.example.service..(..) — любой класс в пакете service, любой метод с любыми аргументами.
Если вызвать myService.doSomething("hello"), в консоли увидишь лог перед результатом.
Более сложный пример: Аспект вокруг метода
Для обработки ошибок или измерения времени используй @Around — он оборачивает метод.
javaimport org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.stereotype.Component;
@Aspect
@Component
public class TimingAspect {
@Around("execution(* com.example.service.*.*(..))")
public Object measureTime(ProceedingJoinPoint joinPoint) throws Throwable {
long start = System.currentTimeMillis();
Object result = joinPoint.proceed(); // Выполняет оригинальный метод
long end = System.currentTimeMillis();
System.out.println("Время выполнения: " + (end - start) + " мс");
return result; // Возвращает результат метода
}
}
ProceedingJoinPoint позволяет контролировать вызов: можно пропустить метод, изменить аргументы или результат. Идеально для транзакций или кэша.
Продвинутые советы для опытных разработчиков
Производительность: Прокси в Spring добавляют overhead (небольшую задержку). Для критичных мест используй compile-time weaving из AspectJ.
Порядок аспектов: Если несколько аспектов на одном методе, используй @Order(1) для приоритета (меньше число — выше приоритет).
Обработка исключений: В @Around лови Throwable, логируй и перебрасывай, чтобы не глотать ошибки.
Тестирование: Используй @EnableAspectJAutoProxy в тестах, моки (заменители) для аспектов с Mockito.
Интеграция с другими модулями: Spring Security или Spring Cache часто используют АОП внутри — изучи их исходники для идей.
Ограничения: Spring AOP работает только на методах бинов (объектов, управляемых Spring). Для статических методов или конструкторов нужен AspectJ.
#Java #middle #on_request #AOP
👍5
Реактивное программирование
Базовые операторы в Reactor: map, filter, flatMap
Операторы — это методы на Mono/Flux, которые позволяют строить конвейеры: преобразовывать, фильтровать и комбинировать данные асинхронно. Представьте их как звенья в цепи: каждый берёт входной поток, меняет его и передаёт дальше. Сегодня разберём три фундаментальных: map (преобразование элементов), filter (фильтрация) и flatMap (плоское преобразование, для слияния подпотоков). Эти операторы — основа для сложных сценариев, они решают проблемы из первого поста, позволяя писать декларативный код вместо ручных циклов и ожиданий.
Операторы в Reactor — декларативные: вы описываете, что делать с данными, а библиотека заботится об асинхронности, backpressure и ошибках. Они не меняют исходный поток (иммутабельны), а создают новый. Это делает код читаемым и тестируемым.
Map: простое преобразование элементов
Map — оператор для изменения каждого элемента потока. Он берёт входной элемент, применяет функцию и выдаёт результат. Синхронный: функция должна быть быстрой и без блокировок. Идеален для конвертации типов, вычислений или форматирования.
Пример на Flux:
На Mono:
Почему map полезен? В традиционных подходах (как в CompletableFuture.thenApply) вы строите цепочки, но рискуете вложенностью. В Reactor map делает конвейер линейным: читается как последовательный код, но работает асинхронно. Поддерживает backpressure: если подписчик запрашивает n, map передаёт запрос upstream (источнику).
Filter: отбор элементов по условию
Filter — для пропуска только нужных элементов. Принимает предикат (функцию, возвращающую true/false) и пропускает те, для которых true. Остальные игнорируются — поток "сужается".
Пример на Flux:
На Mono:
Filter экономит ресурсы: ненужные элементы не обрабатываются дальше в цепи. В отличие от императивных циклов (где вы фильтруете в for с if), здесь всё асинхронно и с backpressure — запросы передаются источнику только для прошедших элементов.
Комбинация с map: numbers.filter(num -> num > 5).map(num -> num * 10).subscribe(); // 60, 70, 80, 90, 100
Это строит конвейер: фильтр → преобразование, без ручных переменных.
#Java #middle #Reactor #map #filter #flatMap
Базовые операторы в Reactor: map, filter, flatMap
Операторы — это методы на Mono/Flux, которые позволяют строить конвейеры: преобразовывать, фильтровать и комбинировать данные асинхронно. Представьте их как звенья в цепи: каждый берёт входной поток, меняет его и передаёт дальше. Сегодня разберём три фундаментальных: map (преобразование элементов), filter (фильтрация) и flatMap (плоское преобразование, для слияния подпотоков). Эти операторы — основа для сложных сценариев, они решают проблемы из первого поста, позволяя писать декларативный код вместо ручных циклов и ожиданий.
Операторы в Reactor — декларативные: вы описываете, что делать с данными, а библиотека заботится об асинхронности, backpressure и ошибках. Они не меняют исходный поток (иммутабельны), а создают новый. Это делает код читаемым и тестируемым.
Map: простое преобразование элементов
Map — оператор для изменения каждого элемента потока. Он берёт входной элемент, применяет функцию и выдаёт результат. Синхронный: функция должна быть быстрой и без блокировок. Идеален для конвертации типов, вычислений или форматирования.
Пример на Flux:
import reactor.core.publisher.Flux;
Flux<String> originalFlux = Flux.just("яблоко", "банан", "вишня");
Flux<String> transformed = originalFlux.map(fruit -> fruit.toUpperCase()); // Преобразование в верхний регистр
transformed.subscribe(System.out::println); // Вывод: "ЯБЛОКО", "БАНАН", "ВИШНЯ"
Здесь map применяет лямбду к каждому элементу последовательно. Если ошибка в функции — сработает onError.
На Mono:
Mono<Integer> num = Mono.just(5).map(x -> x * 2); // Результат: 10
Почему map полезен? В традиционных подходах (как в CompletableFuture.thenApply) вы строите цепочки, но рискуете вложенностью. В Reactor map делает конвейер линейным: читается как последовательный код, но работает асинхронно. Поддерживает backpressure: если подписчик запрашивает n, map передаёт запрос upstream (источнику).
Filter: отбор элементов по условию
Filter — для пропуска только нужных элементов. Принимает предикат (функцию, возвращающую true/false) и пропускает те, для которых true. Остальные игнорируются — поток "сужается".
Пример на Flux:
Flux<Integer> numbers = Flux.range(1, 10);
Flux<Integer> evenNumbers = numbers.filter(num -> num % 2 == 0); // Только чётные
evenNumbers.subscribe(System.out::println); // Вывод: 2, 4, 6, 8, 10
Если поток пустой или ничего не проходит — onComplete сработает без onNext.
На Mono:
Mono<String> word = Mono.just("привет").filter(w -> w.length() > 7); // Не пройдёт — пустой MonoFilter экономит ресурсы: ненужные элементы не обрабатываются дальше в цепи. В отличие от императивных циклов (где вы фильтруете в for с if), здесь всё асинхронно и с backpressure — запросы передаются источнику только для прошедших элементов.
Комбинация с map: numbers.filter(num -> num > 5).map(num -> num * 10).subscribe(); // 60, 70, 80, 90, 100
Это строит конвейер: фильтр → преобразование, без ручных переменных.
#Java #middle #Reactor #map #filter #flatMap
👍3
FlatMap: плоское преобразование для асинхронных подпотоков
FlatMap — мощный оператор для случаев, когда из одного элемента нужно создать подпоток (Publisher), и слить их в плоский результат. Это как map, но для асинхронных или множественных выходов: он "разворачивает" вложенные потоки. Полезен для запросов в цикле: например, для каждого пользователя — асинхронно запросить данные.
Пример на Flux:
Асинхронный пример: симулируем API-запросы.
Почему flatMap решает проблемы? В традиционных подходах (циклы с Future) вы ждёте каждый запрос, блокируя. Здесь — асинхронное слияние, без ожиданий и callback-ада: цепочка читаема.
Практические советы и подводные камни
Читаемость: цепочки операторов пишите по строкам для ясности: flux.filter(...).map(...).flatMap(...);
Ошибки: если в map/flatMap исключение — onError. Используйте handle() для условной обработки.
Производительность: в flatMap устанавливайте concurrency (default 256) для контроля параллелизма: flatMap(func, 4) — max 4 подпотока одновременно.
Камень: блокирующий код в лямбдах — сломает асинхронность. Для IO — используйте flatMap с Mono.fromCallable и publishOn(Schedulers.boundedElastic()).
Тестирование: StepVerifier.create(flux.map(...)).expectNext("ЯБЛОКО").verifyComplete();
#Java #middle #Reactor #map #filter #flatMap
FlatMap — мощный оператор для случаев, когда из одного элемента нужно создать подпоток (Publisher), и слить их в плоский результат. Это как map, но для асинхронных или множественных выходов: он "разворачивает" вложенные потоки. Полезен для запросов в цикле: например, для каждого пользователя — асинхронно запросить данные.
Пример на Flux:
Flux<String> fruits = Flux.just("яблоко", "банан");
Flux<Character> letters = fruits.flatMap(fruit -> Flux.fromArray(fruit.toCharArray())); // Из строки — поток символов
letters.subscribe(System.out::println); // Вывод: я, б, л, о, к, о, б, а, н, а, н (в возможном перемешанном порядке, если асинхронно)
Здесь flatMap берёт строку, создаёт Flux из символов и сливает всё в один поток. В отличие от map (который вернул бы Flux<Flux<Character>> — вложенный), flatMap "сплющивает".Асинхронный пример: симулируем API-запросы.
import java.time.Duration;
Flux<String> users = Flux.just("user1", "user2");
Flux<String> data = users.flatMap(user -> Mono.just("Данные для " + user).delayElement(Duration.ofSeconds(1))); // Асинхронный подпоток с задержкой
data.subscribe(System.out::println); // Вывод через секунды: "Данные для user1", "Данные для user2" (параллельно, если scheduler позволяет)
FlatMap уважает backpressure: запрашивает у подпотоков по мере нужды. Но осторожно: если подпотоки бесконечные — рискуете перегрузкой. Параметр concurrency (flatMap(func, concurrency)) ограничивает параллелизм.
Почему flatMap решает проблемы? В традиционных подходах (циклы с Future) вы ждёте каждый запрос, блокируя. Здесь — асинхронное слияние, без ожиданий и callback-ада: цепочка читаема.
Практические советы и подводные камни
Читаемость: цепочки операторов пишите по строкам для ясности: flux.filter(...).map(...).flatMap(...);
Ошибки: если в map/flatMap исключение — onError. Используйте handle() для условной обработки.
Производительность: в flatMap устанавливайте concurrency (default 256) для контроля параллелизма: flatMap(func, 4) — max 4 подпотока одновременно.
Камень: блокирующий код в лямбдах — сломает асинхронность. Для IO — используйте flatMap с Mono.fromCallable и publishOn(Schedulers.boundedElastic()).
Тестирование: StepVerifier.create(flux.map(...)).expectNext("ЯБЛОКО").verifyComplete();
#Java #middle #Reactor #map #filter #flatMap
👍3
Реактивное программирование
Комбинации потоков в Reactor: concat, merge и другие
Комбинации потоков — это как сборка пазла: вы берёте отдельные потоки событий и сливаете в один, управляя порядком, параллелизмом и обработкой. Операторы уважают жизненный цикл и обратное давление: если подписчик не успевает, запросы распределяются по источникам. Это делает системы масштабируемыми — под нагрузкой не тонут в очередях.
Concat: последовательное объединение потоков
Concat — оператор для слияния потоков по очереди: сначала все элементы первого, потом второго и так далее. Он ждёт завершения предыдущего (onComplete), прежде чем перейти к следующему. Идеален, когда порядок важен и параллелизм не нужен: например, загрузка данных по шагам.
Пример на Flux:
На Mono:
Почему concat полезен? В традиционных подходах (thenCompose в CompletableFuture) вы пишете цепочки вручную, рискуя callback-адом. Здесь — декларативно, с автоматическим backpressure: запросы идут к текущему потоку. Минус: медленный, если источники асинхронные — ждёт завершения.
Вариант: concatWith(other) на одном Flux для добавления.
Merge: параллельное слияние по готовности
Merge — для объединения потоков параллельно: элементы выдаются по мере готовности, без ожидания завершения. Порядок не гарантирован — зависит от скорости источников. Идеален для независимых событий: например, слияние логов из сервисов.
Пример:
На Mono: merge работает с Mono как с Flux'ом одного элемента.
Почему merge лучше потоков? В старых моделях (join в Executor) вы ждёте всех, блокируя. Здесь — асинхронно, с backpressure: merge распределяет запросы по источникам пропорционально. Параметр: merge(func, concurrency) для ограничения параллелизма.
Вариант: mergeWith(other) на одном Flux.
#Java #middle #Reactor #Concat #Merge #Zip #CombineLatest
Комбинации потоков в Reactor: concat, merge и другие
Комбинации потоков — это как сборка пазла: вы берёте отдельные потоки событий и сливаете в один, управляя порядком, параллелизмом и обработкой. Операторы уважают жизненный цикл и обратное давление: если подписчик не успевает, запросы распределяются по источникам. Это делает системы масштабируемыми — под нагрузкой не тонут в очередях.
Concat: последовательное объединение потоков
Concat — оператор для слияния потоков по очереди: сначала все элементы первого, потом второго и так далее. Он ждёт завершения предыдущего (onComplete), прежде чем перейти к следующему. Идеален, когда порядок важен и параллелизм не нужен: например, загрузка данных по шагам.
Пример на Flux:
import reactor.core.publisher.Flux;
import java.time.Duration;
Flux<String> first = Flux.just("Шаг 1a", "Шаг 1b").delayElements(Duration.ofSeconds(1)); // Задержка для симуляции
Flux<String> second = Flux.just("Шаг 2a", "Шаг 2b");
Flux<String> combined = Flux.concat(first, second);
combined.subscribe(System.out::println); // Вывод: "Шаг 1a" (через 1с), "Шаг 1b" (ещё 1с), "Шаг 2a", "Шаг 2b"
Здесь concat гарантирует последовательность: второй Flux стартует только после onComplete первого. Если ошибка в первом — весь поток прервётся onError.
На Mono:
Mono<String> m1 = Mono.just("A").delayElement(Duration.ofSeconds(1));
Mono<String> m2 = Mono.just("B");
Flux<String> seq = Flux.concat(m1, m2); // Mono как Flux с одним элементомПочему concat полезен? В традиционных подходах (thenCompose в CompletableFuture) вы пишете цепочки вручную, рискуя callback-адом. Здесь — декларативно, с автоматическим backpressure: запросы идут к текущему потоку. Минус: медленный, если источники асинхронные — ждёт завершения.
Вариант: concatWith(other) на одном Flux для добавления.
Merge: параллельное слияние по готовности
Merge — для объединения потоков параллельно: элементы выдаются по мере готовности, без ожидания завершения. Порядок не гарантирован — зависит от скорости источников. Идеален для независимых событий: например, слияние логов из сервисов.
Пример:
Flux<String> slow = Flux.just("Медленный 1", "Медленный 2").delayElements(Duration.ofSeconds(2));
Flux<String> fast = Flux.just("Быстрый A", "Быстрый B").delayElements(Duration.ofMillis(500));
Flux<String> merged = Flux.merge(slow, fast);
merged.subscribe(System.out::println); // Возможный вывод: "Быстрый A" (0.5с), "Быстрый B" (ещё 0.5с), "Медленный 1" (2с), "Медленный 2" (ещё 2с)
Здесь merge отдает элементы, как только они готовы — параллельно. Если ошибка в одном — весь поток onError (по умолчанию), но можно настроить.На Mono: merge работает с Mono как с Flux'ом одного элемента.
Почему merge лучше потоков? В старых моделях (join в Executor) вы ждёте всех, блокируя. Здесь — асинхронно, с backpressure: merge распределяет запросы по источникам пропорционально. Параметр: merge(func, concurrency) для ограничения параллелизма.
Вариант: mergeWith(other) на одном Flux.
#Java #middle #Reactor #Concat #Merge #Zip #CombineLatest
👍2
Zip: попарная комбинация элементов
Zip — объединяет элементы из потоков попарно: берёт первый от первого, первый от второго и т.д., применяя функцию для слияния. Завершается, когда любой поток исчерпан. Идеален для синхронизации: например, zip координат X и Y в точки.
Пример:
Почему zip решает проблемы? Вместо ручных семафоров или ожиданий в циклах, декларативно комбинируете асинхронные источники. Backpressure: запрашивает у всех равномерно.
CombineLatest: комбинация последних элементов
CombineLatest — выдаёт комбинацию последних элементов от каждого потока, как только любой обновляется. Не ждёт пар — всегда использует свежие. Идеален для реального времени: например, комбинация курсов валют.
Пример:
В отличие от zip (строгие пары), здесь — динамика. Backpressure: как в merge.
Другие комбинации: withLatestFrom и concatMap
WithLatestFrom: похож на combineLatest, но "master"-поток (основной) триггерит выдачу, беря последние из второстепенных. Пример: flux.withLatestFrom(other, (main, other) -> main + other).
ConcatMap: как flatMap, но последовательный (как concat внутри). Для orderly асинхронных подпотоков.
Эти дополняют: выбирайте по сценарию — последовательность (concat/concatMap), параллелизм (merge/flatMap) или синхронизация (zip/combineLatest).
Практические советы и подводные камни
Ошибки: по умолчанию onError останавливает всё — используйте onErrorResume для продолжения.
Параллелизм: в merge/flatMap устанавливайте prefetch (буфер) или concurrency для тюнинга.
Камень: бесконечные потоки в merge — рискуете OOM; добавьте take() или limitRate().
Тестирование: StepVerifier.create(Flux.merge(f1, f2)).expectNextMatches(...).verify();
#Java #middle #Reactor #Concat #Merge #Zip #CombineLatest
Zip — объединяет элементы из потоков попарно: берёт первый от первого, первый от второго и т.д., применяя функцию для слияния. Завершается, когда любой поток исчерпан. Идеален для синхронизации: например, zip координат X и Y в точки.
Пример:
Flux<Integer> xCoords = Flux.just(1, 2, 3);
Flux<Integer> yCoords = Flux.just(10, 20, 30, 40); // Лишний элемент игнорируется
Flux<String> points = Flux.zip(xCoords, yCoords, (x, y) -> "(" + x + ", " + y + ")");
points.subscribe(System.out::println); // Вывод: "(1, 10)", "(2, 20)", "(3, 30)"
Здесь zip ждёт пару: если один медленный — задерживает. Для >2 потоков: zip(tuple -> ..., flux1, flux2, flux3).
Почему zip решает проблемы? Вместо ручных семафоров или ожиданий в циклах, декларативно комбинируете асинхронные источники. Backpressure: запрашивает у всех равномерно.
CombineLatest: комбинация последних элементов
CombineLatest — выдаёт комбинацию последних элементов от каждого потока, как только любой обновляется. Не ждёт пар — всегда использует свежие. Идеален для реального времени: например, комбинация курсов валют.
Пример:
Flux<String> stockA = Flux.just("A:100", "A:110").delayElements(Duration.ofSeconds(1));
Flux<String> stockB = Flux.just("B:200").delayElements(Duration.ofSeconds(2));
Flux<String> latest = Flux.combineLatest(stockA, stockB, (a, b) -> a + " + " + b);
latest.subscribe(System.out::println); // Вывод примерно: "A:100 + B:200" (после 2с), "A:110 + B:200" (ещё 1с после)
Здесь combineLatest реагирует на изменения: при обновлении A использует последний B. Для >2: combineLatest(tuple -> ..., fluxes).В отличие от zip (строгие пары), здесь — динамика. Backpressure: как в merge.
Другие комбинации: withLatestFrom и concatMap
WithLatestFrom: похож на combineLatest, но "master"-поток (основной) триггерит выдачу, беря последние из второстепенных. Пример: flux.withLatestFrom(other, (main, other) -> main + other).
ConcatMap: как flatMap, но последовательный (как concat внутри). Для orderly асинхронных подпотоков.
Эти дополняют: выбирайте по сценарию — последовательность (concat/concatMap), параллелизм (merge/flatMap) или синхронизация (zip/combineLatest).
Практические советы и подводные камни
Ошибки: по умолчанию onError останавливает всё — используйте onErrorResume для продолжения.
Параллелизм: в merge/flatMap устанавливайте prefetch (буфер) или concurrency для тюнинга.
Камень: бесконечные потоки в merge — рискуете OOM; добавьте take() или limitRate().
Тестирование: StepVerifier.create(Flux.merge(f1, f2)).expectNextMatches(...).verify();
#Java #middle #Reactor #Concat #Merge #Zip #CombineLatest
👍4
Реактивное программирование
Обработка ошибок в реактивных стримах
Представьте ошибки как пороги в потоке событий: без обработки они останавливают течение, но с правильными операторами — поток продолжается, минимизируя простои. Это решает проблемы из первого поста: вместо жёстких сбоев в потоках или Future, где ошибка рушит всё, реактивный подход даёт контроль и устойчивость.
Обработка ошибок в Reactor строится на жизненном цикле: когда исключение возникает в потоке (в map, flatMap или источнике), срабатывает onError, прерывая onNext и onComplete. Но вместо того чтобы "падать", вы можете трансформировать ошибку в данные, повторить попытку или логировать.
Операторы — декларативные: добавляете в цепочку, и Reactor управляет асинхронностью, backpressure и распространением ошибок. Это делает код resilient (устойчивым): приложение не крашится, а адаптируется.
Базовая реакция: doOnError и onErrorMap
Сначала — простые операторы для наблюдения и модификации ошибок, без изменения потока.
- doOnError: дополнительная реакция на ошибку, как "хук" (зацепка). Полезен для логирования или метрик, не влияет на основной onError.
- onErrorMap: преобразует исключение в другое, для кастомизации (например, оборачивает в бизнес-ошибку).
Пример на Flux:
На Mono: аналогично, но для одиночного элемента.
Эти операторы решают проблему традиционных try-catch: вместо разбросанных блоков, всё в конвейере, читаемо и централизовано.
Восстановление: onErrorReturn и onErrorResume
Когда ошибка — не конец света, используйте fallback.
- onErrorReturn: возвращает фиксированное значение вместо ошибки. Простой запасной вариант.
- onErrorResume: более гибкий — заменяет ошибку новым Publisher (Mono/Flux). Можно генерировать динамически, в зависимости от исключения.
Пример с onErrorReturn:
С onErrorResume — условно:
onErrorResume позволяет ветвление: проверь тип ошибки и верни альтернативный поток. Это асинхронно: если fallback — Mono.delay, оно подождёт без блокировки.
Почему лучше CompletableFuture.handle?
Нет вложенных колбэков — цепочка линейна, ошибки интегрированы в конвейер.
#Java #middle #Reactor #doOnError #onErrorMap
Обработка ошибок в реактивных стримах
Представьте ошибки как пороги в потоке событий: без обработки они останавливают течение, но с правильными операторами — поток продолжается, минимизируя простои. Это решает проблемы из первого поста: вместо жёстких сбоев в потоках или Future, где ошибка рушит всё, реактивный подход даёт контроль и устойчивость.
Обработка ошибок в Reactor строится на жизненном цикле: когда исключение возникает в потоке (в map, flatMap или источнике), срабатывает onError, прерывая onNext и onComplete. Но вместо того чтобы "падать", вы можете трансформировать ошибку в данные, повторить попытку или логировать.
Операторы — декларативные: добавляете в цепочку, и Reactor управляет асинхронностью, backpressure и распространением ошибок. Это делает код resilient (устойчивым): приложение не крашится, а адаптируется.
Базовая реакция: doOnError и onErrorMap
Сначала — простые операторы для наблюдения и модификации ошибок, без изменения потока.
- doOnError: дополнительная реакция на ошибку, как "хук" (зацепка). Полезен для логирования или метрик, не влияет на основной onError.
- onErrorMap: преобразует исключение в другое, для кастомизации (например, оборачивает в бизнес-ошибку).
Пример на Flux:
import reactor.core.publisher.Flux;
import java.io.IOException;
Flux<String> riskyFlux = Flux.just("данные1", "данные2").map(data -> {
if (data.equals("данные2")) throw new IOException("Сбой ввода-вывода");
return data.toUpperCase();
});
riskyFlux
.doOnError(e -> System.err.println("Лог: " + e.getMessage())) // Логируем
.onErrorMap(e -> new RuntimeException("Обёрнутая ошибка: " + e)) // Преобразуем
.subscribe(
System.out::println,
error -> System.err.println("Финальная ошибка: " + error) // onError в подписке
);
// Вывод: "ДАННЫЕ1", потом лог "Сбой ввода-вывода", и финальная "Обёрнутая ошибка: ..."
Здесь doOnError срабатывает перед onErrorMap, а подписка ловит модифицированную ошибку. Это асинхронно: если ошибка в асинхронном подпотоке (flatMap), Reactor передаёт её downstream (дальше по цепи) без блокировок.
На Mono: аналогично, но для одиночного элемента.
Эти операторы решают проблему традиционных try-catch: вместо разбросанных блоков, всё в конвейере, читаемо и централизовано.
Восстановление: onErrorReturn и onErrorResume
Когда ошибка — не конец света, используйте fallback.
- onErrorReturn: возвращает фиксированное значение вместо ошибки. Простой запасной вариант.
- onErrorResume: более гибкий — заменяет ошибку новым Publisher (Mono/Flux). Можно генерировать динамически, в зависимости от исключения.
Пример с onErrorReturn:
Mono<String> httpMono = Mono.fromCallable(() -> {
// Симулируем HTTP-запрос
throw new RuntimeException("Сервер не отвечает");
}).onErrorReturn("Кэшированные данные");
httpMono.subscribe(System.out::println); // Вывод: "Кэшированные данные", потом onComplete
Здесь ошибка преобразуется в значение, поток завершается успешно.С onErrorResume — условно:
Flux<Integer> calcFlux = Flux.range(1, 5).map(i -> {
if (i == 3) throw new ArithmeticException("Деление на ноль");
return 10 / (i - 3); // Симуляция
}).onErrorResume(e -> {
if (e instanceof ArithmeticException) {
return Flux.just(0, 0); // Fallback на нули
} else {
return Flux.error(e); // Пропустить другие ошибки
}
});
calcFlux.subscribe(System.out::println); // Вывод: элементы до ошибки, потом 0, 0, onCompleteonErrorResume позволяет ветвление: проверь тип ошибки и верни альтернативный поток. Это асинхронно: если fallback — Mono.delay, оно подождёт без блокировки.
Почему лучше CompletableFuture.handle?
Нет вложенных колбэков — цепочка линейна, ошибки интегрированы в конвейер.
#Java #middle #Reactor #doOnError #onErrorMap
👍3
Retry: повтор попыток при ошибке
Retry — для transient (временных) ошибок: сеть, временный сбой. Повторяет upstream (источник) заданное число раз.
- retry(long times): простая повторка.
- retryWhen(Retry strategy): с кастомной логикой (backoff — задержка, условия).
Пример базовый:
С retryWhen для экспоненциальной задержки (backoff):
Это предотвращает "молоток" (hammering) сервера: ждёт перед retry. Асинхронно: задержки не блокируют поток.
Retry решает боли блокировок: вместо цикла с sleep в традиционном коде, декларативно и эффективно.
Практические советы и подводные камни
- Комбинируйте: riskyFlux.doOnError(log).onErrorResume(fallback).retry(2) — лог + retry + fallback.
- Условия: в onErrorResume используйте instanceof для типов ошибок, чтобы не catch всё подряд.
- Глобально: используйте Hooks.onErrorDropped для непойманных ошибок (редко).
- Камень: retry на бесконечных потоках — вечный цикл; добавьте timeout() или maxAttempts.
- Тестирование: StepVerifier.create(flux).expectErrorMatches(e -> e instanceof IOException).verify();
В практике: в WebFlux — контроллер возвращает Mono с retry для внешних API, onErrorReturn для кэша.
плавному восстановлению, экономя ресурсы и упрощая код.
#Java #middle #Reactor #doOnError #onErrorMap
Retry — для transient (временных) ошибок: сеть, временный сбой. Повторяет upstream (источник) заданное число раз.
- retry(long times): простая повторка.
- retryWhen(Retry strategy): с кастомной логикой (backoff — задержка, условия).
Пример базовый:
Mono<String> flakyMono = Mono.defer(() -> {
if (Math.random() > 0.3) throw new RuntimeException("Временный сбой");
return Mono.just("Успех");
}).retry(3); // Повторить 3 раза
flakyMono.subscribe(System.out::println, Throwable::printStackTrace);
Здесь retry повторяет весь Mono при ошибке, до успеха или исчерпания попыток. Если все попытки fail — финальный onError.С retryWhen для экспоненциальной задержки (backoff):
import reactor.util.retry.Retry;
flakyMono.retryWhen(Retry.backoff(3, Duration.ofSeconds(1))); // 3 попытки с задержкой 1с, 2с, 4с
Это предотвращает "молоток" (hammering) сервера: ждёт перед retry. Асинхронно: задержки не блокируют поток.
Retry решает боли блокировок: вместо цикла с sleep в традиционном коде, декларативно и эффективно.
Практические советы и подводные камни
- Комбинируйте: riskyFlux.doOnError(log).onErrorResume(fallback).retry(2) — лог + retry + fallback.
- Условия: в onErrorResume используйте instanceof для типов ошибок, чтобы не catch всё подряд.
- Глобально: используйте Hooks.onErrorDropped для непойманных ошибок (редко).
- Камень: retry на бесконечных потоках — вечный цикл; добавьте timeout() или maxAttempts.
- Тестирование: StepVerifier.create(flux).expectErrorMatches(e -> e instanceof IOException).verify();
В практике: в WebFlux — контроллер возвращает Mono с retry для внешних API, onErrorReturn для кэша.
плавному восстановлению, экономя ресурсы и упрощая код.
#Java #middle #Reactor #doOnError #onErrorMap
👍2
Реактивное программирование
Управление потоками в Reactor: Schedulers
Schedulers — часть Reactor Core, они интегрируют с Java's ExecutorService, но абстрагируют детали: вы выбираете тип, а Reactor управляет жизненным циклом. По умолчанию Reactor использует Schedulers.parallel() для вычислений, но вы можете переключать с помощью операторов publishOn (для downstream) и subscribeOn (для upstream). Это даёт контроль: CPU-bound задачи на параллельных потоках, IO-bound — на эластичных. Давайте разберём типы schedulers, операторы и примеры — пробуйте в коде с reactor-core.
Типы Schedulers: выбор под задачу
Reactor предоставляет готовые планировщики для разных сценариев. Каждый — фабрика потоков с настройками (размер пула, демонизация).
- parallel(): Для CPU-bound задач (расчёты). Фиксированный пул = кол-во CPU ядер. Не демоны — ждёт завершения. Идеален для map с тяжёлыми вычислениями.
- boundedElastic(): Для IO-bound (сеть, файлы, БД) с блокирующими операциями. Эластичный пул: до 10*CPU, но переиспользует. Демоны — приложение не ждёт. Полезен, если в лямбде "грязный" код (sleep, blocking IO).
- single(): Один поток для всех задач. Для последовательных операций, где параллелизм не нужен (тесты, простые цепочки).
- immediate(): Выполняет на текущем потоке. Для синхронных тестов или когда асинхронность не требуется.
- fromExecutor(Executor): Кастомный из вашего пула.
Пример создания:
Операторы для переключения: publishOn и subscribeOn
- publishOn(Scheduler): Переключает downstream (операции после него) на указанный scheduler. Влияет на onNext, map, filter и т.д. после точки.
- subscribeOn(Scheduler): Переключает upstream (источник и подписку) на scheduler. Влияет на весь поток от subscribe().
Пример комбинации:
Почему важно? Без schedulers все на default (parallel), но IO может заблокировать CPU-потоки. PublishOn изолирует блокирующие части, subscribeOn — для источников (например, blocking чтение файла на отдельном потоке).
Управление контекстом и параллелизмом
Schedulers помогают с контекстом: в Reactor есть Context для хранения данных (как ThreadLocal, но реактивный). Операторы как withScheduler сохраняют его.
Для параллелизма: в flatMap добавьте concurrency: flatMap(func, concurrency, prefetch), где concurrency — max параллельных подпотоков, prefetch — буфер.
Пример с параллелизмом:
#Java #middle #Reactor #WebClient #Schedulers
Управление потоками в Reactor: Schedulers
Schedulers — часть Reactor Core, они интегрируют с Java's ExecutorService, но абстрагируют детали: вы выбираете тип, а Reactor управляет жизненным циклом. По умолчанию Reactor использует Schedulers.parallel() для вычислений, но вы можете переключать с помощью операторов publishOn (для downstream) и subscribeOn (для upstream). Это даёт контроль: CPU-bound задачи на параллельных потоках, IO-bound — на эластичных. Давайте разберём типы schedulers, операторы и примеры — пробуйте в коде с reactor-core.
Типы Schedulers: выбор под задачу
Reactor предоставляет готовые планировщики для разных сценариев. Каждый — фабрика потоков с настройками (размер пула, демонизация).
- parallel(): Для CPU-bound задач (расчёты). Фиксированный пул = кол-во CPU ядер. Не демоны — ждёт завершения. Идеален для map с тяжёлыми вычислениями.
- boundedElastic(): Для IO-bound (сеть, файлы, БД) с блокирующими операциями. Эластичный пул: до 10*CPU, но переиспользует. Демоны — приложение не ждёт. Полезен, если в лямбде "грязный" код (sleep, blocking IO).
- single(): Один поток для всех задач. Для последовательных операций, где параллелизм не нужен (тесты, простые цепочки).
- immediate(): Выполняет на текущем потоке. Для синхронных тестов или когда асинхронность не требуется.
- fromExecutor(Executor): Кастомный из вашего пула.
Пример создания:
import reactor.core.scheduler.Schedulers;
Schedulers.parallel(); // Готов к использованию
Schedulers не создают потоки заранее — ленивые, как Mono/Flux.
Операторы для переключения: publishOn и subscribeOn
- publishOn(Scheduler): Переключает downstream (операции после него) на указанный scheduler. Влияет на onNext, map, filter и т.д. после точки.
- subscribeOn(Scheduler): Переключает upstream (источник и подписку) на scheduler. Влияет на весь поток от subscribe().
Пример комбинации:
import reactor.core.publisher.Flux;
import java.time.Duration;
Flux<Integer> flux = Flux.range(1, 3)
.map(i -> { // На subscribeOn
System.out.println("Map1 на потоке: " + Thread.currentThread().getName());
return i * 10;
})
.publishOn(Schedulers.boundedElastic()) // Переключаем downstream
.delayElements(Duration.ofSeconds(1)) // На boundedElastic
.map(i -> { // На boundedElastic
System.out.println("Map2 на потоке: " + Thread.currentThread().getName());
return i + 1;
});
flux.subscribeOn(Schedulers.single()) // Upstream на single
.subscribe(System.out::println);
Вывод покажет: Map1 на "single-1", delay и Map2 на "boundedElastic-X". Это асинхронно: основной поток не блокируется.
Почему важно? Без schedulers все на default (parallel), но IO может заблокировать CPU-потоки. PublishOn изолирует блокирующие части, subscribeOn — для источников (например, blocking чтение файла на отдельном потоке).
Управление контекстом и параллелизмом
Schedulers помогают с контекстом: в Reactor есть Context для хранения данных (как ThreadLocal, но реактивный). Операторы как withScheduler сохраняют его.
Для параллелизма: в flatMap добавьте concurrency: flatMap(func, concurrency, prefetch), где concurrency — max параллельных подпотоков, prefetch — буфер.
Пример с параллелизмом:
Flux.range(1, 10)
.flatMap(i -> Mono.just(i).delayElement(Duration.ofMillis(100)), 2) // Max 2 параллельно
.subscribeOn(Schedulers.parallel())
.blockLast(); // Для теста, в prod не блокируйте!
Это ограничивает: не 10 задержек сразу, а по 2.
#Java #middle #Reactor #WebClient #Schedulers
👍2
Практические советы и подводные камни
- Диагностика: используйте doOnNext(() -> Thread.currentThread().getName()) для логирования потоков.
- Shutdown: Schedulers.shutdownNow() для cleanup, но в Spring — автоматом.
- проблема: blocking в parallel() — заблокирует все CPU; всегда publishOn(boundedElastic()) перед blocking.
- Оптимизация: для веб — WebFlux использует Schedulers.default() на Netty event-loop (для IO).
- Тестирование: VirtualTimeScheduler для симуляции времени в StepVerifier.
В практике: в микросервисах — subscribeOn(parallel()) для вычислений, publishOn(boundedElastic()) для БД-запросов.
Реактивные API-запросы с WebClient
WebClient — declarative: строите запрос, получаете Mono/Flux с ответом. Под капотом — Netty или HttpClient для IO, с Schedulers.boundedElastic() по умолчанию. Давайте разберём создание, методы (GET/POST), обработку и примеры — добавьте spring-webflux в зависимости и пробуйте.
Создание и базовая настройка WebClient
WebClient.builder() для кастомизации: baseUrl, headers, client (Netty/HttpClient).
Пример:
Реактивные запросы: GET, POST и обработка
- GET: retrieve() возвращает Mono<ResponseEntity> или Flux для стриминга.
- POST: bodyValue() для данных, exchange() для полного контроля.
Пример GET с Mono:
Пример POST с Flux:
Обработка ответов и ошибок
Retrieve() даёт bodyToMono/Flux, onStatus для ошибок (4xx/5xx).
Пример с обработкой:
Практические советы и подводные камни
- Таймауты: client.mutate().clientConnector(new ReactorClientHttpConnector(HttpClient.create().responseTimeout(Duration.ofSeconds(5)))).build();
- HTTPS: автоматом, но настройте trustStore если нужно.
- Тестирование: WebClient с mock (WireMock) и StepVerifier.
В практике: в Spring Boot контроллер вызывает WebClient для агрегации от сервисов — асинхронно, без ожидания.
#Java #middle #Reactor #WebClient #Schedulers
- Диагностика: используйте doOnNext(() -> Thread.currentThread().getName()) для логирования потоков.
- Shutdown: Schedulers.shutdownNow() для cleanup, но в Spring — автоматом.
- проблема: blocking в parallel() — заблокирует все CPU; всегда publishOn(boundedElastic()) перед blocking.
- Оптимизация: для веб — WebFlux использует Schedulers.default() на Netty event-loop (для IO).
- Тестирование: VirtualTimeScheduler для симуляции времени в StepVerifier.
В практике: в микросервисах — subscribeOn(parallel()) для вычислений, publishOn(boundedElastic()) для БД-запросов.
Реактивные API-запросы с WebClient
WebClient — declarative: строите запрос, получаете Mono/Flux с ответом. Под капотом — Netty или HttpClient для IO, с Schedulers.boundedElastic() по умолчанию. Давайте разберём создание, методы (GET/POST), обработку и примеры — добавьте spring-webflux в зависимости и пробуйте.
Создание и базовая настройка WebClient
WebClient.builder() для кастомизации: baseUrl, headers, client (Netty/HttpClient).
Пример:
import org.springframework.web.reactive.function.client.WebClient;
WebClient client = WebClient.builder()
.baseUrl("https://api.example.com")
.defaultHeader("Authorization", "Bearer token")
.build();
Это готово к запросам. Для кодеков (JSON) — добавьте Jackson или Gson.
Реактивные запросы: GET, POST и обработка
- GET: retrieve() возвращает Mono<ResponseEntity> или Flux для стриминга.
- POST: bodyValue() для данных, exchange() для полного контроля.
Пример GET с Mono:
Mono<String> responseMono = client.get()
.uri("/users/{id}", 1) // URI с параметрами
.retrieve()
.bodyToMono(String.class); // Декодируем в String
responseMono.subscribe(System.out::println); // Асинхронно: ответ придёт позже
Для JSON: bodyToMono(User.class) с Jackson.
Пример POST с Flux:
Flux<User> usersFlux = Flux.just(new User("Alice"), new User("Bob"));
Mono<Void> postMono = client.post()
.uri("/users/batch")
.body(usersFlux, User.class) // Тело как Flux
.retrieve()
.bodyToMono(Void.class);
postMono.subscribe(); // Отправка асинхронно
WebClient уважает backpressure: если сервер шлёт Flux (SSE), клиент запрашивает по мере обработки.
С Schedulers: по умолчанию на event-loop, но publishOn для кастом.Обработка ответов и ошибок
Retrieve() даёт bodyToMono/Flux, onStatus для ошибок (4xx/5xx).
Пример с обработкой:
client.get()
.uri("/data")
.retrieve()
.onStatus(HttpStatusCode::is4xxClientError, resp -> Mono.error(new BadRequestException("Ошибка клиента")))
.bodyToFlux(Data.class)
.doOnError(e -> System.err.println("Сбой: " + e)) // Из поста 10
.retry(3) // Retry на сетевые ошибки
.subscribe(data -> process(data));
Это интегрирует с Reactor: ошибки как onError, retry для flaky API.
Для стриминга: bodyToFlux для больших данных, без загрузки всего в память.
Практические советы и подводные камни
- Таймауты: client.mutate().clientConnector(new ReactorClientHttpConnector(HttpClient.create().responseTimeout(Duration.ofSeconds(5)))).build();
- HTTPS: автоматом, но настройте trustStore если нужно.
- Тестирование: WebClient с mock (WireMock) и StepVerifier.
В практике: в Spring Boot контроллер вызывает WebClient для агрегации от сервисов — асинхронно, без ожидания.
#Java #middle #Reactor #WebClient #Schedulers
👍2
Реактивное программирование
Введение в Spring WebFlux
Spring WebFlux — это часть Spring Framework, предназначенная для создания неблокирующих, асинхронных веб-приложений. Он основан на Project Reactor и Reactive Streams API, что позволяет использовать Mono и Flux прямо в коде сервера. Это не замена Spring MVC (традиционному веб-фреймворку), а альтернатива для сценариев с высокой нагрузкой: микросервисы, реальное время, стриминг. Почему это новый подход? В традиционном веб (как Servlet с потоками на запрос) под пиковой нагрузкой сервер исчерпывает ресурсы — каждый запрос занимает поток, который висит в ожидании IO. WebFlux меняет это: использует event-loop (цикл обработки событий), где один поток обслуживает тысячи подключений, реагируя на события асинхронно. Это экономит CPU и память, делая приложения более отзывчивыми и масштабируемыми.
Почему Spring WebFlux: связь с реактивным мышлением
WebFlux воплощает принципы, которые мы разбирали: push-модель, где данные отправляются по готовности; обратное давление для контроля темпа; операторы для трансформаций. В экосистеме Spring он интегрируется с другими компонентами: Spring Boot для быстрого старта, Spring Data Reactive для баз данных (как R2DBC), Spring Security Reactive для безопасности. Ключевой выигрыш — для IO-bound задач: запросы к БД, API или файлам идут асинхронно, без блокировок. Если ваш сервис ждёт внешних ответов 90% времени, WebFlux высвобождает ресурсы, позволяя обрабатывать в 10-100 раз больше запросов на том же железе.
Чтобы начать: добавьте spring-boot-starter-webflux в зависимости (Maven/Gradle). Spring Boot автоматически настроит Reactor Netty как сервер (неблокирующий HTTP на базе Netty). Нет нужды в Tomcat — всё реактивно.
Ключевые компоненты Spring WebFlux
WebFlux предлагает два стиля разработки: аннотированный (похож на Spring MVC) и функциональный (роутеры как в Express.js). Оба используют Mono/Flux для ответов.
- Аннотированные контроллеры: @RestController с методами, возвращающими Mono или Flux.
Пример простого GET:
- Функциональные роутеры: Для маршрутизации без контроллеров.
Пример:
- WebClient: реактивный клиент для исходящих запросов, интегрируется seamlessly.
Ещё фишки: поддержка WebSockets для bidirectional стриминга, Server-Sent Events для push-уведомлений, интеграция с Schedulers для распределения задач.
Практические советы и подводные камни
- Миграция: Начните с аннотированных контроллеров — синтаксис похож на MVC, но возвращайте Mono/Flux.
- Тестирование: WebTestClient вместо MockMvc — асинхронные тесты с StepVerifier (из Reactor).
- Камень: Блокирующий код в контроллерах (JDBC, sleep) сломает асинхронность — используйте reactive драйверы (R2DBC) и publishOn(Schedulers.boundedElastic()).
- Производительность: Под нагрузкой мониторьте с Micrometer — WebFlux даёт метрики из коробки.
В реальной жизни: Netflix использует похожий стек для стриминга, где WebFlux обрабатывает миллионы подключений.
#Java #middle #Reactor #WebFlux
Введение в Spring WebFlux
Spring WebFlux — это часть Spring Framework, предназначенная для создания неблокирующих, асинхронных веб-приложений. Он основан на Project Reactor и Reactive Streams API, что позволяет использовать Mono и Flux прямо в коде сервера. Это не замена Spring MVC (традиционному веб-фреймворку), а альтернатива для сценариев с высокой нагрузкой: микросервисы, реальное время, стриминг. Почему это новый подход? В традиционном веб (как Servlet с потоками на запрос) под пиковой нагрузкой сервер исчерпывает ресурсы — каждый запрос занимает поток, который висит в ожидании IO. WebFlux меняет это: использует event-loop (цикл обработки событий), где один поток обслуживает тысячи подключений, реагируя на события асинхронно. Это экономит CPU и память, делая приложения более отзывчивыми и масштабируемыми.
Почему Spring WebFlux: связь с реактивным мышлением
WebFlux воплощает принципы, которые мы разбирали: push-модель, где данные отправляются по готовности; обратное давление для контроля темпа; операторы для трансформаций. В экосистеме Spring он интегрируется с другими компонентами: Spring Boot для быстрого старта, Spring Data Reactive для баз данных (как R2DBC), Spring Security Reactive для безопасности. Ключевой выигрыш — для IO-bound задач: запросы к БД, API или файлам идут асинхронно, без блокировок. Если ваш сервис ждёт внешних ответов 90% времени, WebFlux высвобождает ресурсы, позволяя обрабатывать в 10-100 раз больше запросов на том же железе.
Чтобы начать: добавьте spring-boot-starter-webflux в зависимости (Maven/Gradle). Spring Boot автоматически настроит Reactor Netty как сервер (неблокирующий HTTP на базе Netty). Нет нужды в Tomcat — всё реактивно.
Ключевые компоненты Spring WebFlux
WebFlux предлагает два стиля разработки: аннотированный (похож на Spring MVC) и функциональный (роутеры как в Express.js). Оба используют Mono/Flux для ответов.
- Аннотированные контроллеры: @RestController с методами, возвращающими Mono или Flux.
Пример простого GET:
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
@RestController
public class HelloController {
@GetMapping("/hello")
public Mono<String> hello() {
return Mono.just("Привет из WebFlux!"); // Асинхронный ответ
}
}
Здесь метод возвращает Mono — сервер не блокируется, ответ "течёт" асинхронно. Для Flux: стриминг данных, как SSE (сервер-сент события).
- Функциональные роутеры: Для маршрутизации без контроллеров.
Пример:
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;
@Configuration
public class RouterConfig {
@Bean
public RouterFunction<ServerResponse> routes() {
return RouterFunctions.route()
.GET("/hello", request -> ServerResponse.ok().bodyValue("Привет из роутера!"))
.build();
}
}
Это декларативно: опишите маршруты, обработчики возвращают ServerResponse с Mono/Flux.
- WebClient: реактивный клиент для исходящих запросов, интегрируется seamlessly.
Ещё фишки: поддержка WebSockets для bidirectional стриминга, Server-Sent Events для push-уведомлений, интеграция с Schedulers для распределения задач.
Практические советы и подводные камни
- Миграция: Начните с аннотированных контроллеров — синтаксис похож на MVC, но возвращайте Mono/Flux.
- Тестирование: WebTestClient вместо MockMvc — асинхронные тесты с StepVerifier (из Reactor).
- Камень: Блокирующий код в контроллерах (JDBC, sleep) сломает асинхронность — используйте reactive драйверы (R2DBC) и publishOn(Schedulers.boundedElastic()).
- Производительность: Под нагрузкой мониторьте с Micrometer — WebFlux даёт метрики из коробки.
В реальной жизни: Netflix использует похожий стек для стриминга, где WebFlux обрабатывает миллионы подключений.
#Java #middle #Reactor #WebFlux
👍3
Реактивное программирование
Простой REST-контроллер с Mono и Flux в Spring WebFlux
Spring WebFlux работает на Spring Boot — это упрощает запуск.
Создайте новый проект в IDE (IntelliJ, Eclipse) или через spring.io/initializr с зависимостями:
- Spring Reactive Web (для WebFlux).
- Spring Boot Starter WebFlux.
В pom.xml (Maven) это выглядит так:
Запустите приложение с @SpringBootApplication в главном классе. По умолчанию сервер — Reactor Netty (неблокирующий HTTP-сервер), порт 8080. Нет нужды в Tomcat — всё асинхронно. Если у вас уже есть Spring MVC, исключите starter-web, чтобы избежать конфликтов.
Теперь к контроллерам: в WebFlux они аннотированы @RestController (как в MVC), но методы возвращают Mono или Flux. Это значит: контроллер не блокирует поток — он возвращает "реактивный объект", а фреймворк сам подпишется и отправит ответ по мере готовности. Это решает боли блокировок: пока данные готовятся (например, запрос в БД), поток свободен для других запросов.
Mono в контроллере: для одиночных ответов
Mono — это поток для нуля или одного элемента, идеален для типичных REST-операций: GET одного ресурса, POST с созданием, DELETE с подтверждением. В контроллере метод возвращает Mono<T>, где T — ваш DTO или простая строка. Фреймворк автоматически сериализует его в JSON (с Jackson) и отправляет, когда элемент готов.
Простой пример: контроллер для приветствия.
Почему Mono лучше Object в MVC? В MVC метод ждёт выполнения (блокирует поток), здесь — нет. Под нагрузкой: тысячи /greet запросов — WebFlux использует event-loop (один поток на все), Reactor распределяет через Schedulers.
Если внутри Mono — запрос к сервису:
В контроллере:
Это цепочка: контроллер возвращает Mono от сервиса, фреймворк ждёт готовности без блокировки. Если ошибка в сервисе — onError преобразуется в HTTP 500, но вы можете настроить с @ExceptionHandler.
Расширим: обработка параметров запроса (@RequestParam) и тела (@RequestBody).
Для POST:
#Java #middle #Reactor #WebFlux #Mono #Flux
Простой REST-контроллер с Mono и Flux в Spring WebFlux
Spring WebFlux работает на Spring Boot — это упрощает запуск.
Создайте новый проект в IDE (IntelliJ, Eclipse) или через spring.io/initializr с зависимостями:
- Spring Reactive Web (для WebFlux).
- Spring Boot Starter WebFlux.
В pom.xml (Maven) это выглядит так:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
Для Gradle: implementation 'org.springframework.boot:spring-boot-starter-webflux'
Запустите приложение с @SpringBootApplication в главном классе. По умолчанию сервер — Reactor Netty (неблокирующий HTTP-сервер), порт 8080. Нет нужды в Tomcat — всё асинхронно. Если у вас уже есть Spring MVC, исключите starter-web, чтобы избежать конфликтов.
Теперь к контроллерам: в WebFlux они аннотированы @RestController (как в MVC), но методы возвращают Mono или Flux. Это значит: контроллер не блокирует поток — он возвращает "реактивный объект", а фреймворк сам подпишется и отправит ответ по мере готовности. Это решает боли блокировок: пока данные готовятся (например, запрос в БД), поток свободен для других запросов.
Mono в контроллере: для одиночных ответов
Mono — это поток для нуля или одного элемента, идеален для типичных REST-операций: GET одного ресурса, POST с созданием, DELETE с подтверждением. В контроллере метод возвращает Mono<T>, где T — ваш DTO или простая строка. Фреймворк автоматически сериализует его в JSON (с Jackson) и отправляет, когда элемент готов.
Простой пример: контроллер для приветствия.
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
@RestController
public class GreetingController {
@GetMapping("/greet/{name}")
public Mono<String> greet(@PathVariable String name) {
return Mono.just("Привет, " + name + "! Добро пожаловать в реактивный мир.");
}
}
Здесь @GetMapping — аннотация для маршрута, @PathVariable — параметр из URL. Метод возвращает Mono.just — простой синхронный элемент. Но под капотом: WebFlux подпишется на Mono, и когда значение готово (сразу), отправит HTTP 200 с телом. Если бы внутри был асинхронный вызов (например, Mono.delay(Duration.ofSeconds(2)).map(ignore -> "Задержанный привет")), сервер не заблокируется — запрос обработается асинхронно.
Почему Mono лучше Object в MVC? В MVC метод ждёт выполнения (блокирует поток), здесь — нет. Под нагрузкой: тысячи /greet запросов — WebFlux использует event-loop (один поток на все), Reactor распределяет через Schedulers.
Если внутри Mono — запрос к сервису:
@Service
public class UserService {
public Mono<User> findUserById(Long id) {
return Mono.fromCallable(() -> /* симуляция БД */ new User(id, "Имя")); // Асинхронно
}
}
В контроллере:
@GetMapping("/user/{id}")
public Mono<User> getUser(@PathVariable Long id) {
return userService.findUserById(id);
}Это цепочка: контроллер возвращает Mono от сервиса, фреймворк ждёт готовности без блокировки. Если ошибка в сервисе — onError преобразуется в HTTP 500, но вы можете настроить с @ExceptionHandler.
Расширим: обработка параметров запроса (@RequestParam) и тела (@RequestBody).
Для POST:
@PostMapping("/create-user")
public Mono<User> createUser(@RequestBody Mono<UserRequest> requestMono) {
return requestMono.flatMap(req -> {
// Асинхронная логика: создать пользователя
return userService.createUser(req.getName()).map(saved -> new User(saved.getId(), saved.getName()));
});
}
Здесь @RequestBody — Mono, потому что тело может приходить асинхронно (большие данные). flatMap — для цепочки (из поста 8), чтобы "развернуть" подпоток.#Java #middle #Reactor #WebFlux #Mono #Flux
👍2
Flux в контроллере: для коллекций и стриминга
Flux — для нуля, одного или многих элементов, подходит для GET списков, пагинации или реального времени (стриминг событий). Возвращайте Flux<T>, и WebFlux отправит ответ как JSON-массив (для конечного) или text/event-stream для SSE (сервер-сент событий).
Простой пример: список элементов.
Фреймворк соберёт Flux в JSON-массив и отправит одним ответом.
Но если Flux асинхронный:
Ответ будет стриминговым: клиенту придут данные по мере готовности (chunked transfer).
Для явного SSE добавьте produces = MediaType.TEXT_EVENT_STREAM_VALUE:
Расширим: пагинация с @RequestParam.
Для POST с Flux телом (batch-операции):
Обработка ошибок и валидация в контроллерах
Ошибки — часть жизни: в WebFlux используйте @ExceptionHandler в контроллере или глобально (@ControllerAdvice).
Пример:
Для валидации: @Validated на контроллере, @Valid на @RequestBody. Ошибки валидации — WebExchangeBindException, обработайте в handler.
Глобально: @ControllerAdvice с @ExceptionHandler для централизованной обработки. Это интегрируется с onErrorResume/retry: в сервисе добавьте, и контроллер получит восстановленный поток.
Практические советы и подводные камни
- Тестирование: Используйте WebTestClient — webTestClient.get().uri("/greet/Name").exchange().expectStatus().isOk().expectBody(String.class).isEqualTo("Привет, Name!");
- Schedulers: Если в сервисе blocking код, добавьте .publishOn(Schedulers.boundedElastic()) перед возвратом в контроллер.
- Проблема: Возврат Flux без produces = TEXT_EVENT_STREAM — соберётся в массив, но под большой нагрузкой рискуете буфером; для стриминга укажите media type.
- Оптимизация: Для больших ответов используйте Flux<DataBuffer> — стриминг байтов без сериализации в память.
- Интеграция: С Spring Security Reactive — фильтры асинхронны, не блокируют.
#Java #middle #Reactor #WebFlux #Mono #Flux
Flux — для нуля, одного или многих элементов, подходит для GET списков, пагинации или реального времени (стриминг событий). Возвращайте Flux<T>, и WebFlux отправит ответ как JSON-массив (для конечного) или text/event-stream для SSE (сервер-сент событий).
Простой пример: список элементов.
@GetMapping("/users")
public Flux<User> getAllUsers() {
return Flux.fromIterable(List.of(new User(1L, "Алиса"), new User(2L, "Боб")));
}Фреймворк соберёт Flux в JSON-массив и отправит одним ответом.
Но если Flux асинхронный:
public Flux<User> findAllUsers() {
return Flux.range(1, 5)
.delayElements(Duration.ofSeconds(1)) // Симуляция задержки
.map(i -> new User((long) i, "Пользователь " + i));
}Ответ будет стриминговым: клиенту придут данные по мере готовности (chunked transfer).
Для явного SSE добавьте produces = MediaType.TEXT_EVENT_STREAM_VALUE:
@GetMapping(value = "/stream-users", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<User> streamUsers() {
return findAllUsers();
}
Теперь ответ — SSE: каждый элемент как event: data: {"id":1,"name":"Пользователь 1"}\n\n. Клиент (браузер или WebClient) может реагировать на каждый по отдельности, без ожидания всего. Это решает боли callback-ада: вместо polling (опроса сервера), сервер толкает данные (push из поста 3).
Расширим: пагинация с @RequestParam.
@GetMapping("/users-paged")
public Flux<User> getPagedUsers(@RequestParam(defaultValue = "0") int page, @RequestParam(defaultValue = "10") int size) {
return userService.findPaged(page, size); // Сервис возвращает Flux<User>
}Для POST с Flux телом (batch-операции):
@PostMapping("/batch-users")
public Flux<User> createBatch(@RequestBody Flux<UserRequest> requestsFlux) {
return requestsFlux.flatMap(req -> userService.createUser(req.getName()));
}
flatMap — для параллельной обработки, возвращает Flux сохранённых пользователей.Обработка ошибок и валидация в контроллерах
Ошибки — часть жизни: в WebFlux используйте @ExceptionHandler в контроллере или глобально (@ControllerAdvice).
Пример:
@ExceptionHandler(UserNotFoundException.class)
public Mono<ResponseEntity<String>> handleNotFound(UserNotFoundException ex) {
return Mono.just(ResponseEntity.notFound().build());
}
Для валидации: @Validated на контроллере, @Valid на @RequestBody. Ошибки валидации — WebExchangeBindException, обработайте в handler.
Глобально: @ControllerAdvice с @ExceptionHandler для централизованной обработки. Это интегрируется с onErrorResume/retry: в сервисе добавьте, и контроллер получит восстановленный поток.
Практические советы и подводные камни
- Тестирование: Используйте WebTestClient — webTestClient.get().uri("/greet/Name").exchange().expectStatus().isOk().expectBody(String.class).isEqualTo("Привет, Name!");
- Schedulers: Если в сервисе blocking код, добавьте .publishOn(Schedulers.boundedElastic()) перед возвратом в контроллер.
- Проблема: Возврат Flux без produces = TEXT_EVENT_STREAM — соберётся в массив, но под большой нагрузкой рискуете буфером; для стриминга укажите media type.
- Оптимизация: Для больших ответов используйте Flux<DataBuffer> — стриминг байтов без сериализации в память.
- Интеграция: С Spring Security Reactive — фильтры асинхронны, не блокируют.
#Java #middle #Reactor #WebFlux #Mono #Flux
👍1
Реактивное программирование
R2DBC vs JDBC: реактивные базы данных
Исторический контекст: что такое JDBC и почему он доминировал десятилетиями
JDBC — это стандартный API Java для доступа к реляционным базам данных, появившийся ещё в JDK 1.1 (1997 год).
Он позволяет выполнять SQL-запросы, управлять соединениями и обрабатывать результаты через унифицированный интерфейс, независимо от конкретной БД (PostgreSQL, MySQL, Oracle и т.д.).
Ключевые компоненты JDBC:
DriverManager или DataSource: Для получения соединения (Connection).
Statement/PreparedStatement: Для выполнения SQL (executeQuery, executeUpdate).
ResultSet: Для чтения результатов (next(), getString() и т.д.).
Transaction management: commit(), rollback().
Пример простого JDBC-кода:
Это синхронно и блокирующе: executeQuery() "виснет" до ответа от БД, блокируя текущий поток.
В традиционных приложениях (как Spring MVC) это работало: каждый запрос — отдельный поток из пула (например, Tomcat с 200 потоками), и если БД отвечает быстро, проблем нет. Но под высокой нагрузкой или с медленными запросами (сетевые задержки, сложные джойны) пул исчерпывается: потоки "спят" в ожидании IO, CPU простаивает, а новые запросы ждут в очереди, вызывая таймауты и отказы. Это классическая проблема асинхронщины: JDBC не предназначен для non-blocking IO, он полагается на blocking calls операционной системы.
В реактивных приложениях (WebFlux) использование JDBC — антипаттерн: если контроллер возвращает Mono, но внутри — blocking JDBC, весь выигрыш теряется. Поток из event-loop (Netty) блокируется, снижая throughput (пропускную способность). Вот почему нужен новый подход.
Проблемы JDBC в реактивном контексте: почему старый стандарт не справляется
Давайте разберём проблемы JDBC подробно, чтобы понять мотивацию R2DBC:
Блокирующая природа: Все операции (connect, query, fetch) — синхронны. В асинхронном коде это требует обёрток вроде CompletableFuture или offload на отдельный пул (Schedulers.boundedElastic()), но это хак: теряется истинная реактивность, и под нагрузкой пулы переполняются.
Отсутствие backpressure: ResultSet — pull-модель (next() получает данные), но без контроля темпа. Если результат огромный (миллионы строк), буфер переполняется, рискуя OOM (OutOfMemoryError). В реактивном мире (push с backpressure) это несовместимо.
Управление соединениями: JDBC полагается на пулы (HikariCP), но они ориентированы на blocking: соединение "занято" весь запрос. В реактиве нужно multiplexing — одно соединение для многих операций асинхронно.
Транзакции: @Transactional в Spring работает, но в реактиве требует специальной поддержки (reactive transactions), иначе — блокировки.
Масштабируемость: Под 10k+ RPS (requests per second) с БД-запросами JDBC требует огромных пулов потоков (тысячи), что жрёт память (каждый поток ~1MB стека) и контекст-свичинг.
Интеграция с Reactor: Нет native Publisher — результаты не "текут" как Flux, требуя ручной конвертации, что добавляет boilerplate и риски.
В итоге, JDBC — отличный для legacy или низконагруженных приложений, но в микросервисах с WebFlux он "ломает" реактивный стек, возвращая к болям callback-ада и ожиданий.
Введение в R2DBC: реактивный стандарт для реляционных БД
R2DBC — это спецификация (с 2019 года, под эгидой Spring и Pivotal), определяющая API для доступа к реляционным БД в реактивном стиле. Это не замена JDBC, а параллельный стандарт, ориентированный на non-blocking IO.
#Java #middle #Reactor #WebFlux #Mono #Flux #R2DBC
R2DBC vs JDBC: реактивные базы данных
Исторический контекст: что такое JDBC и почему он доминировал десятилетиями
JDBC — это стандартный API Java для доступа к реляционным базам данных, появившийся ещё в JDK 1.1 (1997 год).
Он позволяет выполнять SQL-запросы, управлять соединениями и обрабатывать результаты через унифицированный интерфейс, независимо от конкретной БД (PostgreSQL, MySQL, Oracle и т.д.).
Ключевые компоненты JDBC:
DriverManager или DataSource: Для получения соединения (Connection).
Statement/PreparedStatement: Для выполнения SQL (executeQuery, executeUpdate).
ResultSet: Для чтения результатов (next(), getString() и т.д.).
Transaction management: commit(), rollback().
Пример простого JDBC-кода:
import java.sql.*;
public class JdbcExample {
public static void main(String[] args) {
try (Connection conn = DriverManager.getConnection("jdbc:postgresql://localhost:5432/db", "user", "pass");
PreparedStatement stmt = conn.prepareStatement("SELECT * FROM users WHERE id = ?")) {
stmt.setLong(1, 1L);
ResultSet rs = stmt.executeQuery();
while (rs.next()) {
System.out.println("User: " + rs.getString("name"));
}
} catch (SQLException e) {
e.printStackTrace();
}
}
}
Это синхронно и блокирующе: executeQuery() "виснет" до ответа от БД, блокируя текущий поток.
В традиционных приложениях (как Spring MVC) это работало: каждый запрос — отдельный поток из пула (например, Tomcat с 200 потоками), и если БД отвечает быстро, проблем нет. Но под высокой нагрузкой или с медленными запросами (сетевые задержки, сложные джойны) пул исчерпывается: потоки "спят" в ожидании IO, CPU простаивает, а новые запросы ждут в очереди, вызывая таймауты и отказы. Это классическая проблема асинхронщины: JDBC не предназначен для non-blocking IO, он полагается на blocking calls операционной системы.
В реактивных приложениях (WebFlux) использование JDBC — антипаттерн: если контроллер возвращает Mono, но внутри — blocking JDBC, весь выигрыш теряется. Поток из event-loop (Netty) блокируется, снижая throughput (пропускную способность). Вот почему нужен новый подход.
Проблемы JDBC в реактивном контексте: почему старый стандарт не справляется
Давайте разберём проблемы JDBC подробно, чтобы понять мотивацию R2DBC:
Блокирующая природа: Все операции (connect, query, fetch) — синхронны. В асинхронном коде это требует обёрток вроде CompletableFuture или offload на отдельный пул (Schedulers.boundedElastic()), но это хак: теряется истинная реактивность, и под нагрузкой пулы переполняются.
Отсутствие backpressure: ResultSet — pull-модель (next() получает данные), но без контроля темпа. Если результат огромный (миллионы строк), буфер переполняется, рискуя OOM (OutOfMemoryError). В реактивном мире (push с backpressure) это несовместимо.
Управление соединениями: JDBC полагается на пулы (HikariCP), но они ориентированы на blocking: соединение "занято" весь запрос. В реактиве нужно multiplexing — одно соединение для многих операций асинхронно.
Транзакции: @Transactional в Spring работает, но в реактиве требует специальной поддержки (reactive transactions), иначе — блокировки.
Масштабируемость: Под 10k+ RPS (requests per second) с БД-запросами JDBC требует огромных пулов потоков (тысячи), что жрёт память (каждый поток ~1MB стека) и контекст-свичинг.
Интеграция с Reactor: Нет native Publisher — результаты не "текут" как Flux, требуя ручной конвертации, что добавляет boilerplate и риски.
В итоге, JDBC — отличный для legacy или низконагруженных приложений, но в микросервисах с WebFlux он "ломает" реактивный стек, возвращая к болям callback-ада и ожиданий.
Введение в R2DBC: реактивный стандарт для реляционных БД
R2DBC — это спецификация (с 2019 года, под эгидой Spring и Pivotal), определяющая API для доступа к реляционным БД в реактивном стиле. Это не замена JDBC, а параллельный стандарт, ориентированный на non-blocking IO.
#Java #middle #Reactor #WebFlux #Mono #Flux #R2DBC
👍1
Ключевые идеи:
Publisher-based API: Все операции возвращают Publisher (Mono/Flux из Reactive Streams): Connection как Mono<Connection>, Statement.execute() как Flux<Row>.
Non-blocking от начала до конца: Использует асинхронные драйверы (для PostgreSQL, MySQL и т.д.), где соединения мультиплексируются — одно для многих запросов.
Backpressure встроено: Результаты (Flux<Row>) уважают request(n): если подписчик не готов, БД не шлёт данные, избегая перегрузки.
Транзакции реактивные: Поддержка @Transactional с Mono/Flux.
Интеграция с экосистемой: Spring Data R2DBC — аналог Spring Data JPA, с репозиториями, @Query и CRUD.
Драйверы: r2dbc-postgresql, r2dbc-mysql и т.д. — реализуют спецификацию, используя неблокирующие сокеты (Netty или аналог).
Пример базового R2DBC-кода (без Spring):
Здесь usingWhen — реактивный try-with-resources: создаёт соединение асинхронно, выполняет запрос как Flux<Result>, map извлекает данные. Нет блокировок: если БД медленная, поток свободен.
Spring Data R2DBC: упрощение с репозиториями и аннотациями
Spring Data R2DBC — модуль, который абстрагирует R2DBC, как Spring Data JPA для JDBC.
Добавьте зависимость:
Настройте в application.properties:
Репозитории:
Сущность:
В сервисе/контроллере:
В контроллере:
Это декларативно: repo.findAll() — Flux, который "течёт" из БД без блокировок. Транзакции: @Transactional на методе — reactive, rollback асинхронно.
Расширенный пример: пагинация с ReactiveSortingRepository и Pageable.
Практические советы и подводные камни
Выбор БД: PostgreSQL — лучший для R2DBC (полная поддержка async).
Тестирование: Embedded H2 с r2dbc-h2, ReactiveTest для StepVerifier.
Камень: Нет full ORM (как JPA entities с relations) — используйте ручные joins или Spring Data Projections.
Камень: Транзакции не поддерживают propagation в nested методах fully — будьте осторожны.
Совет: Для hybrid (JDBC + R2DBC) — используйте разные DataSource, но избегайте в одном приложении.
Совет: Мониторьте с Micrometer: метрики на запросы, соединения.
#Java #middle #Reactor #WebFlux #Mono #Flux #R2DBC
Publisher-based API: Все операции возвращают Publisher (Mono/Flux из Reactive Streams): Connection как Mono<Connection>, Statement.execute() как Flux<Row>.
Non-blocking от начала до конца: Использует асинхронные драйверы (для PostgreSQL, MySQL и т.д.), где соединения мультиплексируются — одно для многих запросов.
Backpressure встроено: Результаты (Flux<Row>) уважают request(n): если подписчик не готов, БД не шлёт данные, избегая перегрузки.
Транзакции реактивные: Поддержка @Transactional с Mono/Flux.
Интеграция с экосистемой: Spring Data R2DBC — аналог Spring Data JPA, с репозиториями, @Query и CRUD.
Драйверы: r2dbc-postgresql, r2dbc-mysql и т.д. — реализуют спецификацию, используя неблокирующие сокеты (Netty или аналог).
Пример базового R2DBC-кода (без Spring):
import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.ConnectionFactory;
import reactor.core.publisher.Flux;
public void createConnectionFactory () {
ConnectionFactory factory = ConnectionFactories.get("r2dbc:postgresql://localhost:5432/db?username=user&password=pass");
Flux<String> namesFlux = Flux.usingWhen(
factory.create(), // Асинхронно создать соединение
conn -> conn.createStatement("SELECT name FROM users").execute().flatMap(result -> result.map((row, metadata) -> row.get("name", String.class))),
conn -> conn.close() // Асинхронно закрыть
);
namesFlux.subscribe(System.out::println); // Строки приходят асинхронно
}
Здесь usingWhen — реактивный try-with-resources: создаёт соединение асинхронно, выполняет запрос как Flux<Result>, map извлекает данные. Нет блокировок: если БД медленная, поток свободен.
Spring Data R2DBC: упрощение с репозиториями и аннотациями
Spring Data R2DBC — модуль, который абстрагирует R2DBC, как Spring Data JPA для JDBC.
Добавьте зависимость:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-postgresql</artifactId> <!-- Для PostgreSQL -->
</dependency>
Настройте в application.properties:
spring.r2dbc.url=r2dbc:postgresql://localhost:5432/db
spring.r2dbc.username=user
spring.r2dbc.password=pass
Репозитории:
ReactiveRepository extends ReactiveCrudRepository<Entity, ID>.
Сущность:
@Entity
public class User {
@Id
private Long id;
private String name;
// Getters/setters
}
public interface UserRepository extends ReactiveCrudRepository<User, Long> {
@Query("SELECT * FROM users WHERE name LIKE :name")
Flux<User> findByNameLike(String name);
}
В сервисе/контроллере:
@Service
public class UserService {
private final UserRepository repo;
public UserService(UserRepository repo) {
this.repo = repo;
}
public Flux<User> findAll() {
return repo.findAll(); // Flux асинхронно
}
public Mono<User> save(User user) {
return repo.save(user);
}
}
В контроллере:
@GetMapping("/users")
public Flux<User> getAllUsers() {
return userService.findAll();
}Это декларативно: repo.findAll() — Flux, который "течёт" из БД без блокировок. Транзакции: @Transactional на методе — reactive, rollback асинхронно.
Расширенный пример: пагинация с ReactiveSortingRepository и Pageable.
public interface UserRepository extends ReactiveSortingRepository<User, Long> {}
Flux<User> paged = repo.findAll(Sort.by("name").ascending()).skip(10).take(20); // Простая пагинация
Для complex: используйте @Query с параметрами, или Criteria API.Практические советы и подводные камни
Выбор БД: PostgreSQL — лучший для R2DBC (полная поддержка async).
Тестирование: Embedded H2 с r2dbc-h2, ReactiveTest для StepVerifier.
Камень: Нет full ORM (как JPA entities с relations) — используйте ручные joins или Spring Data Projections.
Камень: Транзакции не поддерживают propagation в nested методах fully — будьте осторожны.
Совет: Для hybrid (JDBC + R2DBC) — используйте разные DataSource, но избегайте в одном приложении.
Совет: Мониторьте с Micrometer: метрики на запросы, соединения.
#Java #middle #Reactor #WebFlux #Mono #Flux #R2DBC
👍1
Реактивное программирование
Горячие и холодные Publisher’ы в реактивном программировании
Publisher — это источник данных в Reactive Streams, который "толкает" элементы подписчикам (Subscriber). Но не все Publisher’ы одинаковы по поведению при множественных подписках. Это зависит от того, генерирует ли он данные независимо от подписчиков (горячий) или заново для каждого (холодный).
Холодный Publisher (Cold Publisher): Данные генерируются лениво — только при подписке, и для каждого подписчика отдельно. Это как видео по запросу: каждый зритель получает свою копию потока. Плюс: свежие данные каждый раз. Минус: если источник дорогой (запрос в БД, вычисления), повторяется зря.
Горячий Publisher (Hot Publisher): Данные генерируются независимо от подписчиков — поток "вещает" непрерывно. Подписчики "подключаются" к существующему потоку и получают данные с момента подписки. Это как живой эфир: все слушают одно и то же, но опоздавшие пропускают начало. Плюс: экономия ресурсов (один источник). Минус: данные могут быть "старыми" или пропущенными.
В Project Reactor большинство конструкторов — холодные (just, fromIterable, range), но есть горячие (interval, push). Поведение можно менять операторами (share, cache).
Примеры холодных Publisher’ов: ленивость и независимость
Холодные — default в Reactor: подписка запускает генерацию заново.
Пример с Mono (одиночный элемент, но принцип тот же):
С Flux:
Примеры горячих Publisher’ов: общий поток и вещание
Горячие — генерируют данные один раз, подписчики "присоединяются".
Пример с Flux.push (горячий по дизайну):
Другой горячий:
#Java #middle #Reactor #WebFlux #Mono #Flux
Горячие и холодные Publisher’ы в реактивном программировании
Publisher — это источник данных в Reactive Streams, который "толкает" элементы подписчикам (Subscriber). Но не все Publisher’ы одинаковы по поведению при множественных подписках. Это зависит от того, генерирует ли он данные независимо от подписчиков (горячий) или заново для каждого (холодный).
Холодный Publisher (Cold Publisher): Данные генерируются лениво — только при подписке, и для каждого подписчика отдельно. Это как видео по запросу: каждый зритель получает свою копию потока. Плюс: свежие данные каждый раз. Минус: если источник дорогой (запрос в БД, вычисления), повторяется зря.
Горячий Publisher (Hot Publisher): Данные генерируются независимо от подписчиков — поток "вещает" непрерывно. Подписчики "подключаются" к существующему потоку и получают данные с момента подписки. Это как живой эфир: все слушают одно и то же, но опоздавшие пропускают начало. Плюс: экономия ресурсов (один источник). Минус: данные могут быть "старыми" или пропущенными.
В Project Reactor большинство конструкторов — холодные (just, fromIterable, range), но есть горячие (interval, push). Поведение можно менять операторами (share, cache).
Примеры холодных Publisher’ов: ленивость и независимость
Холодные — default в Reactor: подписка запускает генерацию заново.
Пример с Mono (одиночный элемент, но принцип тот же):
Mono<String> coldMono = Mono.fromCallable(() -> {
System.out.println("Генерация данных...");
return "Данные";
});
coldMono.subscribe(System.out::println); // Вывод: "Генерация данных..." и "Данные"
coldMono.subscribe(System.out::println); // Снова: "Генерация данных..." и "Данные"
Каждый subscribe() вызывает fromCallable заново — данные свежие, но если это запрос в API, будет два вызова.С Flux:
Flux<Integer> coldFlux = Flux.range(1, 3).doOnSubscribe(sub -> System.out.println("Новая подписка!"));
coldFlux.subscribe(val -> System.out.println("Подписчик 1: " + val));
coldFlux.subscribe(val -> System.out.println("Подписчик 2: " + val));
// Вывод: "Новая подписка!" + 1,2,3 для первого; "Новая подписка!" + 1,2,3 для второго
Каждый подписчик получает полный поток независимо. Полезно для idempotent операций (без side-effects), как чтение статичных данных.
Асинхронный пример: coldFlux = Flux.interval(Duration.ofSeconds(1)).take(3); // Каждый subscribe() запускает свой таймер.Примеры горячих Publisher’ов: общий поток и вещание
Горячие — генерируют данные один раз, подписчики "присоединяются".
Пример с Flux.push (горячий по дизайну):
ConnectableFlux<Integer> hotFlux = Flux.push(sink -> {
// Симулируем внешний источник
new Thread(() -> {
for (int i = 1; i <= 3; i++) {
try { Thread.sleep(1000); } catch (InterruptedException e) {}
sink.next(i);
}
sink.complete();
}).start();
});
hotFlux.subscribe(val -> System.out.println("Подписчик 1: " + val));
Thread.sleep(1500); // Ждём, чтобы пропустить начало
hotFlux.subscribe(val -> System.out.println("Подписчик 2: " + val));
hotFlux.connect(); // Запуск горячего
// Вывод примерно: Подписчик 1: 1 (1с), 2 (2с), 3 (3с); Подписчик 2: 2 (присоединился после 1), 3
Второй пропустил 1 — поток общий. Connect() — триггер для ConnectableFlux (обёртка для горячих).Другой горячий:
Flux.interval(Duration.ofSeconds(1)) — бесконечный таймер, вещает независимо.
Оператор share(): Делает холодный горячим.
Flux<Integer> shared = coldFlux.share();
shared.subscribe(...); // Запускает
shared.subscribe(...); // Присоединяется к существующему
#Java #middle #Reactor #WebFlux #Mono #Flux
👍2
Переключение типов: операторы для контроля
Из холодного в горячий: share() (для Flux), cache() (кэширует элементы для повторов), publish() (ConnectableFlux с backpressure).
Пример cache:
coldMono.cache() — первый subscribe генерирует, последующие — из кэша.
Из горячего в холодный: Редко нужно, но replay() на ConnectableFlux кэширует историю для новых подписчиков.
Сценарии:
Холодный: Запросы к БД (каждый клиент — свежие данные).
Горячий: Мониторинг (один сенсор — всем подписчикам), стриминг событий (Kafka topic).
Практические советы и подводные камни
Диагностика: doOnSubscribe(() -> log("Subscribe")) — увидите, сколько раз запускается.
Камень: Холодный с side-effects (мутации) — непредсказуемо при множественных подписках; используйте defer() для ленивости.
Камень: Горячий бесконечный без take() — утечки; добавьте refCount() на publish() для авто-отписки при 0 подписчиках.
Совет: В WebFlux — Flux из БД (R2DBC, пост 17) холодный по умолчанию; share() для кэширования результатов.
Тестирование: StepVerifier с .publish() для симуляции горячих.
#Java #middle #Reactor #WebFlux #Mono #Flux
Из холодного в горячий: share() (для Flux), cache() (кэширует элементы для повторов), publish() (ConnectableFlux с backpressure).
Пример cache:
coldMono.cache() — первый subscribe генерирует, последующие — из кэша.
Из горячего в холодный: Редко нужно, но replay() на ConnectableFlux кэширует историю для новых подписчиков.
Сценарии:
Холодный: Запросы к БД (каждый клиент — свежие данные).
Горячий: Мониторинг (один сенсор — всем подписчикам), стриминг событий (Kafka topic).
Практические советы и подводные камни
Диагностика: doOnSubscribe(() -> log("Subscribe")) — увидите, сколько раз запускается.
Камень: Холодный с side-effects (мутации) — непредсказуемо при множественных подписках; используйте defer() для ленивости.
Камень: Горячий бесконечный без take() — утечки; добавьте refCount() на publish() для авто-отписки при 0 подписчиках.
Совет: В WebFlux — Flux из БД (R2DBC, пост 17) холодный по умолчанию; share() для кэширования результатов.
Тестирование: StepVerifier с .publish() для симуляции горячих.
#Java #middle #Reactor #WebFlux #Mono #Flux
👍2