Практические советы и подводные камни
- Диагностика: используйте doOnNext(() -> Thread.currentThread().getName()) для логирования потоков.
- Shutdown: Schedulers.shutdownNow() для cleanup, но в Spring — автоматом.
- проблема: blocking в parallel() — заблокирует все CPU; всегда publishOn(boundedElastic()) перед blocking.
- Оптимизация: для веб — WebFlux использует Schedulers.default() на Netty event-loop (для IO).
- Тестирование: VirtualTimeScheduler для симуляции времени в StepVerifier.
В практике: в микросервисах — subscribeOn(parallel()) для вычислений, publishOn(boundedElastic()) для БД-запросов.
Реактивные API-запросы с WebClient
WebClient — declarative: строите запрос, получаете Mono/Flux с ответом. Под капотом — Netty или HttpClient для IO, с Schedulers.boundedElastic() по умолчанию. Давайте разберём создание, методы (GET/POST), обработку и примеры — добавьте spring-webflux в зависимости и пробуйте.
Создание и базовая настройка WebClient
WebClient.builder() для кастомизации: baseUrl, headers, client (Netty/HttpClient).
Пример:
Реактивные запросы: GET, POST и обработка
- GET: retrieve() возвращает Mono<ResponseEntity> или Flux для стриминга.
- POST: bodyValue() для данных, exchange() для полного контроля.
Пример GET с Mono:
Пример POST с Flux:
Обработка ответов и ошибок
Retrieve() даёт bodyToMono/Flux, onStatus для ошибок (4xx/5xx).
Пример с обработкой:
Практические советы и подводные камни
- Таймауты: client.mutate().clientConnector(new ReactorClientHttpConnector(HttpClient.create().responseTimeout(Duration.ofSeconds(5)))).build();
- HTTPS: автоматом, но настройте trustStore если нужно.
- Тестирование: WebClient с mock (WireMock) и StepVerifier.
В практике: в Spring Boot контроллер вызывает WebClient для агрегации от сервисов — асинхронно, без ожидания.
#Java #middle #Reactor #WebClient #Schedulers
- Диагностика: используйте doOnNext(() -> Thread.currentThread().getName()) для логирования потоков.
- Shutdown: Schedulers.shutdownNow() для cleanup, но в Spring — автоматом.
- проблема: blocking в parallel() — заблокирует все CPU; всегда publishOn(boundedElastic()) перед blocking.
- Оптимизация: для веб — WebFlux использует Schedulers.default() на Netty event-loop (для IO).
- Тестирование: VirtualTimeScheduler для симуляции времени в StepVerifier.
В практике: в микросервисах — subscribeOn(parallel()) для вычислений, publishOn(boundedElastic()) для БД-запросов.
Реактивные API-запросы с WebClient
WebClient — declarative: строите запрос, получаете Mono/Flux с ответом. Под капотом — Netty или HttpClient для IO, с Schedulers.boundedElastic() по умолчанию. Давайте разберём создание, методы (GET/POST), обработку и примеры — добавьте spring-webflux в зависимости и пробуйте.
Создание и базовая настройка WebClient
WebClient.builder() для кастомизации: baseUrl, headers, client (Netty/HttpClient).
Пример:
import org.springframework.web.reactive.function.client.WebClient;
WebClient client = WebClient.builder()
.baseUrl("https://api.example.com")
.defaultHeader("Authorization", "Bearer token")
.build();
Это готово к запросам. Для кодеков (JSON) — добавьте Jackson или Gson.
Реактивные запросы: GET, POST и обработка
- GET: retrieve() возвращает Mono<ResponseEntity> или Flux для стриминга.
- POST: bodyValue() для данных, exchange() для полного контроля.
Пример GET с Mono:
Mono<String> responseMono = client.get()
.uri("/users/{id}", 1) // URI с параметрами
.retrieve()
.bodyToMono(String.class); // Декодируем в String
responseMono.subscribe(System.out::println); // Асинхронно: ответ придёт позже
Для JSON: bodyToMono(User.class) с Jackson.
Пример POST с Flux:
Flux<User> usersFlux = Flux.just(new User("Alice"), new User("Bob"));
Mono<Void> postMono = client.post()
.uri("/users/batch")
.body(usersFlux, User.class) // Тело как Flux
.retrieve()
.bodyToMono(Void.class);
postMono.subscribe(); // Отправка асинхронно
WebClient уважает backpressure: если сервер шлёт Flux (SSE), клиент запрашивает по мере обработки.
С Schedulers: по умолчанию на event-loop, но publishOn для кастом.
Обработка ответов и ошибок
Retrieve() даёт bodyToMono/Flux, onStatus для ошибок (4xx/5xx).
Пример с обработкой:
client.get()
.uri("/data")
.retrieve()
.onStatus(HttpStatusCode::is4xxClientError, resp -> Mono.error(new BadRequestException("Ошибка клиента")))
.bodyToFlux(Data.class)
.doOnError(e -> System.err.println("Сбой: " + e)) // Из поста 10
.retry(3) // Retry на сетевые ошибки
.subscribe(data -> process(data));
Это интегрирует с Reactor: ошибки как onError, retry для flaky API.
Для стриминга: bodyToFlux для больших данных, без загрузки всего в память.
Практические советы и подводные камни
- Таймауты: client.mutate().clientConnector(new ReactorClientHttpConnector(HttpClient.create().responseTimeout(Duration.ofSeconds(5)))).build();
- HTTPS: автоматом, но настройте trustStore если нужно.
- Тестирование: WebClient с mock (WireMock) и StepVerifier.
В практике: в Spring Boot контроллер вызывает WebClient для агрегации от сервисов — асинхронно, без ожидания.
#Java #middle #Reactor #WebClient #Schedulers
👍2
Что выведет код?
#Tasks
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
import java.time.Duration;
public class Task141025 {
public static void main(String[] args) {
WebClient webClient = WebClient.builder()
.baseUrl("http://example.com")
.build();
Mono<String> result = webClient.get()
.uri("/api/data")
.retrieve()
.bodyToMono(String.class)
.timeout(Duration.ofMillis(100))
.onErrorResume(throwable -> {
System.out.println("Timeout occurred!");
return Mono.just("Fallback");
})
.flatMap(response -> {
System.out.println("Processing: " + response);
return Mono.just("Processed: " + response);
});
System.out.println("Final: " + result.block());
}
}
#Tasks
👍1
Вопрос с собеседований
Что такое livelock?🤓
Ответ:
Livelock — это ситуация, когда потоки не блокируются, но из-за постоянных изменений состояния они не могут выполнить задачу.
Это похоже на танец: два потока уступают друг другу, но никто не продвигается.
Решение — алгоритмическая корректировка.
#собеседование
Что такое livelock?
Ответ:
Livelock
Это похоже на танец: два потока уступают друг другу, но никто не продвигается.
Решение — алгоритмическая корректировка.
#собеседование
Please open Telegram to view this post
VIEW IN TELEGRAM
👍4