Java for Beginner
746 subscribers
719 photos
202 videos
12 files
1.16K links
Канал от новичков для новичков!
Изучайте Java вместе с нами!
Здесь мы обмениваемся опытом и постоянно изучаем что-то новое!

Наш YouTube канал - https://www.youtube.com/@Java_Beginner-Dev

Наш канал на RUTube - https://rutube.ru/channel/37896292/
Download Telegram
Реактивное программирование

Управление потоками в Reactor: Schedulers

Schedulers — часть Reactor Core, они интегрируют с Java's ExecutorService, но абстрагируют детали: вы выбираете тип, а Reactor управляет жизненным циклом. По умолчанию Reactor использует Schedulers.parallel() для вычислений, но вы можете переключать с помощью операторов publishOn (для downstream) и subscribeOn (для upstream). Это даёт контроль: CPU-bound задачи на параллельных потоках, IO-bound — на эластичных. Давайте разберём типы schedulers, операторы и примеры — пробуйте в коде с reactor-core.


Типы Schedulers: выбор под задачу


Reactor предоставляет готовые планировщики для разных сценариев. Каждый — фабрика потоков с настройками (размер пула, демонизация).

- parallel(): Для CPU-bound задач (расчёты). Фиксированный пул = кол-во CPU ядер. Не демоны — ждёт завершения. Идеален для map с тяжёлыми вычислениями.
- boundedElastic(): Для IO-bound (сеть, файлы, БД) с блокирующими операциями. Эластичный пул: до 10*CPU, но переиспользует. Демоны — приложение не ждёт. Полезен, если в лямбде "грязный" код (sleep, blocking IO).
- single(): Один поток для всех задач. Для последовательных операций, где параллелизм не нужен (тесты, простые цепочки).
- immediate(): Выполняет на текущем потоке. Для синхронных тестов или когда асинхронность не требуется.
- fromExecutor(Executor): Кастомный из вашего пула.


Пример создания:
import reactor.core.scheduler.Schedulers;

Schedulers.parallel(); // Готов к использованию

Schedulers не создают потоки заранее — ленивые, как Mono/Flux.



Операторы для переключения: publishOn и subscribeOn

- publishOn(Scheduler): Переключает downstream (операции после него) на указанный scheduler. Влияет на onNext, map, filter и т.д. после точки.
- subscribeOn(Scheduler): Переключает upstream (источник и подписку) на scheduler. Влияет на весь поток от subscribe().


Пример комбинации:
import reactor.core.publisher.Flux;
import java.time.Duration;

Flux<Integer> flux = Flux.range(1, 3)
.map(i -> { // На subscribeOn
System.out.println("Map1 на потоке: " + Thread.currentThread().getName());
return i * 10;
})
.publishOn(Schedulers.boundedElastic()) // Переключаем downstream
.delayElements(Duration.ofSeconds(1)) // На boundedElastic
.map(i -> { // На boundedElastic
System.out.println("Map2 на потоке: " + Thread.currentThread().getName());
return i + 1;
});

flux.subscribeOn(Schedulers.single()) // Upstream на single
.subscribe(System.out::println);

Вывод покажет: Map1 на "single-1", delay и Map2 на "boundedElastic-X". Это асинхронно: основной поток не блокируется.


Почему важно? Без schedulers все на default (parallel), но IO может заблокировать CPU-потоки. PublishOn изолирует блокирующие части, subscribeOn — для источников (например, blocking чтение файла на отдельном потоке).


Управление контекстом и параллелизмом

Schedulers помогают с контекстом: в Reactor есть Context для хранения данных (как ThreadLocal, но реактивный). Операторы как withScheduler сохраняют его.

Для параллелизма: в flatMap добавьте concurrency: flatMap(func, concurrency, prefetch), где concurrency — max параллельных подпотоков, prefetch — буфер.

Пример с параллелизмом:
Flux.range(1, 10)
.flatMap(i -> Mono.just(i).delayElement(Duration.ofMillis(100)), 2) // Max 2 параллельно
.subscribeOn(Schedulers.parallel())
.blockLast(); // Для теста, в prod не блокируйте!

