Реактивное программирование
Управление потоками в Reactor: Schedulers
Schedulers — часть Reactor Core, они интегрируют с Java's ExecutorService, но абстрагируют детали: вы выбираете тип, а Reactor управляет жизненным циклом. По умолчанию Reactor использует Schedulers.parallel() для вычислений, но вы можете переключать с помощью операторов publishOn (для downstream) и subscribeOn (для upstream). Это даёт контроль: CPU-bound задачи на параллельных потоках, IO-bound — на эластичных. Давайте разберём типы schedulers, операторы и примеры — пробуйте в коде с reactor-core.
Типы Schedulers: выбор под задачу
Reactor предоставляет готовые планировщики для разных сценариев. Каждый — фабрика потоков с настройками (размер пула, демонизация).
- parallel(): Для CPU-bound задач (расчёты). Фиксированный пул = кол-во CPU ядер. Не демоны — ждёт завершения. Идеален для map с тяжёлыми вычислениями.
- boundedElastic(): Для IO-bound (сеть, файлы, БД) с блокирующими операциями. Эластичный пул: до 10*CPU, но переиспользует. Демоны — приложение не ждёт. Полезен, если в лямбде "грязный" код (sleep, blocking IO).
- single(): Один поток для всех задач. Для последовательных операций, где параллелизм не нужен (тесты, простые цепочки).
- immediate(): Выполняет на текущем потоке. Для синхронных тестов или когда асинхронность не требуется.
- fromExecutor(Executor): Кастомный из вашего пула.
Пример создания:
Операторы для переключения: publishOn и subscribeOn
- publishOn(Scheduler): Переключает downstream (операции после него) на указанный scheduler. Влияет на onNext, map, filter и т.д. после точки.
- subscribeOn(Scheduler): Переключает upstream (источник и подписку) на scheduler. Влияет на весь поток от subscribe().
Пример комбинации:
Почему важно? Без schedulers все на default (parallel), но IO может заблокировать CPU-потоки. PublishOn изолирует блокирующие части, subscribeOn — для источников (например, blocking чтение файла на отдельном потоке).
Управление контекстом и параллелизмом
Schedulers помогают с контекстом: в Reactor есть Context для хранения данных (как ThreadLocal, но реактивный). Операторы как withScheduler сохраняют его.
Для параллелизма: в flatMap добавьте concurrency: flatMap(func, concurrency, prefetch), где concurrency — max параллельных подпотоков, prefetch — буфер.
Пример с параллелизмом:
#Java #middle #Reactor #WebClient #Schedulers
Управление потоками в Reactor: Schedulers
Schedulers — часть Reactor Core, они интегрируют с Java's ExecutorService, но абстрагируют детали: вы выбираете тип, а Reactor управляет жизненным циклом. По умолчанию Reactor использует Schedulers.parallel() для вычислений, но вы можете переключать с помощью операторов publishOn (для downstream) и subscribeOn (для upstream). Это даёт контроль: CPU-bound задачи на параллельных потоках, IO-bound — на эластичных. Давайте разберём типы schedulers, операторы и примеры — пробуйте в коде с reactor-core.
Типы Schedulers: выбор под задачу
Reactor предоставляет готовые планировщики для разных сценариев. Каждый — фабрика потоков с настройками (размер пула, демонизация).
- parallel(): Для CPU-bound задач (расчёты). Фиксированный пул = кол-во CPU ядер. Не демоны — ждёт завершения. Идеален для map с тяжёлыми вычислениями.
- boundedElastic(): Для IO-bound (сеть, файлы, БД) с блокирующими операциями. Эластичный пул: до 10*CPU, но переиспользует. Демоны — приложение не ждёт. Полезен, если в лямбде "грязный" код (sleep, blocking IO).
- single(): Один поток для всех задач. Для последовательных операций, где параллелизм не нужен (тесты, простые цепочки).
- immediate(): Выполняет на текущем потоке. Для синхронных тестов или когда асинхронность не требуется.
- fromExecutor(Executor): Кастомный из вашего пула.
Пример создания:
import reactor.core.scheduler.Schedulers;
Schedulers.parallel(); // Готов к использованию
Schedulers не создают потоки заранее — ленивые, как Mono/Flux.
Операторы для переключения: publishOn и subscribeOn
- publishOn(Scheduler): Переключает downstream (операции после него) на указанный scheduler. Влияет на onNext, map, filter и т.д. после точки.
- subscribeOn(Scheduler): Переключает upstream (источник и подписку) на scheduler. Влияет на весь поток от subscribe().
Пример комбинации:
import reactor.core.publisher.Flux;
import java.time.Duration;
Flux<Integer> flux = Flux.range(1, 3)
.map(i -> { // На subscribeOn
System.out.println("Map1 на потоке: " + Thread.currentThread().getName());
return i * 10;
})
.publishOn(Schedulers.boundedElastic()) // Переключаем downstream
.delayElements(Duration.ofSeconds(1)) // На boundedElastic
.map(i -> { // На boundedElastic
System.out.println("Map2 на потоке: " + Thread.currentThread().getName());
return i + 1;
});
flux.subscribeOn(Schedulers.single()) // Upstream на single
.subscribe(System.out::println);
Вывод покажет: Map1 на "single-1", delay и Map2 на "boundedElastic-X". Это асинхронно: основной поток не блокируется.
Почему важно? Без schedulers все на default (parallel), но IO может заблокировать CPU-потоки. PublishOn изолирует блокирующие части, subscribeOn — для источников (например, blocking чтение файла на отдельном потоке).
Управление контекстом и параллелизмом
Schedulers помогают с контекстом: в Reactor есть Context для хранения данных (как ThreadLocal, но реактивный). Операторы как withScheduler сохраняют его.
Для параллелизма: в flatMap добавьте concurrency: flatMap(func, concurrency, prefetch), где concurrency — max параллельных подпотоков, prefetch — буфер.
Пример с параллелизмом:
Flux.range(1, 10)
.flatMap(i -> Mono.just(i).delayElement(Duration.ofMillis(100)), 2) // Max 2 параллельно
.subscribeOn(Schedulers.parallel())
.blockLast(); // Для теста, в prod не блокируйте!
Это ограничивает: не 10 задержек сразу, а по 2.
#Java #middle #Reactor #WebClient #Schedulers
👍2
Практические советы и подводные камни
- Диагностика: используйте doOnNext(() -> Thread.currentThread().getName()) для логирования потоков.
- Shutdown: Schedulers.shutdownNow() для cleanup, но в Spring — автоматом.
- проблема: blocking в parallel() — заблокирует все CPU; всегда publishOn(boundedElastic()) перед blocking.
- Оптимизация: для веб — WebFlux использует Schedulers.default() на Netty event-loop (для IO).
- Тестирование: VirtualTimeScheduler для симуляции времени в StepVerifier.
В практике: в микросервисах — subscribeOn(parallel()) для вычислений, publishOn(boundedElastic()) для БД-запросов.
Реактивные API-запросы с WebClient
WebClient — declarative: строите запрос, получаете Mono/Flux с ответом. Под капотом — Netty или HttpClient для IO, с Schedulers.boundedElastic() по умолчанию. Давайте разберём создание, методы (GET/POST), обработку и примеры — добавьте spring-webflux в зависимости и пробуйте.
Создание и базовая настройка WebClient
WebClient.builder() для кастомизации: baseUrl, headers, client (Netty/HttpClient).
Пример:
Реактивные запросы: GET, POST и обработка
- GET: retrieve() возвращает Mono<ResponseEntity> или Flux для стриминга.
- POST: bodyValue() для данных, exchange() для полного контроля.
Пример GET с Mono:
Пример POST с Flux:
Обработка ответов и ошибок
Retrieve() даёт bodyToMono/Flux, onStatus для ошибок (4xx/5xx).
Пример с обработкой:
Практические советы и подводные камни
- Таймауты: client.mutate().clientConnector(new ReactorClientHttpConnector(HttpClient.create().responseTimeout(Duration.ofSeconds(5)))).build();
- HTTPS: автоматом, но настройте trustStore если нужно.
- Тестирование: WebClient с mock (WireMock) и StepVerifier.
В практике: в Spring Boot контроллер вызывает WebClient для агрегации от сервисов — асинхронно, без ожидания.
#Java #middle #Reactor #WebClient #Schedulers
- Диагностика: используйте doOnNext(() -> Thread.currentThread().getName()) для логирования потоков.
- Shutdown: Schedulers.shutdownNow() для cleanup, но в Spring — автоматом.
- проблема: blocking в parallel() — заблокирует все CPU; всегда publishOn(boundedElastic()) перед blocking.
- Оптимизация: для веб — WebFlux использует Schedulers.default() на Netty event-loop (для IO).
- Тестирование: VirtualTimeScheduler для симуляции времени в StepVerifier.
В практике: в микросервисах — subscribeOn(parallel()) для вычислений, publishOn(boundedElastic()) для БД-запросов.
Реактивные API-запросы с WebClient
WebClient — declarative: строите запрос, получаете Mono/Flux с ответом. Под капотом — Netty или HttpClient для IO, с Schedulers.boundedElastic() по умолчанию. Давайте разберём создание, методы (GET/POST), обработку и примеры — добавьте spring-webflux в зависимости и пробуйте.
Создание и базовая настройка WebClient
WebClient.builder() для кастомизации: baseUrl, headers, client (Netty/HttpClient).
Пример:
import org.springframework.web.reactive.function.client.WebClient;
WebClient client = WebClient.builder()
.baseUrl("https://api.example.com")
.defaultHeader("Authorization", "Bearer token")
.build();
Это готово к запросам. Для кодеков (JSON) — добавьте Jackson или Gson.
Реактивные запросы: GET, POST и обработка
- GET: retrieve() возвращает Mono<ResponseEntity> или Flux для стриминга.
- POST: bodyValue() для данных, exchange() для полного контроля.
Пример GET с Mono:
Mono<String> responseMono = client.get()
.uri("/users/{id}", 1) // URI с параметрами
.retrieve()
.bodyToMono(String.class); // Декодируем в String
responseMono.subscribe(System.out::println); // Асинхронно: ответ придёт позже
Для JSON: bodyToMono(User.class) с Jackson.
Пример POST с Flux:
Flux<User> usersFlux = Flux.just(new User("Alice"), new User("Bob"));
Mono<Void> postMono = client.post()
.uri("/users/batch")
.body(usersFlux, User.class) // Тело как Flux
.retrieve()
.bodyToMono(Void.class);
postMono.subscribe(); // Отправка асинхронно
WebClient уважает backpressure: если сервер шлёт Flux (SSE), клиент запрашивает по мере обработки.
С Schedulers: по умолчанию на event-loop, но publishOn для кастом.
Обработка ответов и ошибок
Retrieve() даёт bodyToMono/Flux, onStatus для ошибок (4xx/5xx).
Пример с обработкой:
client.get()
.uri("/data")
.retrieve()
.onStatus(HttpStatusCode::is4xxClientError, resp -> Mono.error(new BadRequestException("Ошибка клиента")))
.bodyToFlux(Data.class)
.doOnError(e -> System.err.println("Сбой: " + e)) // Из поста 10
.retry(3) // Retry на сетевые ошибки
.subscribe(data -> process(data));
Это интегрирует с Reactor: ошибки как onError, retry для flaky API.
Для стриминга: bodyToFlux для больших данных, без загрузки всего в память.
Практические советы и подводные камни
- Таймауты: client.mutate().clientConnector(new ReactorClientHttpConnector(HttpClient.create().responseTimeout(Duration.ofSeconds(5)))).build();
- HTTPS: автоматом, но настройте trustStore если нужно.
- Тестирование: WebClient с mock (WireMock) и StepVerifier.
В практике: в Spring Boot контроллер вызывает WebClient для агрегации от сервисов — асинхронно, без ожидания.
#Java #middle #Reactor #WebClient #Schedulers
👍2