Java for Beginner
743 subscribers
708 photos
196 videos
12 files
1.14K links
Канал от новичков для новичков!
Изучайте Java вместе с нами!
Здесь мы обмениваемся опытом и постоянно изучаем что-то новое!

Наш YouTube канал - https://www.youtube.com/@Java_Beginner-Dev

Наш канал на RUTube - https://rutube.ru/channel/37896292/
Download Telegram
Реактивное программирование

Что такое потоки данных в реактивном мире?

В реактивном программировании данные — это не статичный объект, который вы запрашиваете и ждёте. Это динамичный поток: последовательность элементов (событий), которые могут приходить в любое время, в любом количестве. Поток может быть бесконечным (как лента новостей) или конечным (как результаты поиска). Главное — обработка идёт реактивно: программа "подписывается" на поток и реагирует на каждый элемент по мере его появления, без блокировок.

Это решает боли из предыдущего поста: вместо выделения потока на ожидание, мы используем неблокирующий механизм. Если данных нет — ничего не происходит, ресурсы свободны. Когда данные приходят — срабатывают реакции. Это как подписка на уведомления: телефон не висит в ожидании, а просто пиликает при новом сообщении.

В основе лежит спецификация Reactive Streams — стандарт, который определяет, как строить такие потоки.

Он включает четыре ключевых интерфейса:
Издатель (Publisher): источник данных. Он "публикует" элементы потока. Например, это может быть база данных, генерирующая записи по запросу.
Подписчик (Subscriber): получатель данных. Он "подписывается" на издателя и реагирует на элементы: onNext (получил элемент), onError (ошибка), onComplete (поток завершён).
Подписка (Subscription): связь между издателем и подписчиком. Через неё подписчик может запросить больше данных (request(n)) или отменить подписку (cancel). Это вводит "обратное давление" — механизм, чтобы подписчик не захлёбывался данными, если не успевает их обрабатывать.
Процессор (Processor): комбинация издателя и подписчика, для промежуточной обработки (как фильтр в конвейере).


Эти интерфейсы — основа. Они обеспечивают асинхронность без блокировок: всё работает на основе событий, а не ожиданий.


События как река: метафора и практика

Представьте реку событий: вода (данные) течёт непрерывно, иногда бурно (пиковая нагрузка), иногда спокойно. Ваша программа — не плотина, которая блокирует поток, а турбина, которая генерирует энергию по мере течения. Если река слишком быстрая — турбина сигнализирует "замедлить" (обратное давление), чтобы не перегрузиться.

В Project Reactor это реализовано через два типа потоков:
Mono: поток для нуля или одного элемента. Идеален для одиночных операций, как HTTP-запрос, возвращающий один ответ.

Пример:
Mono<string> mono = Mono.just("Привет из реки");  // Создаём простой Mono с одним элементом
mono.subscribe(
value -> System.out.println("Получено: " + value), // onNext: реакция на элемент
error -> System.err.println("Ошибка: " + error), // onError
() -> System.out.println("Завершено") // onComplete
);

Здесь subscribe — это подписка. Mono "течёт" асинхронно: если элемент готов — срабатывает onNext, потом onComplete. Нет блокировок: код continue после subscribe.


Flux: поток для нуля, одного или многих элементов, возможно бесконечных. Для последовательностей, как стриминг данных.

Пример:
Flux<integer> flux = Flux.range(1, 5);   // Поток чисел от 1 до 5
flux.subscribe(
value -> System.out.println("Элемент: " + value),
error -> System.err.println("Ошибка: " + error),
() -> System.out.println("Поток завершён")
);


Flux "выдаёт" элементы по одному: 1, 2, 3... Это как река: элементы приходят последовательно, но асинхронно.

Почему это лучше традиционных подходов?
В CompletableFuture вы строите цепочки, но рискуете блокировками внутри.
В реактивном стиле всё неблокирующее: используйте операторы вроде map (преобразовать элемент), filter (отфильтровать), flatMap (развернуть в подпотоки).


Пример цепочки:
Flux.fromIterable(List.of("яблоко", "банан", "вишня"))
.filter(fruit -> fruit.startsWith("б")) // Фильтруем
.map(String::toUpperCase) // Преобразуем
.subscribe(System.out::println); // Подписываемся

Результат: "БАНАН". Всё течёт как конвейер, без callback-ада: код читаем, как последовательный, но работает асинхронно.



#Java #middle #Reactor #data_stream
👍3
Обратное давление: контроль

Одна из ключевых фишек — backpressure (обратное давление). В традиционных системах, если производитель данных быстрее потребителя, буфер переполняется, и система падает. В Reactive
Streams подписчик через Subscription.request(n) говорит: "Дай мне n элементов". Издатель выдаёт ровно столько, сколько запрошено. Это как шлюзы на реке: предотвращают наводнение.

Пример в Flux:
Flux<integer> fastFlux = Flux.range(1, 1000); // Быстрый источник
fastFlux.subscribe(new BaseSubscriber<integer>() {

@Override
protected void hookOnSubscribe(Subscription subscription) {
request(10); // Запрашиваем только 10 элементов сначала
}

@Override
protected void hookOnNext(Integer value) {
System.out.println(value);
if (value % 10 == 0) request(10); // Запрашиваем ещё по мере обработки
}
});


Подписчик контролирует темп, избегая перегрузки.


Почему это новый подход, который нам нужен?

Реактивное программирование не просто добавляет инструменты — оно меняет мышление. Вместо "императивного" кода (делай то, жди это), мы пишем "декларативно": опиши, как реагировать на поток. Это масштабируется: на сервере с 4 ядрами можно обрабатывать тысячи подключений, потому что нет блокирующих потоков. Библиотеки вроде Reactor интегрируются с неблокирующими драйверами (например, reactive JDBC или WebFlux для веб), решая боли блокировок.


#Java #middle #Reactor #data_stream
👍4