Это ограничивает: не 10 задержек сразу, а по 2.


#Java #middle #Reactor #WebClient #Schedulers
👍2
Практические советы и подводные камни

- Диагностика: используйте doOnNext(() -> Thread.currentThread().getName()) для логирования потоков.
- Shutdown: Schedulers.shutdownNow() для cleanup, но в Spring — автоматом.
- проблема: blocking в parallel() — заблокирует все CPU; всегда publishOn(boundedElastic()) перед blocking.
- Оптимизация: для веб — WebFlux использует Schedulers.default() на Netty event-loop (для IO).
- Тестирование: VirtualTimeScheduler для симуляции времени в StepVerifier.

В практике: в микросервисах — subscribeOn(parallel()) для вычислений, publishOn(boundedElastic()) для БД-запросов.



Реактивные API-запросы с
WebClient

WebClient — declarative: строите запрос, получаете Mono/Flux с ответом. Под капотом — Netty или HttpClient для IO, с Schedulers.boundedElastic() по умолчанию. Давайте разберём создание, методы (GET/POST), обработку и примеры — добавьте spring-webflux в зависимости и пробуйте.


Создание и базовая настройка WebClient

WebClient.builder() для кастомизации: baseUrl, headers, client (Netty/HttpClient).

Пример:
import org.springframework.web.reactive.function.client.WebClient;

WebClient client = WebClient.builder()
.baseUrl("https://api.example.com")
.defaultHeader("Authorization", "Bearer token")
.build();

Это готово к запросам. Для кодеков (JSON) — добавьте Jackson или Gson.


Реактивные запросы: GET, POST и обработка

- GET: retrieve() возвращает Mono<ResponseEntity> или Flux для стриминга.
- POST: bodyValue() для данных, exchange() для полного контроля.


Пример GET с Mono:
Mono<String> responseMono = client.get()
.uri("/users/{id}", 1) // URI с параметрами
.retrieve()
.bodyToMono(String.class); // Декодируем в String

responseMono.subscribe(System.out::println); // Асинхронно: ответ придёт позже

Для JSON: bodyToMono(User.class) с Jackson.


Пример POST с Flux:

Flux<User> usersFlux = Flux.just(new User("Alice"), new User("Bob"));

Mono<Void> postMono = client.post()
.uri("/users/batch")
.body(usersFlux, User.class) // Тело как Flux
.retrieve()
.bodyToMono(Void.class);

postMono.subscribe(); // Отправка асинхронно

WebClient уважает backpressure: если сервер шлёт Flux (SSE), клиент запрашивает по мере обработки.

С Schedulers: по умолчанию на event-loop, но publishOn для кастом.



Обработка ответов и ошибок

Retrieve() даёт bodyToMono/Flux, onStatus для ошибок (4xx/5xx).

Пример с обработкой:
client.get()
.uri("/data")
.retrieve()
.onStatus(HttpStatusCode::is4xxClientError, resp -> Mono.error(new BadRequestException("Ошибка клиента")))
.bodyToFlux(Data.class)
.doOnError(e -> System.err.println("Сбой: " + e)) // Из поста 10
.retry(3) // Retry на сетевые ошибки
.subscribe(data -> process(data));

Это интегрирует с Reactor: ошибки как onError, retry для flaky API.

Для стриминга: bodyToFlux для больших данных, без загрузки всего в память.


Практические советы и подводные камни

- Таймауты: client.mutate().clientConnector(new ReactorClientHttpConnector(HttpClient.create().responseTimeout(Duration.ofSeconds(5)))).build();
- HTTPS: автоматом, но настройте trustStore если нужно.
- Тестирование:
WebClient с mock (WireMock) и StepVerifier.

В практике: в Spring Boot контроллер вызывает
WebClient для агрегации от сервисов — асинхронно, без ожидания.


#Java #middle #Reactor #WebClient #Schedulers
👍2