Реактивное программирование
Введение в Spring WebFlux
Spring WebFlux — это часть Spring Framework, предназначенная для создания неблокирующих, асинхронных веб-приложений. Он основан на Project Reactor и Reactive Streams API, что позволяет использовать Mono и Flux прямо в коде сервера. Это не замена Spring MVC (традиционному веб-фреймворку), а альтернатива для сценариев с высокой нагрузкой: микросервисы, реальное время, стриминг. Почему это новый подход? В традиционном веб (как Servlet с потоками на запрос) под пиковой нагрузкой сервер исчерпывает ресурсы — каждый запрос занимает поток, который висит в ожидании IO. WebFlux меняет это: использует event-loop (цикл обработки событий), где один поток обслуживает тысячи подключений, реагируя на события асинхронно. Это экономит CPU и память, делая приложения более отзывчивыми и масштабируемыми.
Почему Spring WebFlux: связь с реактивным мышлением
WebFlux воплощает принципы, которые мы разбирали: push-модель, где данные отправляются по готовности; обратное давление для контроля темпа; операторы для трансформаций. В экосистеме Spring он интегрируется с другими компонентами: Spring Boot для быстрого старта, Spring Data Reactive для баз данных (как R2DBC), Spring Security Reactive для безопасности. Ключевой выигрыш — для IO-bound задач: запросы к БД, API или файлам идут асинхронно, без блокировок. Если ваш сервис ждёт внешних ответов 90% времени, WebFlux высвобождает ресурсы, позволяя обрабатывать в 10-100 раз больше запросов на том же железе.
Чтобы начать: добавьте spring-boot-starter-webflux в зависимости (Maven/Gradle). Spring Boot автоматически настроит Reactor Netty как сервер (неблокирующий HTTP на базе Netty). Нет нужды в Tomcat — всё реактивно.
Ключевые компоненты Spring WebFlux
WebFlux предлагает два стиля разработки: аннотированный (похож на Spring MVC) и функциональный (роутеры как в Express.js). Оба используют Mono/Flux для ответов.
- Аннотированные контроллеры: @RestController с методами, возвращающими Mono или Flux.
Пример простого GET:
- Функциональные роутеры: Для маршрутизации без контроллеров.
Пример:
- WebClient: реактивный клиент для исходящих запросов, интегрируется seamlessly.
Ещё фишки: поддержка WebSockets для bidirectional стриминга, Server-Sent Events для push-уведомлений, интеграция с Schedulers для распределения задач.
Практические советы и подводные камни
- Миграция: Начните с аннотированных контроллеров — синтаксис похож на MVC, но возвращайте Mono/Flux.
- Тестирование: WebTestClient вместо MockMvc — асинхронные тесты с StepVerifier (из Reactor).
- Камень: Блокирующий код в контроллерах (JDBC, sleep) сломает асинхронность — используйте reactive драйверы (R2DBC) и publishOn(Schedulers.boundedElastic()).
- Производительность: Под нагрузкой мониторьте с Micrometer — WebFlux даёт метрики из коробки.
В реальной жизни: Netflix использует похожий стек для стриминга, где WebFlux обрабатывает миллионы подключений.
#Java #middle #Reactor #WebFlux
Введение в Spring WebFlux
Spring WebFlux — это часть Spring Framework, предназначенная для создания неблокирующих, асинхронных веб-приложений. Он основан на Project Reactor и Reactive Streams API, что позволяет использовать Mono и Flux прямо в коде сервера. Это не замена Spring MVC (традиционному веб-фреймворку), а альтернатива для сценариев с высокой нагрузкой: микросервисы, реальное время, стриминг. Почему это новый подход? В традиционном веб (как Servlet с потоками на запрос) под пиковой нагрузкой сервер исчерпывает ресурсы — каждый запрос занимает поток, который висит в ожидании IO. WebFlux меняет это: использует event-loop (цикл обработки событий), где один поток обслуживает тысячи подключений, реагируя на события асинхронно. Это экономит CPU и память, делая приложения более отзывчивыми и масштабируемыми.
Почему Spring WebFlux: связь с реактивным мышлением
WebFlux воплощает принципы, которые мы разбирали: push-модель, где данные отправляются по готовности; обратное давление для контроля темпа; операторы для трансформаций. В экосистеме Spring он интегрируется с другими компонентами: Spring Boot для быстрого старта, Spring Data Reactive для баз данных (как R2DBC), Spring Security Reactive для безопасности. Ключевой выигрыш — для IO-bound задач: запросы к БД, API или файлам идут асинхронно, без блокировок. Если ваш сервис ждёт внешних ответов 90% времени, WebFlux высвобождает ресурсы, позволяя обрабатывать в 10-100 раз больше запросов на том же железе.
Чтобы начать: добавьте spring-boot-starter-webflux в зависимости (Maven/Gradle). Spring Boot автоматически настроит Reactor Netty как сервер (неблокирующий HTTP на базе Netty). Нет нужды в Tomcat — всё реактивно.
Ключевые компоненты Spring WebFlux
WebFlux предлагает два стиля разработки: аннотированный (похож на Spring MVC) и функциональный (роутеры как в Express.js). Оба используют Mono/Flux для ответов.
- Аннотированные контроллеры: @RestController с методами, возвращающими Mono или Flux.
Пример простого GET:
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
@RestController
public class HelloController {
@GetMapping("/hello")
public Mono<String> hello() {
return Mono.just("Привет из WebFlux!"); // Асинхронный ответ
}
}
Здесь метод возвращает Mono — сервер не блокируется, ответ "течёт" асинхронно. Для Flux: стриминг данных, как SSE (сервер-сент события).
- Функциональные роутеры: Для маршрутизации без контроллеров.
Пример:
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;
@Configuration
public class RouterConfig {
@Bean
public RouterFunction<ServerResponse> routes() {
return RouterFunctions.route()
.GET("/hello", request -> ServerResponse.ok().bodyValue("Привет из роутера!"))
.build();
}
}
Это декларативно: опишите маршруты, обработчики возвращают ServerResponse с Mono/Flux.
- WebClient: реактивный клиент для исходящих запросов, интегрируется seamlessly.
Ещё фишки: поддержка WebSockets для bidirectional стриминга, Server-Sent Events для push-уведомлений, интеграция с Schedulers для распределения задач.
Практические советы и подводные камни
- Миграция: Начните с аннотированных контроллеров — синтаксис похож на MVC, но возвращайте Mono/Flux.
- Тестирование: WebTestClient вместо MockMvc — асинхронные тесты с StepVerifier (из Reactor).
- Камень: Блокирующий код в контроллерах (JDBC, sleep) сломает асинхронность — используйте reactive драйверы (R2DBC) и publishOn(Schedulers.boundedElastic()).
- Производительность: Под нагрузкой мониторьте с Micrometer — WebFlux даёт метрики из коробки.
В реальной жизни: Netflix использует похожий стек для стриминга, где WebFlux обрабатывает миллионы подключений.
#Java #middle #Reactor #WebFlux
👍3
Реактивное программирование
Простой REST-контроллер с Mono и Flux в Spring WebFlux
Spring WebFlux работает на Spring Boot — это упрощает запуск.
Создайте новый проект в IDE (IntelliJ, Eclipse) или через spring.io/initializr с зависимостями:
- Spring Reactive Web (для WebFlux).
- Spring Boot Starter WebFlux.
В pom.xml (Maven) это выглядит так:
Запустите приложение с @SpringBootApplication в главном классе. По умолчанию сервер — Reactor Netty (неблокирующий HTTP-сервер), порт 8080. Нет нужды в Tomcat — всё асинхронно. Если у вас уже есть Spring MVC, исключите starter-web, чтобы избежать конфликтов.
Теперь к контроллерам: в WebFlux они аннотированы @RestController (как в MVC), но методы возвращают Mono или Flux. Это значит: контроллер не блокирует поток — он возвращает "реактивный объект", а фреймворк сам подпишется и отправит ответ по мере готовности. Это решает боли блокировок: пока данные готовятся (например, запрос в БД), поток свободен для других запросов.
Mono в контроллере: для одиночных ответов
Mono — это поток для нуля или одного элемента, идеален для типичных REST-операций: GET одного ресурса, POST с созданием, DELETE с подтверждением. В контроллере метод возвращает Mono<T>, где T — ваш DTO или простая строка. Фреймворк автоматически сериализует его в JSON (с Jackson) и отправляет, когда элемент готов.
Простой пример: контроллер для приветствия.
Почему Mono лучше Object в MVC? В MVC метод ждёт выполнения (блокирует поток), здесь — нет. Под нагрузкой: тысячи /greet запросов — WebFlux использует event-loop (один поток на все), Reactor распределяет через Schedulers.
Если внутри Mono — запрос к сервису:
В контроллере:
Это цепочка: контроллер возвращает Mono от сервиса, фреймворк ждёт готовности без блокировки. Если ошибка в сервисе — onError преобразуется в HTTP 500, но вы можете настроить с @ExceptionHandler.
Расширим: обработка параметров запроса (@RequestParam) и тела (@RequestBody).
Для POST:
#Java #middle #Reactor #WebFlux #Mono #Flux
Простой REST-контроллер с Mono и Flux в Spring WebFlux
Spring WebFlux работает на Spring Boot — это упрощает запуск.
Создайте новый проект в IDE (IntelliJ, Eclipse) или через spring.io/initializr с зависимостями:
- Spring Reactive Web (для WebFlux).
- Spring Boot Starter WebFlux.
В pom.xml (Maven) это выглядит так:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
Для Gradle: implementation 'org.springframework.boot:spring-boot-starter-webflux'
Запустите приложение с @SpringBootApplication в главном классе. По умолчанию сервер — Reactor Netty (неблокирующий HTTP-сервер), порт 8080. Нет нужды в Tomcat — всё асинхронно. Если у вас уже есть Spring MVC, исключите starter-web, чтобы избежать конфликтов.
Теперь к контроллерам: в WebFlux они аннотированы @RestController (как в MVC), но методы возвращают Mono или Flux. Это значит: контроллер не блокирует поток — он возвращает "реактивный объект", а фреймворк сам подпишется и отправит ответ по мере готовности. Это решает боли блокировок: пока данные готовятся (например, запрос в БД), поток свободен для других запросов.
Mono в контроллере: для одиночных ответов
Mono — это поток для нуля или одного элемента, идеален для типичных REST-операций: GET одного ресурса, POST с созданием, DELETE с подтверждением. В контроллере метод возвращает Mono<T>, где T — ваш DTO или простая строка. Фреймворк автоматически сериализует его в JSON (с Jackson) и отправляет, когда элемент готов.
Простой пример: контроллер для приветствия.
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
@RestController
public class GreetingController {
@GetMapping("/greet/{name}")
public Mono<String> greet(@PathVariable String name) {
return Mono.just("Привет, " + name + "! Добро пожаловать в реактивный мир.");
}
}
Здесь @GetMapping — аннотация для маршрута, @PathVariable — параметр из URL. Метод возвращает Mono.just — простой синхронный элемент. Но под капотом: WebFlux подпишется на Mono, и когда значение готово (сразу), отправит HTTP 200 с телом. Если бы внутри был асинхронный вызов (например, Mono.delay(Duration.ofSeconds(2)).map(ignore -> "Задержанный привет")), сервер не заблокируется — запрос обработается асинхронно.
Почему Mono лучше Object в MVC? В MVC метод ждёт выполнения (блокирует поток), здесь — нет. Под нагрузкой: тысячи /greet запросов — WebFlux использует event-loop (один поток на все), Reactor распределяет через Schedulers.
Если внутри Mono — запрос к сервису:
@Service
public class UserService {
public Mono<User> findUserById(Long id) {
return Mono.fromCallable(() -> /* симуляция БД */ new User(id, "Имя")); // Асинхронно
}
}
В контроллере:
@GetMapping("/user/{id}")
public Mono<User> getUser(@PathVariable Long id) {
return userService.findUserById(id);
}
Это цепочка: контроллер возвращает Mono от сервиса, фреймворк ждёт готовности без блокировки. Если ошибка в сервисе — onError преобразуется в HTTP 500, но вы можете настроить с @ExceptionHandler.
Расширим: обработка параметров запроса (@RequestParam) и тела (@RequestBody).
Для POST:
@PostMapping("/create-user")
public Mono<User> createUser(@RequestBody Mono<UserRequest> requestMono) {
return requestMono.flatMap(req -> {
// Асинхронная логика: создать пользователя
return userService.createUser(req.getName()).map(saved -> new User(saved.getId(), saved.getName()));
});
}
Здесь @RequestBody — Mono, потому что тело может приходить асинхронно (большие данные). flatMap — для цепочки (из поста 8), чтобы "развернуть" подпоток.
#Java #middle #Reactor #WebFlux #Mono #Flux
👍1
Flux в контроллере: для коллекций и стриминга
Flux — для нуля, одного или многих элементов, подходит для GET списков, пагинации или реального времени (стриминг событий). Возвращайте Flux<T>, и WebFlux отправит ответ как JSON-массив (для конечного) или text/event-stream для SSE (сервер-сент событий).
Простой пример: список элементов.
Фреймворк соберёт Flux в JSON-массив и отправит одним ответом.
Но если Flux асинхронный:
Ответ будет стриминговым: клиенту придут данные по мере готовности (chunked transfer).
Для явного SSE добавьте produces = MediaType.TEXT_EVENT_STREAM_VALUE:
Расширим: пагинация с @RequestParam.
Для POST с Flux телом (batch-операции):
Обработка ошибок и валидация в контроллерах
Ошибки — часть жизни: в WebFlux используйте @ExceptionHandler в контроллере или глобально (@ControllerAdvice).
Пример:
Для валидации: @Validated на контроллере, @Valid на @RequestBody. Ошибки валидации — WebExchangeBindException, обработайте в handler.
Глобально: @ControllerAdvice с @ExceptionHandler для централизованной обработки. Это интегрируется с onErrorResume/retry: в сервисе добавьте, и контроллер получит восстановленный поток.
Практические советы и подводные камни
- Тестирование: Используйте WebTestClient — webTestClient.get().uri("/greet/Name").exchange().expectStatus().isOk().expectBody(String.class).isEqualTo("Привет, Name!");
- Schedulers: Если в сервисе blocking код, добавьте .publishOn(Schedulers.boundedElastic()) перед возвратом в контроллер.
- Проблема: Возврат Flux без produces = TEXT_EVENT_STREAM — соберётся в массив, но под большой нагрузкой рискуете буфером; для стриминга укажите media type.
- Оптимизация: Для больших ответов используйте Flux<DataBuffer> — стриминг байтов без сериализации в память.
- Интеграция: С Spring Security Reactive — фильтры асинхронны, не блокируют.
#Java #middle #Reactor #WebFlux #Mono #Flux
Flux — для нуля, одного или многих элементов, подходит для GET списков, пагинации или реального времени (стриминг событий). Возвращайте Flux<T>, и WebFlux отправит ответ как JSON-массив (для конечного) или text/event-stream для SSE (сервер-сент событий).
Простой пример: список элементов.
@GetMapping("/users")
public Flux<User> getAllUsers() {
return Flux.fromIterable(List.of(new User(1L, "Алиса"), new User(2L, "Боб")));
}
Фреймворк соберёт Flux в JSON-массив и отправит одним ответом.
Но если Flux асинхронный:
public Flux<User> findAllUsers() {
return Flux.range(1, 5)
.delayElements(Duration.ofSeconds(1)) // Симуляция задержки
.map(i -> new User((long) i, "Пользователь " + i));
}
Ответ будет стриминговым: клиенту придут данные по мере готовности (chunked transfer).
Для явного SSE добавьте produces = MediaType.TEXT_EVENT_STREAM_VALUE:
@GetMapping(value = "/stream-users", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<User> streamUsers() {
return findAllUsers();
}
Теперь ответ — SSE: каждый элемент как event: data: {"id":1,"name":"Пользователь 1"}\n\n. Клиент (браузер или WebClient) может реагировать на каждый по отдельности, без ожидания всего. Это решает боли callback-ада: вместо polling (опроса сервера), сервер толкает данные (push из поста 3).
Расширим: пагинация с @RequestParam.
@GetMapping("/users-paged")
public Flux<User> getPagedUsers(@RequestParam(defaultValue = "0") int page, @RequestParam(defaultValue = "10") int size) {
return userService.findPaged(page, size); // Сервис возвращает Flux<User>
}
Для POST с Flux телом (batch-операции):
@PostMapping("/batch-users")
public Flux<User> createBatch(@RequestBody Flux<UserRequest> requestsFlux) {
return requestsFlux.flatMap(req -> userService.createUser(req.getName()));
}
flatMap — для параллельной обработки, возвращает Flux сохранённых пользователей.
Обработка ошибок и валидация в контроллерах
Ошибки — часть жизни: в WebFlux используйте @ExceptionHandler в контроллере или глобально (@ControllerAdvice).
Пример:
@ExceptionHandler(UserNotFoundException.class)
public Mono<ResponseEntity<String>> handleNotFound(UserNotFoundException ex) {
return Mono.just(ResponseEntity.notFound().build());
}
Для валидации: @Validated на контроллере, @Valid на @RequestBody. Ошибки валидации — WebExchangeBindException, обработайте в handler.
Глобально: @ControllerAdvice с @ExceptionHandler для централизованной обработки. Это интегрируется с onErrorResume/retry: в сервисе добавьте, и контроллер получит восстановленный поток.
Практические советы и подводные камни
- Тестирование: Используйте WebTestClient — webTestClient.get().uri("/greet/Name").exchange().expectStatus().isOk().expectBody(String.class).isEqualTo("Привет, Name!");
- Schedulers: Если в сервисе blocking код, добавьте .publishOn(Schedulers.boundedElastic()) перед возвратом в контроллер.
- Проблема: Возврат Flux без produces = TEXT_EVENT_STREAM — соберётся в массив, но под большой нагрузкой рискуете буфером; для стриминга укажите media type.
- Оптимизация: Для больших ответов используйте Flux<DataBuffer> — стриминг байтов без сериализации в память.
- Интеграция: С Spring Security Reactive — фильтры асинхронны, не блокируют.
#Java #middle #Reactor #WebFlux #Mono #Flux
👍1