Реактивное программирование
Что такое потоки данных в реактивном мире?
В реактивном программировании данные — это не статичный объект, который вы запрашиваете и ждёте. Это динамичный поток: последовательность элементов (событий), которые могут приходить в любое время, в любом количестве. Поток может быть бесконечным (как лента новостей) или конечным (как результаты поиска). Главное — обработка идёт реактивно: программа "подписывается" на поток и реагирует на каждый элемент по мере его появления, без блокировок.
Это решает боли из предыдущего поста: вместо выделения потока на ожидание, мы используем неблокирующий механизм. Если данных нет — ничего не происходит, ресурсы свободны. Когда данные приходят — срабатывают реакции. Это как подписка на уведомления: телефон не висит в ожидании, а просто пиликает при новом сообщении.
В основе лежит спецификация Reactive Streams — стандарт, который определяет, как строить такие потоки.
Он включает четыре ключевых интерфейса:
Издатель (Publisher): источник данных. Он "публикует" элементы потока. Например, это может быть база данных, генерирующая записи по запросу.
Подписчик (Subscriber): получатель данных. Он "подписывается" на издателя и реагирует на элементы: onNext (получил элемент), onError (ошибка), onComplete (поток завершён).
Подписка (Subscription): связь между издателем и подписчиком. Через неё подписчик может запросить больше данных (request(n)) или отменить подписку (cancel). Это вводит "обратное давление" — механизм, чтобы подписчик не захлёбывался данными, если не успевает их обрабатывать.
Процессор (Processor): комбинация издателя и подписчика, для промежуточной обработки (как фильтр в конвейере).
Эти интерфейсы — основа. Они обеспечивают асинхронность без блокировок: всё работает на основе событий, а не ожиданий.
События как река: метафора и практика
Представьте реку событий: вода (данные) течёт непрерывно, иногда бурно (пиковая нагрузка), иногда спокойно. Ваша программа — не плотина, которая блокирует поток, а турбина, которая генерирует энергию по мере течения. Если река слишком быстрая — турбина сигнализирует "замедлить" (обратное давление), чтобы не перегрузиться.
В Project Reactor это реализовано через два типа потоков:
Mono: поток для нуля или одного элемента. Идеален для одиночных операций, как HTTP-запрос, возвращающий один ответ.
Пример:
Flux: поток для нуля, одного или многих элементов, возможно бесконечных. Для последовательностей, как стриминг данных.
Пример:
Flux "выдаёт" элементы по одному: 1, 2, 3... Это как река: элементы приходят последовательно, но асинхронно.
Почему это лучше традиционных подходов?
В CompletableFuture вы строите цепочки, но рискуете блокировками внутри.
В реактивном стиле всё неблокирующее: используйте операторы вроде map (преобразовать элемент), filter (отфильтровать), flatMap (развернуть в подпотоки).
Пример цепочки:
#Java #middle #Reactor #data_stream
Что такое потоки данных в реактивном мире?
В реактивном программировании данные — это не статичный объект, который вы запрашиваете и ждёте. Это динамичный поток: последовательность элементов (событий), которые могут приходить в любое время, в любом количестве. Поток может быть бесконечным (как лента новостей) или конечным (как результаты поиска). Главное — обработка идёт реактивно: программа "подписывается" на поток и реагирует на каждый элемент по мере его появления, без блокировок.
Это решает боли из предыдущего поста: вместо выделения потока на ожидание, мы используем неблокирующий механизм. Если данных нет — ничего не происходит, ресурсы свободны. Когда данные приходят — срабатывают реакции. Это как подписка на уведомления: телефон не висит в ожидании, а просто пиликает при новом сообщении.
В основе лежит спецификация Reactive Streams — стандарт, который определяет, как строить такие потоки.
Он включает четыре ключевых интерфейса:
Издатель (Publisher): источник данных. Он "публикует" элементы потока. Например, это может быть база данных, генерирующая записи по запросу.
Подписчик (Subscriber): получатель данных. Он "подписывается" на издателя и реагирует на элементы: onNext (получил элемент), onError (ошибка), onComplete (поток завершён).
Подписка (Subscription): связь между издателем и подписчиком. Через неё подписчик может запросить больше данных (request(n)) или отменить подписку (cancel). Это вводит "обратное давление" — механизм, чтобы подписчик не захлёбывался данными, если не успевает их обрабатывать.
Процессор (Processor): комбинация издателя и подписчика, для промежуточной обработки (как фильтр в конвейере).
Эти интерфейсы — основа. Они обеспечивают асинхронность без блокировок: всё работает на основе событий, а не ожиданий.
События как река: метафора и практика
Представьте реку событий: вода (данные) течёт непрерывно, иногда бурно (пиковая нагрузка), иногда спокойно. Ваша программа — не плотина, которая блокирует поток, а турбина, которая генерирует энергию по мере течения. Если река слишком быстрая — турбина сигнализирует "замедлить" (обратное давление), чтобы не перегрузиться.
В Project Reactor это реализовано через два типа потоков:
Mono: поток для нуля или одного элемента. Идеален для одиночных операций, как HTTP-запрос, возвращающий один ответ.
Пример:
Mono<string> mono = Mono.just("Привет из реки"); // Создаём простой Mono с одним элементом
mono.subscribe(
value -> System.out.println("Получено: " + value), // onNext: реакция на элемент
error -> System.err.println("Ошибка: " + error), // onError
() -> System.out.println("Завершено") // onComplete
);
Здесь subscribe — это подписка. Mono "течёт" асинхронно: если элемент готов — срабатывает onNext, потом onComplete. Нет блокировок: код continue после subscribe.
Flux: поток для нуля, одного или многих элементов, возможно бесконечных. Для последовательностей, как стриминг данных.
Пример:
Flux<integer> flux = Flux.range(1, 5); // Поток чисел от 1 до 5
flux.subscribe(
value -> System.out.println("Элемент: " + value),
error -> System.err.println("Ошибка: " + error),
() -> System.out.println("Поток завершён")
);
Flux "выдаёт" элементы по одному: 1, 2, 3... Это как река: элементы приходят последовательно, но асинхронно.
Почему это лучше традиционных подходов?
В CompletableFuture вы строите цепочки, но рискуете блокировками внутри.
В реактивном стиле всё неблокирующее: используйте операторы вроде map (преобразовать элемент), filter (отфильтровать), flatMap (развернуть в подпотоки).
Пример цепочки:
Flux.fromIterable(List.of("яблоко", "банан", "вишня"))
.filter(fruit -> fruit.startsWith("б")) // Фильтруем
.map(String::toUpperCase) // Преобразуем
.subscribe(System.out::println); // Подписываемся
Результат: "БАНАН". Всё течёт как конвейер, без callback-ада: код читаем, как последовательный, но работает асинхронно.
#Java #middle #Reactor #data_stream
👍3
Обратное давление: контроль
Одна из ключевых фишек — backpressure (обратное давление). В традиционных системах, если производитель данных быстрее потребителя, буфер переполняется, и система падает. В Reactive Streams подписчик через Subscription.request(n) говорит: "Дай мне n элементов". Издатель выдаёт ровно столько, сколько запрошено. Это как шлюзы на реке: предотвращают наводнение.
Пример в Flux:
Подписчик контролирует темп, избегая перегрузки.
Почему это новый подход, который нам нужен?
Реактивное программирование не просто добавляет инструменты — оно меняет мышление. Вместо "императивного" кода (делай то, жди это), мы пишем "декларативно": опиши, как реагировать на поток. Это масштабируется: на сервере с 4 ядрами можно обрабатывать тысячи подключений, потому что нет блокирующих потоков. Библиотеки вроде Reactor интегрируются с неблокирующими драйверами (например, reactive JDBC или WebFlux для веб), решая боли блокировок.
#Java #middle #Reactor #data_stream
Одна из ключевых фишек — backpressure (обратное давление). В традиционных системах, если производитель данных быстрее потребителя, буфер переполняется, и система падает. В Reactive Streams подписчик через Subscription.request(n) говорит: "Дай мне n элементов". Издатель выдаёт ровно столько, сколько запрошено. Это как шлюзы на реке: предотвращают наводнение.
Пример в Flux:
Flux<integer> fastFlux = Flux.range(1, 1000); // Быстрый источник
fastFlux.subscribe(new BaseSubscriber<integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
request(10); // Запрашиваем только 10 элементов сначала
}
@Override
protected void hookOnNext(Integer value) {
System.out.println(value);
if (value % 10 == 0) request(10); // Запрашиваем ещё по мере обработки
}
});
Подписчик контролирует темп, избегая перегрузки.
Почему это новый подход, который нам нужен?
Реактивное программирование не просто добавляет инструменты — оно меняет мышление. Вместо "императивного" кода (делай то, жди это), мы пишем "декларативно": опиши, как реагировать на поток. Это масштабируется: на сервере с 4 ядрами можно обрабатывать тысячи подключений, потому что нет блокирующих потоков. Библиотеки вроде Reactor интегрируются с неблокирующими драйверами (например, reactive JDBC или WebFlux для веб), решая боли блокировок.
#Java #middle #Reactor #data_stream
👍4