Реактивное программирование
Концепции реактивного программирования: Reactive Streams API — Publisher и Subscriber
Сегодня разберём Reactive Streams API — это спецификация, которая лежит в сердце реактивного программирования на Java. Она не просто набор интерфейсов, а рамка для построения асинхронных потоков данных с контролем над ними. Представьте это как правила дорожного движения для потока событий: без них — хаос и пробки, с ними — плавный поток.
Reactive Streams API решает ключевые проблемы из первого поста: вместо тяжёлых потоков и callback-ада, мы получаем унифицированный способ обмена данными между компонентами. Это не библиотека, а интерфейсы, которые реализуют фреймворки вроде Project Reactor или RxJava. Они обеспечивают совместимость: один компонент от Reactor может работать с другим от Akka Streams.
Что такое Reactive Streams API
Спецификация Reactive Streams появилась в 2015 году как ответ на хаос асинхронности. До неё каждый фреймворк изобретал велосипед: свои способы обработки потоков, ошибок и давления.
API стандартизирует это: четыре интерфейса (Publisher, Subscriber, Subscription, Processor), которые описывают, как данные текут асинхронно. Главная идея — неблокирующая коммуникация: ничего не ждём синхронно, всё через события.
Это подводит нас ближе к реактивному мышлению: вместо "запроси и жди" (как в Future.get()), мы "подпишись и реагируй". Под нагрузкой это экономит ресурсы: один поток (event-loop) может обслуживать тысячи подписок, без создания новых на каждую задачу. В итоге, системы становятся более отзывчивыми и масштабируемыми — идеально для микросервисов или реального времени.
Publisher: источник данных, который отправляет события
Publisher — это интерфейс для издателя, который генерирует поток данных.
Он как фабрика событий: производит элементы (любые объекты), ошибки или сигнал завершения. Publisher не знает, кто его слушает, — он просто готов отправлять (push) данные, когда они появятся.
Интерфейс прост: всего один метод subscribe(Subscriber s). Когда вы вызываете его, издатель регистрирует подписчика и начинает процесс. Но данные не льются сразу — всё под контролем (об этом ниже).
Пример простого издателя в Project Reactor (который реализует Reactive Streams):
Здесь publisher — источник.
Он может быть асинхронным: например, читать из сети или БД. Когда данные готовы, он отправляет их подписчику через onNext(элемент).
Если ошибка — onError(Throwable).
Если конец — onComplete().
Почему это лучше традиционных подходов? В отличие от потоков (где каждый запрос — отдельный тяжёлый объект), publisher лёгкий: он не выделяет ресурсы заранее, а реагирует на подписку. Нет блокировок: если данных нет, ничего не происходит.
#Java #middle #Reactor #Reactive_Streams_API #Processor #Subscription #Subscriber #Publisher
Концепции реактивного программирования: Reactive Streams API — Publisher и Subscriber
Сегодня разберём Reactive Streams API — это спецификация, которая лежит в сердце реактивного программирования на Java. Она не просто набор интерфейсов, а рамка для построения асинхронных потоков данных с контролем над ними. Представьте это как правила дорожного движения для потока событий: без них — хаос и пробки, с ними — плавный поток.
Reactive Streams API решает ключевые проблемы из первого поста: вместо тяжёлых потоков и callback-ада, мы получаем унифицированный способ обмена данными между компонентами. Это не библиотека, а интерфейсы, которые реализуют фреймворки вроде Project Reactor или RxJava. Они обеспечивают совместимость: один компонент от Reactor может работать с другим от Akka Streams.
Что такое Reactive Streams API
Спецификация Reactive Streams появилась в 2015 году как ответ на хаос асинхронности. До неё каждый фреймворк изобретал велосипед: свои способы обработки потоков, ошибок и давления.
API стандартизирует это: четыре интерфейса (Publisher, Subscriber, Subscription, Processor), которые описывают, как данные текут асинхронно. Главная идея — неблокирующая коммуникация: ничего не ждём синхронно, всё через события.
Это подводит нас ближе к реактивному мышлению: вместо "запроси и жди" (как в Future.get()), мы "подпишись и реагируй". Под нагрузкой это экономит ресурсы: один поток (event-loop) может обслуживать тысячи подписок, без создания новых на каждую задачу. В итоге, системы становятся более отзывчивыми и масштабируемыми — идеально для микросервисов или реального времени.
Publisher: источник данных, который отправляет события
Publisher — это интерфейс для издателя, который генерирует поток данных.
Он как фабрика событий: производит элементы (любые объекты), ошибки или сигнал завершения. Publisher не знает, кто его слушает, — он просто готов отправлять (push) данные, когда они появятся.
Интерфейс прост: всего один метод subscribe(Subscriber s). Когда вы вызываете его, издатель регистрирует подписчика и начинает процесс. Но данные не льются сразу — всё под контролем (об этом ниже).
Пример простого издателя в Project Reactor (который реализует Reactive Streams):
import reactor.core.publisher.Flux; // Flux реализует Publisher
Flux<Integer> publisher = Flux.range(1, 5); // Издатель: поток от 1 до 5
Здесь publisher — источник.
Он может быть асинхронным: например, читать из сети или БД. Когда данные готовы, он отправляет их подписчику через onNext(элемент).
Если ошибка — onError(Throwable).
Если конец — onComplete().
Почему это лучше традиционных подходов? В отличие от потоков (где каждый запрос — отдельный тяжёлый объект), publisher лёгкий: он не выделяет ресурсы заранее, а реагирует на подписку. Нет блокировок: если данных нет, ничего не происходит.
#Java #middle #Reactor #Reactive_Streams_API #Processor #Subscription #Subscriber #Publisher
👍1
Subscriber: получатель, который реагирует на события
Subscriber — интерфейс для подписчика, который "слушает" издателя. Он как потребитель на конвейере: получает элементы и решает, что с ними делать.
Методы:
- onSubscribe(Subscription s): вызывается сразу после подписки. Здесь подписчик получает подписку — объект для контроля.
- onNext(T item): срабатывает на каждый элемент. Здесь обработка: логика, трансформация и т.д.
- onError(Throwable t): если ошибка — обработка исключения.
- onComplete(): поток завершён успешно.
Подписчик пассивен: не получает данные (pull), а ждёт push. Это решает callback-ад из CompletableFuture — реакции в одном месте, код чище.
Пример подписки:
Здесь subscribe() связывает publisher и subscriber. Данные отправляются по одному, по запросу (request(1)). Это асинхронно: код после subscribe() продолжается сразу, без ожидания.
В Reactor есть BaseSubscriber для упрощения — переопределяйте только нужные методы.
Subscription: мост контроля с обратным давлением
Subscription — ключ к управлению: это объект, который подписчик получает в onSubscribe.
Методы:
- request(long n): "Дай мне n элементов". Это обратное давление (backpressure) — подписчик контролирует темп, чтобы не захлебнуться.
- cancel(): "Хватит, отпишись".
Без этого publisher мог бы зафлудить подписчика данными. Например, если источник — бесконечный поток (сенсоры), без request(n) — переполнение памяти.
В примере выше request(1) делает обработку последовательной: получил — обработал — запросил следующий. Для скорости — request(Long.MAX_VALUE) (неограниченно), но осторожно: рискуете буфером.
Это решает проблемы блокировок: вместо висящих потоков, всё в event-loop. Под нагрузкой — graceful degradation (грациозная деградация): если подписчик медленный, publisher замедляется, а не падает.
Processor: промежуточный звено для трансформаций
Processor — комбо: реализует и Publisher, и Subscriber. Он как фильтр в конвейере: принимает данные от одного издателя, обрабатывает и отправляет дальше.
Пример — в цепочках Flux: map() или filter() создают процессоры внутри.
В практике вы редко пишете свой Processor — библиотеки предоставляют готовые операторы. Но понимание помогает: весь конвейер — цепь publisher → processor → ... → subscriber.
Практические советы и подводные камни
- Всегда управляйте подпиской: без request() данные не потекут (по умолчанию unbounded — неограниченно, но лучше явно).
- Обрабатывайте ошибки: onError — ваш спасатель, чтобы не потерять исключения.
- Тестируйте с TestSubscriber (в Reactor): симулируйте сценарии.
- Камень: если в onNext блокирующий код — сломаете асинхронность. Используйте Schedulers для offload (перенос на другой поток).
В реальном коде: в Spring WebFlux контроллеры возвращают Flux/Mono — publisher'ы, клиенты подписываются.
#Java #middle #Reactor #Reactive_Streams_API #Processor #Subscription #Subscriber
Subscriber — интерфейс для подписчика, который "слушает" издателя. Он как потребитель на конвейере: получает элементы и решает, что с ними делать.
Методы:
- onSubscribe(Subscription s): вызывается сразу после подписки. Здесь подписчик получает подписку — объект для контроля.
- onNext(T item): срабатывает на каждый элемент. Здесь обработка: логика, трансформация и т.д.
- onError(Throwable t): если ошибка — обработка исключения.
- onComplete(): поток завершён успешно.
Подписчик пассивен: не получает данные (pull), а ждёт push. Это решает callback-ад из CompletableFuture — реакции в одном месте, код чище.
Пример подписки:
publisher.subscribe(new Subscriber<Integer>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
this.subscription = s;
s.request(1); // Запрашиваем первый элемент
}
@Override
public void onNext(Integer item) {
System.out.println("Получено: " + item);
subscription.request(1); // Запрашиваем следующий
}
@Override
public void onError(Throwable t) {
System.err.println("Ошибка: " + t);
}
@Override
public void onComplete() {
System.out.println("Завершено");
}
});
Здесь subscribe() связывает publisher и subscriber. Данные отправляются по одному, по запросу (request(1)). Это асинхронно: код после subscribe() продолжается сразу, без ожидания.
В Reactor есть BaseSubscriber для упрощения — переопределяйте только нужные методы.
Subscription: мост контроля с обратным давлением
Subscription — ключ к управлению: это объект, который подписчик получает в onSubscribe.
Методы:
- request(long n): "Дай мне n элементов". Это обратное давление (backpressure) — подписчик контролирует темп, чтобы не захлебнуться.
- cancel(): "Хватит, отпишись".
Без этого publisher мог бы зафлудить подписчика данными. Например, если источник — бесконечный поток (сенсоры), без request(n) — переполнение памяти.
В примере выше request(1) делает обработку последовательной: получил — обработал — запросил следующий. Для скорости — request(Long.MAX_VALUE) (неограниченно), но осторожно: рискуете буфером.
Это решает проблемы блокировок: вместо висящих потоков, всё в event-loop. Под нагрузкой — graceful degradation (грациозная деградация): если подписчик медленный, publisher замедляется, а не падает.
Processor: промежуточный звено для трансформаций
Processor — комбо: реализует и Publisher, и Subscriber. Он как фильтр в конвейере: принимает данные от одного издателя, обрабатывает и отправляет дальше.
Пример — в цепочках Flux: map() или filter() создают процессоры внутри.
В практике вы редко пишете свой Processor — библиотеки предоставляют готовые операторы. Но понимание помогает: весь конвейер — цепь publisher → processor → ... → subscriber.
Практические советы и подводные камни
- Всегда управляйте подпиской: без request() данные не потекут (по умолчанию unbounded — неограниченно, но лучше явно).
- Обрабатывайте ошибки: onError — ваш спасатель, чтобы не потерять исключения.
- Тестируйте с TestSubscriber (в Reactor): симулируйте сценарии.
- Камень: если в onNext блокирующий код — сломаете асинхронность. Используйте Schedulers для offload (перенос на другой поток).
В реальном коде: в Spring WebFlux контроллеры возвращают Flux/Mono — publisher'ы, клиенты подписываются.
#Java #middle #Reactor #Reactive_Streams_API #Processor #Subscription #Subscriber
👍1