Финальный docker-compose.yaml: Java + Kafka (KRaft) + PostgreSQL (не тестировался, возможно содержит ошибки, написан для визуализации)
Ключевые решения
1. KRaft вместо ZooKeeper:
- KAFKA_CFG_PROCESS_ROLES: "broker,controller" — единый процесс для метаданных (упрощает настройку).
- Важно: KAFKA_CFG_ADVERTISED_LISTENERS должен указывать на имя сервиса (kafka), а не на localhost.
2. Healthcheck для всех сервисов:
- Для PostgreSQL: pg_isready проверяет готовность принимать подключения.
- Для Kafka: kafka-broker-api-versions.sh убеждается, что брокер принимает запросы.
- Почему это критично: depends_on без healthcheck не предотвращает race condition.
3. Лимиты ресурсов:
- deploy.resources.limits — ограничивает использование CPU/memory через cgroups.
- Без этого JVM может выделить память, превышающую лимит контейнера (падение с OutOfMemoryError).
4. Сеть:
- Все сервисы в одной сети app_net — общаются по именам (db, kafka).
- Встроенный DNS Docker резолвит имена в IP-адреса контейнеров.
#Java #middle #Docker
version: '3.8'
services:
app:
image: myapp:latest
ports:
- "8080:8080"
environment:
DB_URL: jdbc:postgresql://db:5432/mydb
DB_PASSWORD: ${DB_PASSWORD}
KAFKA_BOOTSTRAP_SERVERS: kafka:9092
depends_on:
db:
condition: service_healthy
kafka:
condition: service_healthy
networks:
- app_net
deploy:
resources:
limits:
cpus: '1.5'
memory: 512M
db:
image: postgres:15
environment:
POSTGRES_DB: mydb
POSTGRES_PASSWORD: ${DB_PASSWORD}
volumes:
- pg_data:/var/lib/postgresql/data
healthcheck:
test: ["CMD-SHELL", "pg_isready -U postgres"]
interval: 5s
timeout: 5s
retries: 10
networks:
- app_net
deploy:
resources:
limits:
memory: 256M
kafka:
image: bitnami/kafka:3.5.1
container_name: kafka
ports:
- "9092:9092"
environment:
KAFKA_CFG_NODE_ID: 0
KAFKA_CFG_PROCESS_ROLES: "broker,controller"
KAFKA_CFG_LISTENERS: "PLAINTEXT://:9092,CONTROLLER://:9093"
KAFKA_CFG_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:9092"
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT"
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
volumes:
- kafka_data:/bitnami/kafka
healthcheck:
test: ["CMD", "kafka-broker-api-versions.sh", "--bootstrap-server", "localhost:9092"]
interval: 10s
timeout: 10s
retries: 20
networks:
- app_net
deploy:
resources:
limits:
memory: 512M
volumes:
pg_data:
driver: local
kafka_data:
driver: local
networks:
app_net:
driver: bridge
Ключевые решения
1. KRaft вместо ZooKeeper:
- KAFKA_CFG_PROCESS_ROLES: "broker,controller" — единый процесс для метаданных (упрощает настройку).
- Важно: KAFKA_CFG_ADVERTISED_LISTENERS должен указывать на имя сервиса (kafka), а не на localhost.
2. Healthcheck для всех сервисов:
- Для PostgreSQL: pg_isready проверяет готовность принимать подключения.
- Для Kafka: kafka-broker-api-versions.sh убеждается, что брокер принимает запросы.
- Почему это критично: depends_on без healthcheck не предотвращает race condition.
3. Лимиты ресурсов:
- deploy.resources.limits — ограничивает использование CPU/memory через cgroups.
- Без этого JVM может выделить память, превышающую лимит контейнера (падение с OutOfMemoryError).
4. Сеть:
- Все сервисы в одной сети app_net — общаются по именам (db, kafka).
- Встроенный DNS Docker резолвит имена в IP-адреса контейнеров.
#Java #middle #Docker
👍2
CI/CD pipeline: от коммита до production
Сборка образа в GitHub Actions
Как это работает
1. Кэширование слоев:
- actions/cache сохраняет результаты сборки в /tmp/.buildx-cache.
- При следующем запуске Buildx использует кэш через cache-from, пропуская этапы с неизмененными инструкциями (например, загрузку зависимостей Maven).
- Экономия времени: Сборка с кэшем — 2 минуты вместо 10.
2. Multi-arch сборка:
- platforms: linux/amd64,linux/arm64 — собирает образы для x86 и ARM.
- Использует QEMU для эмуляции архитектур (установлен через docker/setup-qemu-action).
3. Тегирование:
- myapp:latest — для dev-окружения (не рекомендуется для production!),
- myapp:${{ github.sha }} — уникальный тег на коммит (для отката),
- При тегировании релиза (v1.0.0) — myapp:1.0.0.
Нюансы:
- Для production никогда не используйте latest — это нарушает идемпотентность.
- Вместо latest применяйте семантическое версионирование: major.minor.patch.
#Java #middle #Docker
Сборка образа в GitHub Actions
name: Build and Push Docker Image
on:
push:
branches: [ main ]
tags: [ 'v*' ]
jobs:
build:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Set up QEMU for multi-arch
uses: docker/setup-qemu-action@v2
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2
- name: Login to DockerHub
uses: docker/login-action@v2
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}
- name: Cache Docker layers
uses: actions/cache@v3
with:
path: /tmp/.buildx-cache
key: ${{ runner.os }}-buildx-${{ github.sha }}
restore-keys: ${{ runner.os }}-buildx-
- name: Build and push
uses: docker/build-push-action@v4
with:
context: .
platforms: linux/amd64,linux/arm64
push: true
tags: myapp:latest,myapp:${{ github.sha }}
cache-from: type=local,src=/tmp/.buildx-cache
cache-to: type=local,dest=/tmp/.buildx-cache-new
build-args: |
BUILDKIT_INLINE_CACHE=1
Как это работает
1. Кэширование слоев:
- actions/cache сохраняет результаты сборки в /tmp/.buildx-cache.
- При следующем запуске Buildx использует кэш через cache-from, пропуская этапы с неизмененными инструкциями (например, загрузку зависимостей Maven).
- Экономия времени: Сборка с кэшем — 2 минуты вместо 10.
2. Multi-arch сборка:
- platforms: linux/amd64,linux/arm64 — собирает образы для x86 и ARM.
- Использует QEMU для эмуляции архитектур (установлен через docker/setup-qemu-action).
3. Тегирование:
- myapp:latest — для dev-окружения (не рекомендуется для production!),
- myapp:${{ github.sha }} — уникальный тег на коммит (для отката),
- При тегировании релиза (v1.0.0) — myapp:1.0.0.
Нюансы:
- Для production никогда не используйте latest — это нарушает идемпотентность.
- Вместо latest применяйте семантическое версионирование: major.minor.patch.
#Java #middle #Docker
👍4
Сканирование образа на уязвимости
Добавьте в пайплайн после сборки:
Как это работает:
- Trivy сканирует образ на наличие уязвимостей в:
- Базовом образе (distroless — минимум пакетов),
- Зависимостях Java (через анализ JAR-файлов).
- ignore-unfixed: true — игнорирует уязвимости без патчей (чтобы не блокировать сборку).
- severity: CRITICAL,HIGH — падает при критических уязвимостях.
Альтернатива: Snyk
Организация production-ready релиза
Политики хранения образов в registry
1. Для DockerHub/приватного registry:
- Удаляйте образы старше 30 дней (кроме tagged релизов)
- Оставляйте последние 5 образов для каждого major-версии (например, v1.*).
2. Как автоматизировать:
- В GitLab CI используйте cleanup policy для registry,
- В AWS ECR — Lifecycle Policy:
Multi-environment setup через override.yml
Структура проекта:
Пример docker-compose.prod.yml:
Запуск для production:
Ключевые отличия:
- Dev:
- Горячая перезагрузка кода через bind mounts,
- Отключенные лимиты ресурсов.
- Production:
- Фиксированные теги образов (не latest),
- Подключение к сети мониторинга (monitoring_net),
- Приватные тома на выделенном диске (/mnt/prod/pg_data).
Будущее: переход к Kubernetes/Helm
Почему Docker Compose не для production?
- Нет оркестрации на нескольких нодах,
- Отсутствует self-healing (автовосстановление упавших сервисов),
- Нет встроенного балансировщика нагрузки.
Как мигрировать:
1. Замените `docker-compose.yml` на Helm chart:
2. Настройте values.yaml:
3. Интегрируйте Kafka через Strimzi Operator:
Преимущества Kubernetes:
- Автомасштабирование (HPA),
- Сетевая изоляция через Network Policies,
- Управление секретами через kubectl create secret.
#Java #middle #Docker
Добавьте в пайплайн после сборки:
- name: Scan with Trivy
uses: aquasecurity/trivy-action@master
with:
image-ref: 'myapp:${{ github.sha }}'
format: 'table'
exit-code: '1'
ignore-unfixed: true
severity: 'CRITICAL,HIGH'
Как это работает:
- Trivy сканирует образ на наличие уязвимостей в:
- Базовом образе (distroless — минимум пакетов),
- Зависимостях Java (через анализ JAR-файлов).
- ignore-unfixed: true — игнорирует уязвимости без патчей (чтобы не блокировать сборку).
- severity: CRITICAL,HIGH — падает при критических уязвимостях.
Альтернатива: Snyk
- name: Snyk Container scan
uses: snyk/actions/container@master
env:
SNYK_TOKEN: ${{ secrets.SNYK_TOKEN }}
with:
image: myapp:${{ github.sha }}
args: --severity-threshold=high --fail-on=all
Организация production-ready релиза
Политики хранения образов в registry
1. Для DockerHub/приватного registry:
- Удаляйте образы старше 30 дней (кроме tagged релизов)
- Оставляйте последние 5 образов для каждого major-версии (например, v1.*).
2. Как автоматизировать:
- В GitLab CI используйте cleanup policy для registry,
- В AWS ECR — Lifecycle Policy:
{
"rules": [
{
"rulePriority": 1,
"description": "Удалять образы старше 30 дней",
"selection": {
"tagStatus": "untagged",
"countType": "sinceImagePushed",
"countUnit": "days",
"countNumber": 30
},
"action": { "type": "expire" }
}
]
}
Multi-environment setup через override.yml
Структура проекта:
├── docker-compose.yml # Базовая конфигурация
├── docker-compose.dev.yml # Dev-окружение
├── docker-compose.prod.yml # Production
└── .env
Пример docker-compose.prod.yml:
services:
app:
environment:
SPRING_PROFILES_ACTIVE: prod
JAVA_TOOL_OPTIONS: >-
-XX:MaxRAMPercentage=75.0
-XX:+UseZGC
deploy:
replicas: 3
update_config:
parallelism: 1
order: start-first
networks:
- monitoring_net
db:
environment:
POSTGRES_PASSWORD: ${PROD_DB_PASSWORD}
volumes:
- /mnt/prod/pg_data:/var/lib/postgresql/data
Запуск для production:
docker-compose -f docker-compose.yml -f docker-compose.prod.yml up -d
Ключевые отличия:
- Dev:
- Горячая перезагрузка кода через bind mounts,
- Отключенные лимиты ресурсов.
- Production:
- Фиксированные теги образов (не latest),
- Подключение к сети мониторинга (monitoring_net),
- Приватные тома на выделенном диске (/mnt/prod/pg_data).
Будущее: переход к Kubernetes/Helm
Почему Docker Compose не для production?
- Нет оркестрации на нескольких нодах,
- Отсутствует self-healing (автовосстановление упавших сервисов),
- Нет встроенного балансировщика нагрузки.
Как мигрировать:
1. Замените `docker-compose.yml` на Helm chart:
helm create myapp
2. Настройте values.yaml:
app:
replicaCount: 3
image:
repository: myapp
tag: "v1.0.0"
resources:
limits:
memory: 512Mi
cpu: "1.5"
3. Интегрируйте Kafka через Strimzi Operator:
# kafka.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
spec:
kafka:
version: 3.5.0
replicas: 3
listeners:
- name: plain
port: 9092
type: internal
tls: false
Преимущества Kubernetes:
- Автомасштабирование (HPA),
- Сетевая изоляция через Network Policies,
- Управление секретами через kubectl create secret.
#Java #middle #Docker
👍3
Реактивное программирование
Вступление
В современном мире разработки программного обеспечения мы всё чаще сталкиваемся с задачами, где простая последовательная логика уже не справляется. Нагрузки на системы растут экспоненциально: миллионы пользователей ожидают мгновенных ответов, микросервисы обмениваются сотнями запросов в секунду, а данные льются рекой из баз данных, API и внешних сервисов.
Традиционный подход — "запрос приходит, запускается поток, ждём ответа" — начинает давать сбои: система тонет в блокировках, лишних потоках и перерасходе ресурсов.
Представьте: сервер обрабатывает тысячи подключений, но каждый запрос занимает отдельный поток, который висит в ожидании, пока не придёт ответ от сети или диска. Это как пытаться пропустить толпу через узкий коридор — пробки неизбежны.
Чтобы выжать максимум из аппаратных ресурсов и сделать приложения более отзывчивыми, нужно менять способ мышления. Один из ключевых инструментов для этого — реактивное программирование, подход, который фокусируется на обработке потоков данных и событий в асинхронном режиме, без лишних блокировок.
Асинхронщина и её проблемы: от основ к пределам
Что такое асинхронность в программировании?
Это когда программа не стоит на месте, ожидая завершения одной задачи, а продолжает работать с другими. В Java это реализуется через механизмы, позволяющие запускать код параллельно. Но на пути к идеалу было много подводных камней.
Потоки: базовый, но тяжёлый инструмент
Многопоточность в Java появилась ещё в ранних версиях языка. Идея проста: вы создаёте новый поток — это как отдельный "рабочий", который выполняет код независимо от основного.
Код выглядит так:
Внутри метода run() вы пишете, что нужно сделать. На первый взгляд, это решает проблему: пока один поток ждёт ответа от сервера, другой может обрабатывать новый запрос.
Но практика показывает минусы. Каждый поток — это тяжёлый объект: он требует выделения памяти (обычно от 1 МБ на стек), системных ресурсов операционной системы и времени на создание. Если нагрузка растёт — скажем, 10 тысяч одновременных запросов, — вы не сможете создать 10 тысяч потоков: система просто исчерпает ресурсы, и всё упадёт. Ещё одна боль — контекстные переключения: когда процессор переключается между потоками, это стоит CPU-времени, иногда до микросекунд на переключение, что накапливается в большие задержки.
Чтобы смягчить это, в Java ввели ExecutorService — сервис-исполнитель, который управляет пулом потоков. Вы создаёте фиксированный пул, например, Executors.newFixedThreadPool(10), и подаёте задачи:
Теперь потоки переиспользуются: закончил задачу — берёт следующую. Это экономит ресурсы, но не решает корень проблемы. Если в задаче есть блокирующий код — например, чтение из файла или ожидание сети, — поток всё равно "зависает" в ожидании, блокируя место в пуле. Другие задачи ждут в очереди, и под высокой нагрузкой пул исчерпывается. В итоге, асинхронность есть, но она неэффективна: ресурсы тратятся на ожидание, а не на полезную работу.
#Java #middle #Reactor
Вступление
В современном мире разработки программного обеспечения мы всё чаще сталкиваемся с задачами, где простая последовательная логика уже не справляется. Нагрузки на системы растут экспоненциально: миллионы пользователей ожидают мгновенных ответов, микросервисы обмениваются сотнями запросов в секунду, а данные льются рекой из баз данных, API и внешних сервисов.
Традиционный подход — "запрос приходит, запускается поток, ждём ответа" — начинает давать сбои: система тонет в блокировках, лишних потоках и перерасходе ресурсов.
Представьте: сервер обрабатывает тысячи подключений, но каждый запрос занимает отдельный поток, который висит в ожидании, пока не придёт ответ от сети или диска. Это как пытаться пропустить толпу через узкий коридор — пробки неизбежны.
Чтобы выжать максимум из аппаратных ресурсов и сделать приложения более отзывчивыми, нужно менять способ мышления. Один из ключевых инструментов для этого — реактивное программирование, подход, который фокусируется на обработке потоков данных и событий в асинхронном режиме, без лишних блокировок.
Асинхронщина и её проблемы: от основ к пределам
Что такое асинхронность в программировании?
Это когда программа не стоит на месте, ожидая завершения одной задачи, а продолжает работать с другими. В Java это реализуется через механизмы, позволяющие запускать код параллельно. Но на пути к идеалу было много подводных камней.
Потоки: базовый, но тяжёлый инструмент
Многопоточность в Java появилась ещё в ранних версиях языка. Идея проста: вы создаёте новый поток — это как отдельный "рабочий", который выполняет код независимо от основного.
Код выглядит так:
new Thread(() -> { /* ваш код */ }).start();.
Внутри метода run() вы пишете, что нужно сделать. На первый взгляд, это решает проблему: пока один поток ждёт ответа от сервера, другой может обрабатывать новый запрос.
Но практика показывает минусы. Каждый поток — это тяжёлый объект: он требует выделения памяти (обычно от 1 МБ на стек), системных ресурсов операционной системы и времени на создание. Если нагрузка растёт — скажем, 10 тысяч одновременных запросов, — вы не сможете создать 10 тысяч потоков: система просто исчерпает ресурсы, и всё упадёт. Ещё одна боль — контекстные переключения: когда процессор переключается между потоками, это стоит CPU-времени, иногда до микросекунд на переключение, что накапливается в большие задержки.
Чтобы смягчить это, в Java ввели ExecutorService — сервис-исполнитель, который управляет пулом потоков. Вы создаёте фиксированный пул, например, Executors.newFixedThreadPool(10), и подаёте задачи:
executor.execute(() -> { /* код */ });
Теперь потоки переиспользуются: закончил задачу — берёт следующую. Это экономит ресурсы, но не решает корень проблемы. Если в задаче есть блокирующий код — например, чтение из файла или ожидание сети, — поток всё равно "зависает" в ожидании, блокируя место в пуле. Другие задачи ждут в очереди, и под высокой нагрузкой пул исчерпывается. В итоге, асинхронность есть, но она неэффективна: ресурсы тратятся на ожидание, а не на полезную работу.
#Java #middle #Reactor
👍3
Future: обещание результата, но с подвохом
В Java 5 ввели Future — это как "чек" на будущий результат.
Вы подаёте задачу в executor и получаете объект Future, который обещает: "когда-нибудь я дам тебе ответ".
Пример:
Плюс в том, что вы можете продолжать работу, не дожидаясь: пока задача крутится в фоне, основной код идёт дальше. Но чтобы забрать результат, нужно вызвать future.get(). И вот здесь засада: get() блокирует текущий поток до тех пор, пока задача не завершится. Если задача задерживается — скажем, из-за сети, — ваш поток тоже висит в ожидании. Получается, асинхронность иллюзорна: да, запуск асинхронный, но использование результата синхронное и блокирующее. Это как заказать еду по доставке, но стоять у двери, не отходя, пока курьер не приедет. Выигрыш минимален, особенно в веб-приложениях, где запросы должны обрабатываться быстро.
Ещё Future неудобен в композиции: если нужно объединить результаты нескольких задач, приходится вручную ждать каждого get(), что приводит к спагетти-коду с try-catch для ошибок и таймаутами.
CompletableFuture: цепочки действий, но без избавления от ада
Java 8 принесла CompletableFuture — улучшенную версию Future, которая позволяет строить цепочки асинхронных операций без блокировок на get(). Теперь результат можно обрабатывать через "колбэки" — функции, которые вызываются автоматически по завершении.
Пример:
Есть методы для комбинации: thenCompose для последовательных цепочек, thenCombine для параллельного объединения результатов, handle для обработки ошибок. Это шаг вперёд: код становится declarative (описательным), вы фокусируетесь на "что сделать", а не "как ждать". Нет нужды в ручном get() — всё течёт само.
Но радость недолговечна. Когда приложение усложняется — например, нужно асинхронно запросить данные из базы, потом из внешнего API, обработать ошибки и объединить, — цепочки лямбд растут в "callback-ад" (ад колбэков): вложенные функции, которые трудно читать, отлаживать и тестировать. Один уровень — ок, но пять-шесть — и код превращается в пирамиду, где сложно отследить поток выполнения.
Ещё хуже: под капотом блокировки никуда не делись. Если в цепочке есть блокирующий вызов — например, Thread.sleep() для симуляции задержки или JDBC-драйвер, который ждёт ответа от базы, блокируя поток, — весь CompletableFuture теряет преимущество. Поток из пула всё равно занят ожиданием, и под нагрузкой система снова захлёбывается. Плюс, управление ошибками в цепочках требует осторожности: одна ошибка может сломать всю последовательность, если не обработать timely.
В итоге, CompletableFuture дал выразительный синтаксис и удобство для простых сценариев, но не решил системные проблемы: ресурсы тратятся впустую на блокировки, сложность растёт, а масштабируемость под вопросом.
#Java #middle #Reactor
В Java 5 ввели Future — это как "чек" на будущий результат.
Вы подаёте задачу в executor и получаете объект Future, который обещает: "когда-нибудь я дам тебе ответ".
Пример:
ExecutorService executor = Executors.newFixedThreadPool(10); Future<String> future = executor.submit(() -> { Thread.sleep(1000); return "Привет"; });.
Плюс в том, что вы можете продолжать работу, не дожидаясь: пока задача крутится в фоне, основной код идёт дальше. Но чтобы забрать результат, нужно вызвать future.get(). И вот здесь засада: get() блокирует текущий поток до тех пор, пока задача не завершится. Если задача задерживается — скажем, из-за сети, — ваш поток тоже висит в ожидании. Получается, асинхронность иллюзорна: да, запуск асинхронный, но использование результата синхронное и блокирующее. Это как заказать еду по доставке, но стоять у двери, не отходя, пока курьер не приедет. Выигрыш минимален, особенно в веб-приложениях, где запросы должны обрабатываться быстро.
Ещё Future неудобен в композиции: если нужно объединить результаты нескольких задач, приходится вручную ждать каждого get(), что приводит к спагетти-коду с try-catch для ошибок и таймаутами.
CompletableFuture: цепочки действий, но без избавления от ада
Java 8 принесла CompletableFuture — улучшенную версию Future, которая позволяет строить цепочки асинхронных операций без блокировок на get(). Теперь результат можно обрабатывать через "колбэки" — функции, которые вызываются автоматически по завершении.
Пример:
CompletableFuture.supplyAsync(() -> { return "Данные"; }).thenApply(data -> { return data.toUpperCase(); }).thenAccept(System.out::println);.
Здесь supplyAsync запускает задачу асинхронно, thenApply преобразует результат (например, переводит в верхний регистр), thenAccept выводит его.
Есть методы для комбинации: thenCompose для последовательных цепочек, thenCombine для параллельного объединения результатов, handle для обработки ошибок. Это шаг вперёд: код становится declarative (описательным), вы фокусируетесь на "что сделать", а не "как ждать". Нет нужды в ручном get() — всё течёт само.
Но радость недолговечна. Когда приложение усложняется — например, нужно асинхронно запросить данные из базы, потом из внешнего API, обработать ошибки и объединить, — цепочки лямбд растут в "callback-ад" (ад колбэков): вложенные функции, которые трудно читать, отлаживать и тестировать. Один уровень — ок, но пять-шесть — и код превращается в пирамиду, где сложно отследить поток выполнения.
Ещё хуже: под капотом блокировки никуда не делись. Если в цепочке есть блокирующий вызов — например, Thread.sleep() для симуляции задержки или JDBC-драйвер, который ждёт ответа от базы, блокируя поток, — весь CompletableFuture теряет преимущество. Поток из пула всё равно занят ожиданием, и под нагрузкой система снова захлёбывается. Плюс, управление ошибками в цепочках требует осторожности: одна ошибка может сломать всю последовательность, если не обработать timely.
В итоге, CompletableFuture дал выразительный синтаксис и удобство для простых сценариев, но не решил системные проблемы: ресурсы тратятся впустую на блокировки, сложность растёт, а масштабируемость под вопросом.
#Java #middle #Reactor
👍3
Callback-ад и блокировки: кульминация проблем
Callback-ад — это когда колбэки (функции обратного вызова) наслаиваются друг на друга, делая код нечитаемым. В CompletableFuture это проявляется в глубоких цепочках: thenApply внутри thenCompose, с handle для ошибок. Отладка — кошмар: где именно сломалось? Тестирование — тоже, потому что асинхронность добавляет неопределённость в порядок выполнения.
Блокировки — это когда код "зависает" в ожидании внешнего события, не давая потоку работать с другими задачами. В Java многие библиотеки (как старые IO или JDBC) блокирующие по природе: они используют системные вызовы, которые стопорят поток. Даже в асинхронных конструкциях, если внутри лямбды такая блокировка, весь пул потоков может исчерпаться. Представьте сервер с 100 потоками: 100 запросов с задержкой — и новые ждут в очереди, вызывая таймауты.
Это приводит к неэффективности: CPU простаивает, память тратится на "спящие" потоки, а под пиковой нагрузкой система не масштабируется горизонтально.
Почему нужен новый подход: реактивное программирование
Мы дошли до предела традиционных моделей. Потоки хороши для CPU-bound задач (расчёты), но тяжёлые для IO-bound (сеть, диски). Future дал обещания, но не избавил от блокировок. CompletableFuture улучшил код, но оставил callback-ад и зависимость от неблокирующих библиотек.
Здесь на сцену выходит реактивное программирование — подход, где мы думаем в терминах потоков данных и событий, а не отдельных задач. Вместо "запрос → блокировка в потоке → ответ" мы строим конвейеры: данные приходят асинхронно по мере готовности, обработка идёт реактивно, без выделения потока на каждое ожидание. Это как перейти от конвейера с паузами к непрерывному потоку. В следующих постах разберём Reactive Streams, Flux/Mono в Project Reactor и как это решает проблемы.
#Java #middle #Reactor
Callback-ад — это когда колбэки (функции обратного вызова) наслаиваются друг на друга, делая код нечитаемым. В CompletableFuture это проявляется в глубоких цепочках: thenApply внутри thenCompose, с handle для ошибок. Отладка — кошмар: где именно сломалось? Тестирование — тоже, потому что асинхронность добавляет неопределённость в порядок выполнения.
Блокировки — это когда код "зависает" в ожидании внешнего события, не давая потоку работать с другими задачами. В Java многие библиотеки (как старые IO или JDBC) блокирующие по природе: они используют системные вызовы, которые стопорят поток. Даже в асинхронных конструкциях, если внутри лямбды такая блокировка, весь пул потоков может исчерпаться. Представьте сервер с 100 потоками: 100 запросов с задержкой — и новые ждут в очереди, вызывая таймауты.
Это приводит к неэффективности: CPU простаивает, память тратится на "спящие" потоки, а под пиковой нагрузкой система не масштабируется горизонтально.
Почему нужен новый подход: реактивное программирование
Мы дошли до предела традиционных моделей. Потоки хороши для CPU-bound задач (расчёты), но тяжёлые для IO-bound (сеть, диски). Future дал обещания, но не избавил от блокировок. CompletableFuture улучшил код, но оставил callback-ад и зависимость от неблокирующих библиотек.
Здесь на сцену выходит реактивное программирование — подход, где мы думаем в терминах потоков данных и событий, а не отдельных задач. Вместо "запрос → блокировка в потоке → ответ" мы строим конвейеры: данные приходят асинхронно по мере готовности, обработка идёт реактивно, без выделения потока на каждое ожидание. Это как перейти от конвейера с паузами к непрерывному потоку. В следующих постах разберём Reactive Streams, Flux/Mono в Project Reactor и как это решает проблемы.
#Java #middle #Reactor
👍4
Реактивное программирование
Что такое потоки данных в реактивном мире?
В реактивном программировании данные — это не статичный объект, который вы запрашиваете и ждёте. Это динамичный поток: последовательность элементов (событий), которые могут приходить в любое время, в любом количестве. Поток может быть бесконечным (как лента новостей) или конечным (как результаты поиска). Главное — обработка идёт реактивно: программа "подписывается" на поток и реагирует на каждый элемент по мере его появления, без блокировок.
Это решает боли из предыдущего поста: вместо выделения потока на ожидание, мы используем неблокирующий механизм. Если данных нет — ничего не происходит, ресурсы свободны. Когда данные приходят — срабатывают реакции. Это как подписка на уведомления: телефон не висит в ожидании, а просто пиликает при новом сообщении.
В основе лежит спецификация Reactive Streams — стандарт, который определяет, как строить такие потоки.
Он включает четыре ключевых интерфейса:
Издатель (Publisher): источник данных. Он "публикует" элементы потока. Например, это может быть база данных, генерирующая записи по запросу.
Подписчик (Subscriber): получатель данных. Он "подписывается" на издателя и реагирует на элементы: onNext (получил элемент), onError (ошибка), onComplete (поток завершён).
Подписка (Subscription): связь между издателем и подписчиком. Через неё подписчик может запросить больше данных (request(n)) или отменить подписку (cancel). Это вводит "обратное давление" — механизм, чтобы подписчик не захлёбывался данными, если не успевает их обрабатывать.
Процессор (Processor): комбинация издателя и подписчика, для промежуточной обработки (как фильтр в конвейере).
Эти интерфейсы — основа. Они обеспечивают асинхронность без блокировок: всё работает на основе событий, а не ожиданий.
События как река: метафора и практика
Представьте реку событий: вода (данные) течёт непрерывно, иногда бурно (пиковая нагрузка), иногда спокойно. Ваша программа — не плотина, которая блокирует поток, а турбина, которая генерирует энергию по мере течения. Если река слишком быстрая — турбина сигнализирует "замедлить" (обратное давление), чтобы не перегрузиться.
В Project Reactor это реализовано через два типа потоков:
Mono: поток для нуля или одного элемента. Идеален для одиночных операций, как HTTP-запрос, возвращающий один ответ.
Пример:
Flux: поток для нуля, одного или многих элементов, возможно бесконечных. Для последовательностей, как стриминг данных.
Пример:
Flux "выдаёт" элементы по одному: 1, 2, 3... Это как река: элементы приходят последовательно, но асинхронно.
Почему это лучше традиционных подходов?
В CompletableFuture вы строите цепочки, но рискуете блокировками внутри.
В реактивном стиле всё неблокирующее: используйте операторы вроде map (преобразовать элемент), filter (отфильтровать), flatMap (развернуть в подпотоки).
Пример цепочки:
#Java #middle #Reactor #data_stream
Что такое потоки данных в реактивном мире?
В реактивном программировании данные — это не статичный объект, который вы запрашиваете и ждёте. Это динамичный поток: последовательность элементов (событий), которые могут приходить в любое время, в любом количестве. Поток может быть бесконечным (как лента новостей) или конечным (как результаты поиска). Главное — обработка идёт реактивно: программа "подписывается" на поток и реагирует на каждый элемент по мере его появления, без блокировок.
Это решает боли из предыдущего поста: вместо выделения потока на ожидание, мы используем неблокирующий механизм. Если данных нет — ничего не происходит, ресурсы свободны. Когда данные приходят — срабатывают реакции. Это как подписка на уведомления: телефон не висит в ожидании, а просто пиликает при новом сообщении.
В основе лежит спецификация Reactive Streams — стандарт, который определяет, как строить такие потоки.
Он включает четыре ключевых интерфейса:
Издатель (Publisher): источник данных. Он "публикует" элементы потока. Например, это может быть база данных, генерирующая записи по запросу.
Подписчик (Subscriber): получатель данных. Он "подписывается" на издателя и реагирует на элементы: onNext (получил элемент), onError (ошибка), onComplete (поток завершён).
Подписка (Subscription): связь между издателем и подписчиком. Через неё подписчик может запросить больше данных (request(n)) или отменить подписку (cancel). Это вводит "обратное давление" — механизм, чтобы подписчик не захлёбывался данными, если не успевает их обрабатывать.
Процессор (Processor): комбинация издателя и подписчика, для промежуточной обработки (как фильтр в конвейере).
Эти интерфейсы — основа. Они обеспечивают асинхронность без блокировок: всё работает на основе событий, а не ожиданий.
События как река: метафора и практика
Представьте реку событий: вода (данные) течёт непрерывно, иногда бурно (пиковая нагрузка), иногда спокойно. Ваша программа — не плотина, которая блокирует поток, а турбина, которая генерирует энергию по мере течения. Если река слишком быстрая — турбина сигнализирует "замедлить" (обратное давление), чтобы не перегрузиться.
В Project Reactor это реализовано через два типа потоков:
Mono: поток для нуля или одного элемента. Идеален для одиночных операций, как HTTP-запрос, возвращающий один ответ.
Пример:
Mono<string> mono = Mono.just("Привет из реки"); // Создаём простой Mono с одним элементом
mono.subscribe(
value -> System.out.println("Получено: " + value), // onNext: реакция на элемент
error -> System.err.println("Ошибка: " + error), // onError
() -> System.out.println("Завершено") // onComplete
);
Здесь subscribe — это подписка. Mono "течёт" асинхронно: если элемент готов — срабатывает onNext, потом onComplete. Нет блокировок: код continue после subscribe.
Flux: поток для нуля, одного или многих элементов, возможно бесконечных. Для последовательностей, как стриминг данных.
Пример:
Flux<integer> flux = Flux.range(1, 5); // Поток чисел от 1 до 5
flux.subscribe(
value -> System.out.println("Элемент: " + value),
error -> System.err.println("Ошибка: " + error),
() -> System.out.println("Поток завершён")
);
Flux "выдаёт" элементы по одному: 1, 2, 3... Это как река: элементы приходят последовательно, но асинхронно.
Почему это лучше традиционных подходов?
В CompletableFuture вы строите цепочки, но рискуете блокировками внутри.
В реактивном стиле всё неблокирующее: используйте операторы вроде map (преобразовать элемент), filter (отфильтровать), flatMap (развернуть в подпотоки).
Пример цепочки:
Flux.fromIterable(List.of("яблоко", "банан", "вишня"))
.filter(fruit -> fruit.startsWith("б")) // Фильтруем
.map(String::toUpperCase) // Преобразуем
.subscribe(System.out::println); // Подписываемся
Результат: "БАНАН". Всё течёт как конвейер, без callback-ада: код читаем, как последовательный, но работает асинхронно.
#Java #middle #Reactor #data_stream
👍3
Обратное давление: контроль
Одна из ключевых фишек — backpressure (обратное давление). В традиционных системах, если производитель данных быстрее потребителя, буфер переполняется, и система падает. В Reactive Streams подписчик через Subscription.request(n) говорит: "Дай мне n элементов". Издатель выдаёт ровно столько, сколько запрошено. Это как шлюзы на реке: предотвращают наводнение.
Пример в Flux:
Подписчик контролирует темп, избегая перегрузки.
Почему это новый подход, который нам нужен?
Реактивное программирование не просто добавляет инструменты — оно меняет мышление. Вместо "императивного" кода (делай то, жди это), мы пишем "декларативно": опиши, как реагировать на поток. Это масштабируется: на сервере с 4 ядрами можно обрабатывать тысячи подключений, потому что нет блокирующих потоков. Библиотеки вроде Reactor интегрируются с неблокирующими драйверами (например, reactive JDBC или WebFlux для веб), решая боли блокировок.
#Java #middle #Reactor #data_stream
Одна из ключевых фишек — backpressure (обратное давление). В традиционных системах, если производитель данных быстрее потребителя, буфер переполняется, и система падает. В Reactive Streams подписчик через Subscription.request(n) говорит: "Дай мне n элементов". Издатель выдаёт ровно столько, сколько запрошено. Это как шлюзы на реке: предотвращают наводнение.
Пример в Flux:
Flux<integer> fastFlux = Flux.range(1, 1000); // Быстрый источник
fastFlux.subscribe(new BaseSubscriber<integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
request(10); // Запрашиваем только 10 элементов сначала
}
@Override
protected void hookOnNext(Integer value) {
System.out.println(value);
if (value % 10 == 0) request(10); // Запрашиваем ещё по мере обработки
}
});
Подписчик контролирует темп, избегая перегрузки.
Почему это новый подход, который нам нужен?
Реактивное программирование не просто добавляет инструменты — оно меняет мышление. Вместо "императивного" кода (делай то, жди это), мы пишем "декларативно": опиши, как реагировать на поток. Это масштабируется: на сервере с 4 ядрами можно обрабатывать тысячи подключений, потому что нет блокирующих потоков. Библиотеки вроде Reactor интегрируются с неблокирующими драйверами (например, reactive JDBC или WebFlux для веб), решая боли блокировок.
#Java #middle #Reactor #data_stream
👍4
Реактивное программирование
Концепции реактивного программирования: Push vs Pull — кто управляет данными
Сегодня разберём модели Push (отправку данных) и Pull (получение данных), почему одна из них идеальна для реактивного стиля, и как это решает проблемы из первого поста: от тяжёлых потоков до callback-ада.
Представьте: у вас есть конвейер на фабрике. В pull-модели рабочий сам подходит к предыдущему этапу и "берет" деталь, когда ему нужно. В push-модели предыдущий этап "отправляет" деталь дальше, как только она готова, не спрашивая.
Какая модель лучше? Зависит от сценария. В программировании это то же самое: pull подходит для предсказуемых, контролируемых потоков, а push — для динамичных, событийных, где данные приходят непредсказуемо (сеть, пользователи, сенсоры).
Реактивное программирование строится на push — и вот почему это революция.
Pull-модель: вы контролируете темп, но рискуете блокировками
В традиционном императивном программировании (когда код выполняется шаг за шагом, как рецепт) данные обычно "получаются" по требованию. Это pull-модель: потребитель (ваш код) сам запрашивает данные, когда готов их обработать.
Пример — чтение из итератора в Java:
Здесь вы контролируете процесс: hasNext() проверяет наличие, next() получает элемент. Это удобно для локальных, синхронных данных — вы знаете, сколько элементов, и ничего не ждёте. Но под капотом это синхронно: если данные в потоке (файл, сеть), next() может заблокироваться, как в старых потоках. Поток висит в ожидании, ресурсы тратятся впустую.
В асинхронных сценариях pull эволюционировал в Future или CompletableFuture: вы "запрашиваете" результат через get() или цепочки, но контроль остаётся у вас.
Проблема: если источник данных медленный (БД, API), ваш pull-запрос блокирует или создаёт callback-ад. Под нагрузкой — тысячи pull-запросов — система не масштабируется, потому что каждый требует ресурса (потока). Это как толпа в очереди: каждый тянет за своим, но если касса одна, все стоят.
Ещё минус: pull не справляется с "лишними" данными. Если источник генерирует 1 млн событий в секунду, а вы получаете по одному — либо перегрузка буфера, либо вы не успеваете. Нет встроенного механизма, чтобы сказать "замедлись".
#Java #middle #Reactor #Push #Flux
Концепции реактивного программирования: Push vs Pull — кто управляет данными
Сегодня разберём модели Push (отправку данных) и Pull (получение данных), почему одна из них идеальна для реактивного стиля, и как это решает проблемы из первого поста: от тяжёлых потоков до callback-ада.
Представьте: у вас есть конвейер на фабрике. В pull-модели рабочий сам подходит к предыдущему этапу и "берет" деталь, когда ему нужно. В push-модели предыдущий этап "отправляет" деталь дальше, как только она готова, не спрашивая.
Какая модель лучше? Зависит от сценария. В программировании это то же самое: pull подходит для предсказуемых, контролируемых потоков, а push — для динамичных, событийных, где данные приходят непредсказуемо (сеть, пользователи, сенсоры).
Реактивное программирование строится на push — и вот почему это революция.
Pull-модель: вы контролируете темп, но рискуете блокировками
В традиционном императивном программировании (когда код выполняется шаг за шагом, как рецепт) данные обычно "получаются" по требованию. Это pull-модель: потребитель (ваш код) сам запрашивает данные, когда готов их обработать.
Пример — чтение из итератора в Java:
List<String> fruits = Arrays.asList("яблоко", "банан", "вишня");
Iterator<String> iterator = fruits.iterator();
while (iterator.hasNext()) {
String fruit = iterator.next(); // "Вытягиваем" следующий элемент
System.out.println(fruit.toUpperCase());
}
Здесь вы контролируете процесс: hasNext() проверяет наличие, next() получает элемент. Это удобно для локальных, синхронных данных — вы знаете, сколько элементов, и ничего не ждёте. Но под капотом это синхронно: если данные в потоке (файл, сеть), next() может заблокироваться, как в старых потоках. Поток висит в ожидании, ресурсы тратятся впустую.
В асинхронных сценариях pull эволюционировал в Future или CompletableFuture: вы "запрашиваете" результат через get() или цепочки, но контроль остаётся у вас.
Проблема: если источник данных медленный (БД, API), ваш pull-запрос блокирует или создаёт callback-ад. Под нагрузкой — тысячи pull-запросов — система не масштабируется, потому что каждый требует ресурса (потока). Это как толпа в очереди: каждый тянет за своим, но если касса одна, все стоят.
Ещё минус: pull не справляется с "лишними" данными. Если источник генерирует 1 млн событий в секунду, а вы получаете по одному — либо перегрузка буфера, либо вы не успеваете. Нет встроенного механизма, чтобы сказать "замедлись".
#Java #middle #Reactor #Push #Flux
👍3
Push-модель: данные приходят сами, реактивно и эффективно
Теперь изменим: в push-модели источник "отправляет" данные потребителю, как только они готовы, без запросов. Потребитель пассивен — он подписывается и реагирует.
Это основа реактивного программирования: события push'атся асинхронно, без блокировок. Контроль переходит к источнику, но с обратным давлением — подписчик может сказать "хватит на время".
В Reactive Streams это реализовано через Publisher и Subscriber: издатель толкает onNext(элемент), подписчик реагирует сразу.
Пример с Flux в Project Reactor (push-стиль):
Здесь Flux — издатель, который отправляет элементы по мере готовности. subscribe() — подписка, и элементы приходят автоматически: "ЯБЛОКО", "БАНАН"... Нет next() — нет pull.
Если источник асинхронный, например, чтение из сети:
Данные push'атся по мере поступления от сервера — без блокировок. Reactor использует неблокирующий IO (на базе Netty), так что поток не висит: один event-loop-цикл (цикл обработки событий) обслуживает тысячи подписок.
Почему push лучше для реактивности?
Во-первых, эффективность: нет лишних проверок hasNext().
Во-вторых, естественность для событий: клик мыши, сообщение в чате — это push по природе, они приходят сами.
В-третьих, масштабируемость: тысячи подписчиков на один издатель — ок, потому что push идёт через события, а не потоки на каждого.
Гибридные сценарии: когда mix работает
На практике модели смешиваются. В Reactor Flux может имитировать pull через операторы вроде buffer() или take(), но основа — push.
Пример: pull из локального списка, но push в сеть:
Здесь локальный pull (fromIterable) переходит в push (flatMap для API).
Это гибкость: используйте pull для контроля, push для асинхронности. Но важно избегать блокировок: Reactor проверяет и предупреждает, если в лямбде блокирующий код (onBlock()).
Ещё пример из реальной жизни: стриминг видео в Netflix. Pull — когда пользователь сам получает фреймы, но под нагрузкой лагает. Push — сервер отдает фреймы по мере готовности, с буферизацией. Реактивные библиотеки (как RxJava) позволяют строить такие конвейеры.
Почему Push — ключ к новому подходу в реактивном программировании
Возвращаясь к проблемам: потоки тяжёлые, потому что pull требует ожидания; Future блокирует на get(), потому что это pull в асинхронной обёртке; CompletableFuture даёт цепочки, но push-подход в нём слаб (колбэки — это мини-push, но без полного контроля).
Реактивный push меняет всё: данные текут как река, вы реагируете без ожидания, ресурсы на минимуме. Системы становятся resilient (устойчивыми): если один поток сломается, другие продолжают. Под нагрузкой — горизонтальное масштабирование без боли.
#Java #middle #Reactor #Push #Flux
Теперь изменим: в push-модели источник "отправляет" данные потребителю, как только они готовы, без запросов. Потребитель пассивен — он подписывается и реагирует.
Это основа реактивного программирования: события push'атся асинхронно, без блокировок. Контроль переходит к источнику, но с обратным давлением — подписчик может сказать "хватит на время".
В Reactive Streams это реализовано через Publisher и Subscriber: издатель толкает onNext(элемент), подписчик реагирует сразу.
Пример с Flux в Project Reactor (push-стиль):
Flux<String> pushFlux = Flux.fromIterable(Arrays.asList("яблоко", "банан", "вишня"))
.map(String::toUpperCase); // Преобразование в потоке
pushFlux.subscribe(
fruit -> System.out.println("Получено из push: " + fruit), // Реакция на толкание
Throwable::printStackTrace, // Обработка ошибок
() -> System.out.println("Push завершён")
);
Здесь Flux — издатель, который отправляет элементы по мере готовности. subscribe() — подписка, и элементы приходят автоматически: "ЯБЛОКО", "БАНАН"... Нет next() — нет pull.
Если источник асинхронный, например, чтение из сети:
WebClient.create()
.get()
.uri("https://api.example.com/fruits")
.retrieve()
.bodyToFlux(String.class) // Flux толкает строки из ответа
.subscribe(fruit -> System.out.println("Push из API: " + fruit));
Данные push'атся по мере поступления от сервера — без блокировок. Reactor использует неблокирующий IO (на базе Netty), так что поток не висит: один event-loop-цикл (цикл обработки событий) обслуживает тысячи подписок.
Почему push лучше для реактивности?
Во-первых, эффективность: нет лишних проверок hasNext().
Во-вторых, естественность для событий: клик мыши, сообщение в чате — это push по природе, они приходят сами.
В-третьих, масштабируемость: тысячи подписчиков на один издатель — ок, потому что push идёт через события, а не потоки на каждого.
Гибридные сценарии: когда mix работает
На практике модели смешиваются. В Reactor Flux может имитировать pull через операторы вроде buffer() или take(), но основа — push.
Пример: pull из локального списка, но push в сеть:
Flux.fromIterable(fruits)
.map(String::toUpperCase)
.flatMap(fruit -> sendToApi(fruit)) // flatMap толкает в асинхронный API
.subscribe(result -> System.out.println("Ответ: " + result));
Здесь локальный pull (fromIterable) переходит в push (flatMap для API).
Это гибкость: используйте pull для контроля, push для асинхронности. Но важно избегать блокировок: Reactor проверяет и предупреждает, если в лямбде блокирующий код (onBlock()).
Ещё пример из реальной жизни: стриминг видео в Netflix. Pull — когда пользователь сам получает фреймы, но под нагрузкой лагает. Push — сервер отдает фреймы по мере готовности, с буферизацией. Реактивные библиотеки (как RxJava) позволяют строить такие конвейеры.
Почему Push — ключ к новому подходу в реактивном программировании
Возвращаясь к проблемам: потоки тяжёлые, потому что pull требует ожидания; Future блокирует на get(), потому что это pull в асинхронной обёртке; CompletableFuture даёт цепочки, но push-подход в нём слаб (колбэки — это мини-push, но без полного контроля).
Реактивный push меняет всё: данные текут как река, вы реагируете без ожидания, ресурсы на минимуме. Системы становятся resilient (устойчивыми): если один поток сломается, другие продолжают. Под нагрузкой — горизонтальное масштабирование без боли.
#Java #middle #Reactor #Push #Flux
👍3
Реактивное программирование
Концепции реактивного программирования: Reactive Streams API — Publisher и Subscriber
Сегодня разберём Reactive Streams API — это спецификация, которая лежит в сердце реактивного программирования на Java. Она не просто набор интерфейсов, а рамка для построения асинхронных потоков данных с контролем над ними. Представьте это как правила дорожного движения для потока событий: без них — хаос и пробки, с ними — плавный поток.
Reactive Streams API решает ключевые проблемы из первого поста: вместо тяжёлых потоков и callback-ада, мы получаем унифицированный способ обмена данными между компонентами. Это не библиотека, а интерфейсы, которые реализуют фреймворки вроде Project Reactor или RxJava. Они обеспечивают совместимость: один компонент от Reactor может работать с другим от Akka Streams.
Что такое Reactive Streams API
Спецификация Reactive Streams появилась в 2015 году как ответ на хаос асинхронности. До неё каждый фреймворк изобретал велосипед: свои способы обработки потоков, ошибок и давления.
API стандартизирует это: четыре интерфейса (Publisher, Subscriber, Subscription, Processor), которые описывают, как данные текут асинхронно. Главная идея — неблокирующая коммуникация: ничего не ждём синхронно, всё через события.
Это подводит нас ближе к реактивному мышлению: вместо "запроси и жди" (как в Future.get()), мы "подпишись и реагируй". Под нагрузкой это экономит ресурсы: один поток (event-loop) может обслуживать тысячи подписок, без создания новых на каждую задачу. В итоге, системы становятся более отзывчивыми и масштабируемыми — идеально для микросервисов или реального времени.
Publisher: источник данных, который отправляет события
Publisher — это интерфейс для издателя, который генерирует поток данных.
Он как фабрика событий: производит элементы (любые объекты), ошибки или сигнал завершения. Publisher не знает, кто его слушает, — он просто готов отправлять (push) данные, когда они появятся.
Интерфейс прост: всего один метод subscribe(Subscriber s). Когда вы вызываете его, издатель регистрирует подписчика и начинает процесс. Но данные не льются сразу — всё под контролем (об этом ниже).
Пример простого издателя в Project Reactor (который реализует Reactive Streams):
Здесь publisher — источник.
Он может быть асинхронным: например, читать из сети или БД. Когда данные готовы, он отправляет их подписчику через onNext(элемент).
Если ошибка — onError(Throwable).
Если конец — onComplete().
Почему это лучше традиционных подходов? В отличие от потоков (где каждый запрос — отдельный тяжёлый объект), publisher лёгкий: он не выделяет ресурсы заранее, а реагирует на подписку. Нет блокировок: если данных нет, ничего не происходит.
#Java #middle #Reactor #Reactive_Streams_API #Processor #Subscription #Subscriber #Publisher
Концепции реактивного программирования: Reactive Streams API — Publisher и Subscriber
Сегодня разберём Reactive Streams API — это спецификация, которая лежит в сердце реактивного программирования на Java. Она не просто набор интерфейсов, а рамка для построения асинхронных потоков данных с контролем над ними. Представьте это как правила дорожного движения для потока событий: без них — хаос и пробки, с ними — плавный поток.
Reactive Streams API решает ключевые проблемы из первого поста: вместо тяжёлых потоков и callback-ада, мы получаем унифицированный способ обмена данными между компонентами. Это не библиотека, а интерфейсы, которые реализуют фреймворки вроде Project Reactor или RxJava. Они обеспечивают совместимость: один компонент от Reactor может работать с другим от Akka Streams.
Что такое Reactive Streams API
Спецификация Reactive Streams появилась в 2015 году как ответ на хаос асинхронности. До неё каждый фреймворк изобретал велосипед: свои способы обработки потоков, ошибок и давления.
API стандартизирует это: четыре интерфейса (Publisher, Subscriber, Subscription, Processor), которые описывают, как данные текут асинхронно. Главная идея — неблокирующая коммуникация: ничего не ждём синхронно, всё через события.
Это подводит нас ближе к реактивному мышлению: вместо "запроси и жди" (как в Future.get()), мы "подпишись и реагируй". Под нагрузкой это экономит ресурсы: один поток (event-loop) может обслуживать тысячи подписок, без создания новых на каждую задачу. В итоге, системы становятся более отзывчивыми и масштабируемыми — идеально для микросервисов или реального времени.
Publisher: источник данных, который отправляет события
Publisher — это интерфейс для издателя, который генерирует поток данных.
Он как фабрика событий: производит элементы (любые объекты), ошибки или сигнал завершения. Publisher не знает, кто его слушает, — он просто готов отправлять (push) данные, когда они появятся.
Интерфейс прост: всего один метод subscribe(Subscriber s). Когда вы вызываете его, издатель регистрирует подписчика и начинает процесс. Но данные не льются сразу — всё под контролем (об этом ниже).
Пример простого издателя в Project Reactor (который реализует Reactive Streams):
import reactor.core.publisher.Flux; // Flux реализует Publisher
Flux<Integer> publisher = Flux.range(1, 5); // Издатель: поток от 1 до 5
Здесь publisher — источник.
Он может быть асинхронным: например, читать из сети или БД. Когда данные готовы, он отправляет их подписчику через onNext(элемент).
Если ошибка — onError(Throwable).
Если конец — onComplete().
Почему это лучше традиционных подходов? В отличие от потоков (где каждый запрос — отдельный тяжёлый объект), publisher лёгкий: он не выделяет ресурсы заранее, а реагирует на подписку. Нет блокировок: если данных нет, ничего не происходит.
#Java #middle #Reactor #Reactive_Streams_API #Processor #Subscription #Subscriber #Publisher
👍1
Subscriber: получатель, который реагирует на события
Subscriber — интерфейс для подписчика, который "слушает" издателя. Он как потребитель на конвейере: получает элементы и решает, что с ними делать.
Методы:
- onSubscribe(Subscription s): вызывается сразу после подписки. Здесь подписчик получает подписку — объект для контроля.
- onNext(T item): срабатывает на каждый элемент. Здесь обработка: логика, трансформация и т.д.
- onError(Throwable t): если ошибка — обработка исключения.
- onComplete(): поток завершён успешно.
Подписчик пассивен: не получает данные (pull), а ждёт push. Это решает callback-ад из CompletableFuture — реакции в одном месте, код чище.
Пример подписки:
Здесь subscribe() связывает publisher и subscriber. Данные отправляются по одному, по запросу (request(1)). Это асинхронно: код после subscribe() продолжается сразу, без ожидания.
В Reactor есть BaseSubscriber для упрощения — переопределяйте только нужные методы.
Subscription: мост контроля с обратным давлением
Subscription — ключ к управлению: это объект, который подписчик получает в onSubscribe.
Методы:
- request(long n): "Дай мне n элементов". Это обратное давление (backpressure) — подписчик контролирует темп, чтобы не захлебнуться.
- cancel(): "Хватит, отпишись".
Без этого publisher мог бы зафлудить подписчика данными. Например, если источник — бесконечный поток (сенсоры), без request(n) — переполнение памяти.
В примере выше request(1) делает обработку последовательной: получил — обработал — запросил следующий. Для скорости — request(Long.MAX_VALUE) (неограниченно), но осторожно: рискуете буфером.
Это решает проблемы блокировок: вместо висящих потоков, всё в event-loop. Под нагрузкой — graceful degradation (грациозная деградация): если подписчик медленный, publisher замедляется, а не падает.
Processor: промежуточный звено для трансформаций
Processor — комбо: реализует и Publisher, и Subscriber. Он как фильтр в конвейере: принимает данные от одного издателя, обрабатывает и отправляет дальше.
Пример — в цепочках Flux: map() или filter() создают процессоры внутри.
В практике вы редко пишете свой Processor — библиотеки предоставляют готовые операторы. Но понимание помогает: весь конвейер — цепь publisher → processor → ... → subscriber.
Практические советы и подводные камни
- Всегда управляйте подпиской: без request() данные не потекут (по умолчанию unbounded — неограниченно, но лучше явно).
- Обрабатывайте ошибки: onError — ваш спасатель, чтобы не потерять исключения.
- Тестируйте с TestSubscriber (в Reactor): симулируйте сценарии.
- Камень: если в onNext блокирующий код — сломаете асинхронность. Используйте Schedulers для offload (перенос на другой поток).
В реальном коде: в Spring WebFlux контроллеры возвращают Flux/Mono — publisher'ы, клиенты подписываются.
#Java #middle #Reactor #Reactive_Streams_API #Processor #Subscription #Subscriber
Subscriber — интерфейс для подписчика, который "слушает" издателя. Он как потребитель на конвейере: получает элементы и решает, что с ними делать.
Методы:
- onSubscribe(Subscription s): вызывается сразу после подписки. Здесь подписчик получает подписку — объект для контроля.
- onNext(T item): срабатывает на каждый элемент. Здесь обработка: логика, трансформация и т.д.
- onError(Throwable t): если ошибка — обработка исключения.
- onComplete(): поток завершён успешно.
Подписчик пассивен: не получает данные (pull), а ждёт push. Это решает callback-ад из CompletableFuture — реакции в одном месте, код чище.
Пример подписки:
publisher.subscribe(new Subscriber<Integer>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
this.subscription = s;
s.request(1); // Запрашиваем первый элемент
}
@Override
public void onNext(Integer item) {
System.out.println("Получено: " + item);
subscription.request(1); // Запрашиваем следующий
}
@Override
public void onError(Throwable t) {
System.err.println("Ошибка: " + t);
}
@Override
public void onComplete() {
System.out.println("Завершено");
}
});
Здесь subscribe() связывает publisher и subscriber. Данные отправляются по одному, по запросу (request(1)). Это асинхронно: код после subscribe() продолжается сразу, без ожидания.
В Reactor есть BaseSubscriber для упрощения — переопределяйте только нужные методы.
Subscription: мост контроля с обратным давлением
Subscription — ключ к управлению: это объект, который подписчик получает в onSubscribe.
Методы:
- request(long n): "Дай мне n элементов". Это обратное давление (backpressure) — подписчик контролирует темп, чтобы не захлебнуться.
- cancel(): "Хватит, отпишись".
Без этого publisher мог бы зафлудить подписчика данными. Например, если источник — бесконечный поток (сенсоры), без request(n) — переполнение памяти.
В примере выше request(1) делает обработку последовательной: получил — обработал — запросил следующий. Для скорости — request(Long.MAX_VALUE) (неограниченно), но осторожно: рискуете буфером.
Это решает проблемы блокировок: вместо висящих потоков, всё в event-loop. Под нагрузкой — graceful degradation (грациозная деградация): если подписчик медленный, publisher замедляется, а не падает.
Processor: промежуточный звено для трансформаций
Processor — комбо: реализует и Publisher, и Subscriber. Он как фильтр в конвейере: принимает данные от одного издателя, обрабатывает и отправляет дальше.
Пример — в цепочках Flux: map() или filter() создают процессоры внутри.
В практике вы редко пишете свой Processor — библиотеки предоставляют готовые операторы. Но понимание помогает: весь конвейер — цепь publisher → processor → ... → subscriber.
Практические советы и подводные камни
- Всегда управляйте подпиской: без request() данные не потекут (по умолчанию unbounded — неограниченно, но лучше явно).
- Обрабатывайте ошибки: onError — ваш спасатель, чтобы не потерять исключения.
- Тестируйте с TestSubscriber (в Reactor): симулируйте сценарии.
- Камень: если в onNext блокирующий код — сломаете асинхронность. Используйте Schedulers для offload (перенос на другой поток).
В реальном коде: в Spring WebFlux контроллеры возвращают Flux/Mono — publisher'ы, клиенты подписываются.
#Java #middle #Reactor #Reactive_Streams_API #Processor #Subscription #Subscriber
👍1
Реактивное программирование
Концепции реактивного программирования: Backpressure — что делать, если данных слишком много
Представьте: источник данных — как бурная река, а ваш подписчик — маленькая лодка. Если вода хлынет слишком быстро в лодку, она утонет.
Здесь на помощь приходит backpressure (обратное давление) — механизм, который позволяет подписчику контролировать скорость потока, говоря "дай мне столько, сколько я могу переварить". И это не просто тормоз, а умный регулятор, который предотвращает перегрузки и делает реактивные системы устойчивыми под любой нагрузкой.
Backpressure — ключевая фишка реактивного программирования, которая отличает его от традиционных подходов. В старых моделях (как потоки или Future) вы либо блокируете всё, либо тонете в очередях данных, рискуя исчерпать память или CPU. Здесь же контроль у потребителя: он решает, когда и сколько брать. Это решает проблемы callback-ада и блокировок, позволяя строить масштабируемые приложения — от мобильных до облачных кластеров.
Что такое backpressure и почему оно нужно?
Обратное давление — это способ, при котором получатель данных сигнализирует источнику: "замедлись, если я не успеваю". В реактивном мире данные передаются асинхронно, но без контроля это может привести к проблемам: если издатель генерирует 1 млн элементов в секунду, а подписчик обрабатывает только 100, буфер переполнится, и приложение упадёт с ошибкой "израсходована память" (OutOfMemoryError).
Backpressure вводит "обратную связь": подписчик запрашивает элементы порциями, а издатель выдаёт ровно столько.
Это встроено в Reactive Streams: в методе onSubscribe подписчик получает Subscription и использует request(long n) — "запроси n элементов". Если не запросить — данные не потекут. Это как шлюзы на реке: открываешь по мере нужды, избегая наводнения. В отличие от pull-модели (где вы тянете всё сразу), здесь баланс: push для динамики, но с контролем.
Пример базового использования в Flux (поток для множества элементов):
#Java #middle #Reactor #Reactive_Streams_API #backpressure
Концепции реактивного программирования: Backpressure — что делать, если данных слишком много
Представьте: источник данных — как бурная река, а ваш подписчик — маленькая лодка. Если вода хлынет слишком быстро в лодку, она утонет.
Здесь на помощь приходит backpressure (обратное давление) — механизм, который позволяет подписчику контролировать скорость потока, говоря "дай мне столько, сколько я могу переварить". И это не просто тормоз, а умный регулятор, который предотвращает перегрузки и делает реактивные системы устойчивыми под любой нагрузкой.
Backpressure — ключевая фишка реактивного программирования, которая отличает его от традиционных подходов. В старых моделях (как потоки или Future) вы либо блокируете всё, либо тонете в очередях данных, рискуя исчерпать память или CPU. Здесь же контроль у потребителя: он решает, когда и сколько брать. Это решает проблемы callback-ада и блокировок, позволяя строить масштабируемые приложения — от мобильных до облачных кластеров.
Что такое backpressure и почему оно нужно?
Обратное давление — это способ, при котором получатель данных сигнализирует источнику: "замедлись, если я не успеваю". В реактивном мире данные передаются асинхронно, но без контроля это может привести к проблемам: если издатель генерирует 1 млн элементов в секунду, а подписчик обрабатывает только 100, буфер переполнится, и приложение упадёт с ошибкой "израсходована память" (OutOfMemoryError).
Backpressure вводит "обратную связь": подписчик запрашивает элементы порциями, а издатель выдаёт ровно столько.
Это встроено в Reactive Streams: в методе onSubscribe подписчик получает Subscription и использует request(long n) — "запроси n элементов". Если не запросить — данные не потекут. Это как шлюзы на реке: открываешь по мере нужды, избегая наводнения. В отличие от pull-модели (где вы тянете всё сразу), здесь баланс: push для динамики, но с контролем.
Пример базового использования в Flux (поток для множества элементов):
import reactor.core.publisher.Flux;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
Flux<Integer> fastPublisher = Flux.range(1, 1000); // Быстрый источник: 1000 элементов
fastPublisher.subscribe(new Subscriber<Integer>() {
private Subscription sub;
@Override
public void onSubscribe(Subscription s) {
sub = s;
sub.request(5); // Сначала запрашиваем 5 элементов
}
@Override
public void onNext(Integer item) {
System.out.println("Обработано: " + item);
// Симулируем медленную обработку: Thread.sleep(100); (но в реальности избегайте блокировок!)
sub.request(1); // После каждого — запрашиваем следующий
}
@Override
public void onError(Throwable t) { /* обработка */ }
@Override
public void onComplete() { System.out.println("Готово"); }
});
Здесь подписчик контролирует темп: запросил 5 — получил 5, обработал — запросил ещё. Если не request() — поток остановится. Это асинхронно: издатель не блокируется, а ждёт сигналов.
#Java #middle #Reactor #Reactive_Streams_API #backpressure
👍2
Стратегии обработки backpressure: что если запросов мало?
Не всегда подписчик может обрабатывать всё. Reactive Streams не диктует, что делать при переполнении — это на усмотрение реализации (как Reactor).
Вот стратегии, чтобы не упасть:
- BUFFER (буферизация): Копирует лишние элементы в очередь (буфер). Полезно для временных пиков, но рискуете памятью при бесконечном потоке. В Reactor: .onBackpressureBuffer().
- DROP (сброс): Игнорирует лишние элементы, пока подписчик не запросит. Для сценариев, где свежие данные важнее старых (например, мониторинг). .onBackpressureDrop().
- LATEST (последний): Сохраняет только самый свежий элемент, сбрасывая предыдущие. Идеально для UI-обновлений (как курс валют). .onBackpressureLatest().
- ERROR (ошибка): Если буфер полон — кидает исключение (IllegalStateException). Для строгих систем, где потеря данных недопустима. .onBackpressureError().
Пример с DROP в Reactor:
Практические советы и подводные камни
- Всегда вызывайте request() в onSubscribe, иначе ничего не потечёт. Для неограниченного — request(Long.MAX_VALUE), но только если уверены в памяти.
- Избегайте блокировок в onNext: если обработка медленная, offload на другой Scheduler (в Reactor: .publishOn(Schedulers.parallel())).
- Тестируйте под нагрузкой: используйте TestPublisher/TestSubscriber для симуляции быстрых/медленных сценариев.
- Камень: бесконечные потоки без стратегии — рецепт OOM. Всегда добавляйте .onBackpressureBuffer(размер) или drop.
В реальной жизни: в Spring WebFlux backpressure работает сквозь стек — от БД (reactive драйверы) до клиента. Например, стриминг больших файлов: клиент запрашивает chunks, сервер толкает по мере.
#Java #middle #Reactor #Reactive_Streams_API #backpressure
Не всегда подписчик может обрабатывать всё. Reactive Streams не диктует, что делать при переполнении — это на усмотрение реализации (как Reactor).
Вот стратегии, чтобы не упасть:
- BUFFER (буферизация): Копирует лишние элементы в очередь (буфер). Полезно для временных пиков, но рискуете памятью при бесконечном потоке. В Reactor: .onBackpressureBuffer().
- DROP (сброс): Игнорирует лишние элементы, пока подписчик не запросит. Для сценариев, где свежие данные важнее старых (например, мониторинг). .onBackpressureDrop().
- LATEST (последний): Сохраняет только самый свежий элемент, сбрасывая предыдущие. Идеально для UI-обновлений (как курс валют). .onBackpressureLatest().
- ERROR (ошибка): Если буфер полон — кидает исключение (IllegalStateException). Для строгих систем, где потеря данных недопустима. .onBackpressureError().
Пример с DROP в Reactor:
Flux.interval(Duration.ofMillis(1)) // Бесконечный поток: элемент каждую мс
.onBackpressureDrop(dropped -> System.out.println("Сброшено: " + dropped)) // Стратегия: сбрасываем лишнее
.subscribe(item -> {
try { Thread.sleep(100); } catch (InterruptedException e) {} // Медленный подписчик
System.out.println("Обработано: " + item);
});
Здесь быстрый Flux "замедляется" под подписчика: лишние элементы сбрасываются, система не падает.
Ещё опция — cancel(): подписчик может полностью отменить подписку, если данных слишком много или не нужно.
Практические советы и подводные камни
- Всегда вызывайте request() в onSubscribe, иначе ничего не потечёт. Для неограниченного — request(Long.MAX_VALUE), но только если уверены в памяти.
- Избегайте блокировок в onNext: если обработка медленная, offload на другой Scheduler (в Reactor: .publishOn(Schedulers.parallel())).
- Тестируйте под нагрузкой: используйте TestPublisher/TestSubscriber для симуляции быстрых/медленных сценариев.
- Камень: бесконечные потоки без стратегии — рецепт OOM. Всегда добавляйте .onBackpressureBuffer(размер) или drop.
В реальной жизни: в Spring WebFlux backpressure работает сквозь стек — от БД (reactive драйверы) до клиента. Например, стриминг больших файлов: клиент запрашивает chunks, сервер толкает по мере.
#Java #middle #Reactor #Reactive_Streams_API #backpressure
👍3
Введение в GraphQL
GraphQL — это язык запросов для приложения, который позволяет клиентам (например, мобильным приложениям или веб-сайтам) запрашивать ровно те данные, которые им нужны, без лишнего. Он был создан компанией Facebook в 2012 году и стал открытым стандартом в 2015. В отличие от традиционных подходов, где сервер диктует, какие данные отдавать, здесь клиент сам формирует запрос. Это делает систему гибкой и эффективной.
Почему GraphQL вместо привычных методов?
Представь, что у тебя есть интернет-магазин. В старом подходе, называемом REST (это аббревиатура от "Representational State Transfer", что значит "передача состояния представления" — способ организации API, где каждый запрос идёт по фиксированному адресу), ты бы создал отдельные точки входа: одну для списка товаров, другую для деталей пользователя, третью для отзывов. Клиент запрашивает всё сразу, даже если ему нужно только имя товара и цена — и получает кучу лишних данных. Это тратит трафик и замедляет приложение.
GraphQL решает это одной точкой входа (обычно /graphql). Клиент пишет запрос на языке, похожем на JSON, указывая, какие поля нужны. Сервер возвращает только их.
Плюсы:
Меньше запросов: Можно получить данные из нескольких источников за один раз.
Типизация: Всё строго описано в схеме (это как каркас данных, где указано, какие типы полей и как они связаны).
Версионирование не нужно: Клиенты сами выбирают, что запрашивать, без изменений в API.
Интроспекция: Клиент может "спросить" сервер о доступных данных.
В экосистеме Java GraphQL интегрируется легко, особенно с Spring Boot — это фреймворк для быстрой разработки приложений на Java, который упрощает настройку серверов. Есть библиотека graphql-java, но для Spring лучше использовать spring-graphql — она берёт на себя интеграцию.
Настройка проекта: Шаги для старта
Давай создадим простой проект. Предполагаем, у тебя есть Java 17+ и Maven (система сборки проектов). Если новичок, скачай Spring Initializr с сайта spring.io — это онлайн-генератор проектов.
Создай проект Spring Boot с зависимостями:
Spring Web (для HTTP-сервера).
Spring GraphQL (для поддержки GraphQL).
Spring Data JPA (если нужно база данных, для хранения данных).
H2 Database (встроенная база для тестов).
В файле pom.xml (это конфигурация Maven) добавь:
Определи схему. Схема — это файл с описанием типов данных.
Создай файл schema.graphqls в resources/graphql:
Создай модель данных. В Java это классы с аннотациями (метками для фреймворка).
Для книги:
#Java #middle #on_request #GraphQL
GraphQL — это язык запросов для приложения, который позволяет клиентам (например, мобильным приложениям или веб-сайтам) запрашивать ровно те данные, которые им нужны, без лишнего. Он был создан компанией Facebook в 2012 году и стал открытым стандартом в 2015. В отличие от традиционных подходов, где сервер диктует, какие данные отдавать, здесь клиент сам формирует запрос. Это делает систему гибкой и эффективной.
Почему GraphQL вместо привычных методов?
Представь, что у тебя есть интернет-магазин. В старом подходе, называемом REST (это аббревиатура от "Representational State Transfer", что значит "передача состояния представления" — способ организации API, где каждый запрос идёт по фиксированному адресу), ты бы создал отдельные точки входа: одну для списка товаров, другую для деталей пользователя, третью для отзывов. Клиент запрашивает всё сразу, даже если ему нужно только имя товара и цена — и получает кучу лишних данных. Это тратит трафик и замедляет приложение.
GraphQL решает это одной точкой входа (обычно /graphql). Клиент пишет запрос на языке, похожем на JSON, указывая, какие поля нужны. Сервер возвращает только их.
Плюсы:
Меньше запросов: Можно получить данные из нескольких источников за один раз.
Типизация: Всё строго описано в схеме (это как каркас данных, где указано, какие типы полей и как они связаны).
Версионирование не нужно: Клиенты сами выбирают, что запрашивать, без изменений в API.
Интроспекция: Клиент может "спросить" сервер о доступных данных.
В экосистеме Java GraphQL интегрируется легко, особенно с Spring Boot — это фреймворк для быстрой разработки приложений на Java, который упрощает настройку серверов. Есть библиотека graphql-java, но для Spring лучше использовать spring-graphql — она берёт на себя интеграцию.
Настройка проекта: Шаги для старта
Давай создадим простой проект. Предполагаем, у тебя есть Java 17+ и Maven (система сборки проектов). Если новичок, скачай Spring Initializr с сайта spring.io — это онлайн-генератор проектов.
Создай проект Spring Boot с зависимостями:
Spring Web (для HTTP-сервера).
Spring GraphQL (для поддержки GraphQL).
Spring Data JPA (если нужно база данных, для хранения данных).
H2 Database (встроенная база для тестов).
В файле pom.xml (это конфигурация Maven) добавь:
xml<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-graphql</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
</dependencies>
Определи схему. Схема — это файл с описанием типов данных.
Создай файл schema.graphqls в resources/graphql:
graphqltype Query {
bookById(id: ID): Book
allBooks: [Book]
}
type Book {
id: ID
title: String
author: Author
}
type Author {
id: ID
name: String
books: [Book]
}
Здесь Query — это корневой тип для запросов. ID — уникальный идентификатор, String — текст. [Book] значит список книг. Это позволяет запрашивать книги с авторами, и сервер сам свяжет данные.
Создай модель данных. В Java это классы с аннотациями (метками для фреймворка).
Для книги:
javaimport jakarta.persistence.Entity;
import jakarta.persistence.Id;
import jakarta.persistence.ManyToOne;
@Entity
public class Book {
@Id
private Long id;
private String title;
@ManyToOne
private Author author;
// Геттеры и сеттеры (методы для чтения и записи полей)
public Long getId() { return id; }
public void setId(Long id) { this.id = id; }
public String getTitle() { return title; }
public void setTitle(String title) { this.title = title; }
public Author getAuthor() { return author; }
public void setAuthor(Author author) { this.author = author; }
}
#Java #middle #on_request #GraphQL
🔥3👍1
Репозитории для доступа к данным. Spring Data упрощает это:
Резолверы: Где происходит магия
Резолверы — это методы, которые "разрешают" запросы, то есть вычисляют данные.
В Spring создай класс:
Чтобы связать автора с книгами, добавь резолвер для типа Book:
Запуск и тестирование
В основном классе приложения (обычно с @SpringBootApplication) ничего менять не нужно — Spring сам настроит /graphql.
Запусти приложение:
Теперь протестируй в инструменте вроде GraphiQL (встроен в Spring GraphQL, доступен по /graphiql).
Пример запроса:
Сервер вернёт:
Только то, что запрошено! Если нужно мутации (изменения данных), добавь type Mutation в схему и резолверы для createBook и т.д.
Продвинутые советы для опытных
Обработка ошибок: Используй GraphQLError для кастомных сообщений.
Производительность: Добавь DataLoader для batch-запросов, чтобы избежать N+1 проблемы (когда для списка из N элементов делается N запросов в базу).
Безопасность: Включи аутентификацию с Spring Security, ограничи поля по ролям.
Интеграция с другими сервисами: GraphQL может агрегировать данные из микросервисов.
Пример с DataLoader для оптимизации:
Сначала добавь зависимость graphql-java-tools.
Затем в конфигурации:
В резолвере используй контекст для загрузки.
Это делает систему масштабируемой.
#Java #middle #on_request #GraphQL
javaimport org.springframework.data.jpa.repository.JpaRepository;
public interface BookRepository extends JpaRepository<Book, Long> {
}
Это интерфейс, который автоматически генерирует методы вроде findById.
Резолверы: Где происходит магия
Резолверы — это методы, которые "разрешают" запросы, то есть вычисляют данные.
В Spring создай класс:
javaimport graphql.kickstart.tools.GraphQLQueryResolver;
import org.springframework.beans.factory.annotation.Autowired;
public class QueryResolver implements GraphQLQueryResolver {
@Autowired
private BookRepository bookRepository;
public Book bookById(Long id) {
return bookRepository.findById(id).orElse(null); // Возвращает книгу или null, если не найдена
}
public List<Book> allBooks() {
return bookRepository.findAll(); // Все книги
}
}
@Autowired — это инъекция зависимости, Spring сам подставит репозиторий. Для автора аналогично создай AuthorResolver для поля books.
Чтобы связать автора с книгами, добавь резолвер для типа Book:
javaimport graphql.kickstart.tools.GraphQLResolver;
public class BookResolver implements GraphQLResolver<Book> {
public Author author(Book book) {
return book.getAuthor(); // Просто возвращает автора из модели
}
}
Это позволяет в запросе GraphQL получить автора без дополнительных усилий.
Запуск и тестирование
В основном классе приложения (обычно с @SpringBootApplication) ничего менять не нужно — Spring сам настроит /graphql.
Запусти приложение:
mvn spring-boot:run.
Теперь протестируй в инструменте вроде GraphiQL (встроен в Spring GraphQL, доступен по /graphiql).
Пример запроса:
graphqlquery {
bookById(id: 1) {
title
author {
name
}
}
}
Сервер вернёт:
json{
"data": {
"bookById": {
"title": "Война и мир",
"author": {
"name": "Лев Толстой"
}
}
}
}
Только то, что запрошено! Если нужно мутации (изменения данных), добавь type Mutation в схему и резолверы для createBook и т.д.
Продвинутые советы для опытных
Обработка ошибок: Используй GraphQLError для кастомных сообщений.
Производительность: Добавь DataLoader для batch-запросов, чтобы избежать N+1 проблемы (когда для списка из N элементов делается N запросов в базу).
Безопасность: Включи аутентификацию с Spring Security, ограничи поля по ролям.
Интеграция с другими сервисами: GraphQL может агрегировать данные из микросервисов.
Пример с DataLoader для оптимизации:
Сначала добавь зависимость graphql-java-tools.
Затем в конфигурации:
@Bean
public DataLoaderRegistry dataLoaderRegistry(BookRepository bookRepo) {
DataLoaderRegistry registry = new DataLoaderRegistry();
registry.register("authorLoader", DataLoader.newDataLoader((List<Long> authorIds) ->
CompletableFuture.supplyAsync(() -> bookRepo.findAuthorsByIds(authorIds)))); // Batch-запрос
return registry;
}
В резолвере используй контекст для загрузки.
Это делает систему масштабируемой.
#Java #middle #on_request #GraphQL
🔥3👍1
Знакомство с Project Reactor: Mono и Flux
Project Reactor — это не просто обёртка, а полноценный фреймворк от команды Spring, оптимизированный для производительности. Он использует неблокирующий подход: всё работает на основе событий, с минимальным потреблением ресурсов. Ключевые типы — Mono и Flux, которые воплощают потоки данных. Mono для случаев с нуля или одним элементом (как одиночный запрос), Flux для последовательностей (как стриминг). Они реализуют Publisher из Reactive Streams, так что поддерживают подписку, реакции и обратное давление.
Что такое Project Reactor и как начать?
Project Reactor — библиотека для реактивного программирования, которая строит на Reactive Streams. Она предоставляет API для создания, трансформации и потребления асинхронных потоков. Под капотом — эффективный scheduler (планировщик задач), который распределяет работу по потокам без блокировок: использует event-loop для IO и параллельные пулы для вычислений.
Чтобы начать добавьте в pom.xml (Maven):
Импортируйте:
Reactor интегрируется с Spring (WebFlux), но работает standalone. Главное преимущество: код становится декларативным — вы описываете "что" (поток и реакции), а не "как" (ожидания и циклы). Это решает callback-ад: цепочки читаемы, как последовательный код, но асинхронны.
Mono: поток для нуля или одного элемента
Mono — это тип для сценариев, где ожидается максимум один результат: успех (элемент), ошибка или ничего. Идеален для HTTP-запросов, чтения записи из БД или вычислений с одиночным исходом. Mono реализует Publisher, так что вы можете подписаться и реагировать.
Создание Mono: используйте статические методы.
- Mono.just(value): из готового значения.
- Mono.empty(): пустой поток (завершится onComplete без onNext).
- Mono.fromCallable(() -> compute()): из синхронного вызова.
- Mono.defer(() -> createMono()): ленивое создание (выполняется при подписке).
Пример простого Mono:
Асинхронный пример: симулируем задержку.
Почему Mono лучше Future/CompletableFuture?
Нет блокирующего get(): результат приходит в onNext. Нет callback-ада: цепочки через операторы (map, flatMap — разберём в следующих постах). Под нагрузкой: тысячи Mono на одном потоке, без исчерпания пула.
#Java #middle #Reactor #Reactive_Streams_API #Mono #Flux
Project Reactor — это не просто обёртка, а полноценный фреймворк от команды Spring, оптимизированный для производительности. Он использует неблокирующий подход: всё работает на основе событий, с минимальным потреблением ресурсов. Ключевые типы — Mono и Flux, которые воплощают потоки данных. Mono для случаев с нуля или одним элементом (как одиночный запрос), Flux для последовательностей (как стриминг). Они реализуют Publisher из Reactive Streams, так что поддерживают подписку, реакции и обратное давление.
Что такое Project Reactor и как начать?
Project Reactor — библиотека для реактивного программирования, которая строит на Reactive Streams. Она предоставляет API для создания, трансформации и потребления асинхронных потоков. Под капотом — эффективный scheduler (планировщик задач), который распределяет работу по потокам без блокировок: использует event-loop для IO и параллельные пулы для вычислений.
Чтобы начать добавьте в pom.xml (Maven):
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.5.0</version> <!-- версия может быть неактуальной -->
</dependency>
Импортируйте:
import reactor.core.publisher.Mono; import reactor.core.publisher.Flux;
Reactor интегрируется с Spring (WebFlux), но работает standalone. Главное преимущество: код становится декларативным — вы описываете "что" (поток и реакции), а не "как" (ожидания и циклы). Это решает callback-ад: цепочки читаемы, как последовательный код, но асинхронны.
Mono: поток для нуля или одного элемента
Mono — это тип для сценариев, где ожидается максимум один результат: успех (элемент), ошибка или ничего. Идеален для HTTP-запросов, чтения записи из БД или вычислений с одиночным исходом. Mono реализует Publisher, так что вы можете подписаться и реагировать.
Создание Mono: используйте статические методы.
- Mono.just(value): из готового значения.
- Mono.empty(): пустой поток (завершится onComplete без onNext).
- Mono.fromCallable(() -> compute()): из синхронного вызова.
- Mono.defer(() -> createMono()): ленивое создание (выполняется при подписке).
Пример простого Mono:
Mono<String> simpleMono = Mono.just("Привет из Reactor");
Подписка: subscribe() вызывает реакции.
simpleMono.subscribe(
value -> System.out.println("Значение: " + value), // onNext: реакция на элемент
error -> System.err.println("Ошибка: " + error), // onError
() -> System.out.println("Завершено") // onComplete
);
Здесь: "Значение: Привет из Reactor" и "Завершено". Подписка асинхронна — код после subscribe() идёт сразу, без ожидания. Если ошибка (например, Mono.error(new RuntimeException("Бум"))), сработает onError.
Асинхронный пример: симулируем задержку.
Mono<String> asyncMono = Mono.delay(Duration.ofSeconds(1)).map(ignore -> "Готово после секунды");
asyncMono.subscribe(System.out::println); // Вывод после 1 сек
Почему Mono лучше Future/CompletableFuture?
Нет блокирующего get(): результат приходит в onNext. Нет callback-ада: цепочки через операторы (map, flatMap — разберём в следующих постах). Под нагрузкой: тысячи Mono на одном потоке, без исчерпания пула.
#Java #middle #Reactor #Reactive_Streams_API #Mono #Flux
👍3
Flux: поток для нуля, одного или множества элементов
Flux — для последовательностей: от конечных (список) до бесконечных (стриминг). Может быть пустым, с одним элементом (как Mono) или миллионами. Flux тоже Publisher, поддерживает backpressure.
Создание Flux:
- Flux.just(a, b, c): из значений.
- Flux.fromIterable(list): из коллекции.
- Flux.range(start, count): последовательность чисел.
- Flux.interval(Duration): бесконечный таймер.
- Flux.generate(sink -> { sink.next(value); sink.complete(); }): генератор с состоянием.
Пример базового Flux:
С backpressure используйте BaseSubscriber для контроля.
Асинхронный Flux: стриминг с задержкой.
Почему Flux лучше потоков или циклов?
Масштабируемость: обрабатывает миллионы элементов без лишних ресурсов. Реактивность: реагируйте на каждый элемент, без буферов в памяти. Интеграция с обратным давлением: если подписчик медленный, Flux замедляется.
Практические советы и подводные камни
- Ленивость: Mono/Flux не выполняются без subscribe(). Полезно для отложенного вычисления.
- Ошибки: всегда обрабатывайте onError, иначе они "проглатываются".
- Блокировки: избегайте Thread.sleep() в реакциях — используйте delayElements() для асинхронных пауз.
- Тестирование: используйте StepVerifier из reactor-test: StepVerifier.create(flux).expectNext(1,2).verifyComplete();
- Камень: бесконечный Flux без take() или limitRequest() — рискуете памятью. Добавляйте .onBackpressureBuffer() или drop.
В реальной практике: в WebFlux контроллер возвращает Mono<Response> для GET, Flux<Event> для SSE (сервер-сент события).
#Java #middle #Reactor #Reactive_Streams_API #Mono #Flux
Flux — для последовательностей: от конечных (список) до бесконечных (стриминг). Может быть пустым, с одним элементом (как Mono) или миллионами. Flux тоже Publisher, поддерживает backpressure.
Создание Flux:
- Flux.just(a, b, c): из значений.
- Flux.fromIterable(list): из коллекции.
- Flux.range(start, count): последовательность чисел.
- Flux.interval(Duration): бесконечный таймер.
- Flux.generate(sink -> { sink.next(value); sink.complete(); }): генератор с состоянием.
Пример базового Flux:
Flux<Integer> numbersFlux = Flux.range(1, 5);
numbersFlux.subscribe(
value -> System.out.println("Элемент: " + value), // onNext для каждого
error -> System.err.println("Ошибка: " + error),
() -> System.out.println("Завершено")
);
Вывод: 1, 2, 3, 4, 5 и "Завершено". Асинхронно: элементы "текут" по мере готовности.
С backpressure используйте BaseSubscriber для контроля.
numbersFlux.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
request(2); // Запрашиваем по 2 элемента
}
@Override
protected void hookOnNext(Integer value) {
System.out.println("Получено: " + value);
request(1); // После обработки — следующий
}
});
Это предотвращает перегрузку: Flux выдаёт элементы порциями.
Асинхронный Flux: стриминг с задержкой.
Flux<Long> tickingFlux = Flux.interval(Duration.ofMillis(500)).take(5); // 5 тиков каждые 0.5 сек
tickingFlux.subscribe(System.out::println); // 0, 1, 2, 3, 4 с паузами
Почему Flux лучше потоков или циклов?
Масштабируемость: обрабатывает миллионы элементов без лишних ресурсов. Реактивность: реагируйте на каждый элемент, без буферов в памяти. Интеграция с обратным давлением: если подписчик медленный, Flux замедляется.
Практические советы и подводные камни
- Ленивость: Mono/Flux не выполняются без subscribe(). Полезно для отложенного вычисления.
- Ошибки: всегда обрабатывайте onError, иначе они "проглатываются".
- Блокировки: избегайте Thread.sleep() в реакциях — используйте delayElements() для асинхронных пауз.
- Тестирование: используйте StepVerifier из reactor-test: StepVerifier.create(flux).expectNext(1,2).verifyComplete();
- Камень: бесконечный Flux без take() или limitRequest() — рискуете памятью. Добавляйте .onBackpressureBuffer() или drop.
В реальной практике: в WebFlux контроллер возвращает Mono<Response> для GET, Flux<Event> для SSE (сервер-сент события).
#Java #middle #Reactor #Reactive_Streams_API #Mono #Flux
👍4
Реактивное программирование
Подписка и жизненный цикл в Reactor: onNext, onError, onComplete
Reactor делает код декларативным — вы описываете поток событий, а не управляете ожиданиями вручную.
Но чтобы потоки "ожили", нужна подписка: это момент, когда издатель начинает передавать данные, а подписчик реагирует. Подписка и жизненный цикл в Reactor - это фундамент: без понимания, как срабатывают onNext (реакция на элемент), onError (обработка ошибки) и onComplete (завершение), ваши реактивные конвейеры останутся мёртвыми.
Представьте жизненный цикл как этапы реки: подписка — запуск потока, onNext — течение воды, onError — буря, onComplete — устье.
Подписка — это связь между издателем (Publisher, как Mono/Flux) и подписчиком (Subscriber). Она запускает поток данных асинхронно, без блокировок. Жизненный цикл определяет порядок событий: от старта до конца или ошибки. Это решает проблемы из первого поста — вместо callback-ада и ручных get(), вы получаете структурированные реакции. Всё построено на Reactive Streams API, с обратным давлением.
Подписка: запуск потока данных
В Reactor Mono или Flux — ленивые: они ничего не делают, пока не подпишетесь. Метод subscribe() — это триггер: он регистрирует подписчика и начинает передавать события (push, из поста 3).
Подписка асинхронна: после subscribe() код продолжается сразу, без ожидания завершения.
Базовые варианты subscribe():
subscribe(): без параметров — данные "проглатываются", но поток запускается. Полезно для fire-and-forget (запустил и забыл).
subscribe(Consumer<T> onNext): только реакция на элементы.
subscribe(Consumer<T> onNext, Consumer<Throwable> onError): плюс обработка ошибок.
subscribe(Consumer<T> onNext, Consumer<Throwable> onError, Runnable onComplete): полный цикл.
subscribe(Subscriber<T> subscriber): кастомный подписчик для полного контроля (с onSubscribe для Subscription).
Пример простейшей подписки на Flux:
Почему это лучше традиционных? В CompletableFuture вы цепляете thenApply/thenAccept, но рискуете вложенностью. В Reactor subscribe() — точка входа, а реакции — в одном месте. Плюс, подписка возвращает Disposable: объект для отмены (dispose()) в любой момент.
Пример:
#Java #middle #Reactor #Reactive_Streams_API #onNext #onError #onComplet
Подписка и жизненный цикл в Reactor: onNext, onError, onComplete
Reactor делает код декларативным — вы описываете поток событий, а не управляете ожиданиями вручную.
Но чтобы потоки "ожили", нужна подписка: это момент, когда издатель начинает передавать данные, а подписчик реагирует. Подписка и жизненный цикл в Reactor - это фундамент: без понимания, как срабатывают onNext (реакция на элемент), onError (обработка ошибки) и onComplete (завершение), ваши реактивные конвейеры останутся мёртвыми.
Представьте жизненный цикл как этапы реки: подписка — запуск потока, onNext — течение воды, onError — буря, onComplete — устье.
Подписка — это связь между издателем (Publisher, как Mono/Flux) и подписчиком (Subscriber). Она запускает поток данных асинхронно, без блокировок. Жизненный цикл определяет порядок событий: от старта до конца или ошибки. Это решает проблемы из первого поста — вместо callback-ада и ручных get(), вы получаете структурированные реакции. Всё построено на Reactive Streams API, с обратным давлением.
Подписка: запуск потока данных
В Reactor Mono или Flux — ленивые: они ничего не делают, пока не подпишетесь. Метод subscribe() — это триггер: он регистрирует подписчика и начинает передавать события (push, из поста 3).
Подписка асинхронна: после subscribe() код продолжается сразу, без ожидания завершения.
Базовые варианты subscribe():
subscribe(): без параметров — данные "проглатываются", но поток запускается. Полезно для fire-and-forget (запустил и забыл).
subscribe(Consumer<T> onNext): только реакция на элементы.
subscribe(Consumer<T> onNext, Consumer<Throwable> onError): плюс обработка ошибок.
subscribe(Consumer<T> onNext, Consumer<Throwable> onError, Runnable onComplete): полный цикл.
subscribe(Subscriber<T> subscriber): кастомный подписчик для полного контроля (с onSubscribe для Subscription).
Пример простейшей подписки на Flux:
import reactor.core.publisher.Flux;
Flux<String> fruitsFlux = Flux.just("яблоко", "банан", "вишня");
fruitsFlux.subscribe(
fruit -> System.out.println("Съедено: " + fruit), // onNext: реакция на каждый элемент
error -> System.err.println("Проблема: " + error.getMessage()), // onError: если ошибка
() -> System.out.println("Фрукты закончились") // onComplete: завершение
);
Вывод: "Съедено: яблоко", "Съедено: банан", "Съедено: вишня", "Фрукты закончились". Если добавить ошибку — Flux.just("яблоко").concatWith(Flux.error(new RuntimeException("Гнилой фрукт"))) — сработает onError, и onComplete не вызовется.
Почему это лучше традиционных? В CompletableFuture вы цепляете thenApply/thenAccept, но рискуете вложенностью. В Reactor subscribe() — точка входа, а реакции — в одном месте. Плюс, подписка возвращает Disposable: объект для отмены (dispose()) в любой момент.
Пример:
Disposable disposable = fruitsFlux.subscribe(...);
disposable.dispose(); // Отмена: поток остановится, onComplete не сработает.
Это полезно для UI или долгоживущих потоков: отпишись, когда компонент уничтожен, чтобы избежать утечек памяти.
#Java #middle #Reactor #Reactive_Streams_API #onNext #onError #onComplet
👍2
Жизненный цикл: этапы от старта до финиша
Жизненный цикл в Reactive Streams — последовательность вызовов: onSubscribe → (onNext)* → (onError | onComplete). Это правило: после onError или onComplete ничего не будет, и поток считается завершённым.
onSubscribe(Subscription s): первый вызов после subscribe(). Здесь подписчик получает Subscription для контроля (request(n) для backpressure или cancel()). Без request() данные не потекут — это защита от перегрузки.
onNext(T item): для каждого элемента. Здесь основная логика: обработка, логирование, трансформация. Может вызываться много раз (в Flux) или раз/никогда (в Mono).
Важно: держите onNext быстрым и неблокирующим — если медленный, перенесите на отдельный планировщик (Schedulers).
onError(Throwable t): если ошибка (исключение). Поток прерывается, onComplete не сработает. Обработайте, чтобы не потерять: логируйте, retry (повторите) или fallback (запасной вариант).
onComplete(): успешное завершение. Нет элементов после, но сигнал, что всё ок.
Пример полного цикла с кастомным подписчиком (BaseSubscriber в Reactor упрощает):
Обработка ошибок и завершения: стратегии для устойчивости
Ошибки — часть жизни: сеть упала, БД не ответила. В Reactor onError — ваш щит. Не игнорируйте: используйте операторы для восстановления.
doOnError(Consumer<Throwable>): дополнительная реакция перед onError подписчика.
onErrorReturn(T value): fallback — верни значение вместо ошибки.
onErrorResume(Function<Throwable, Publisher<T>>): замени на другой поток.
retry(long times): повтори попытку.
Пример с восстановлением:
Практические советы и подводные камни
Всегда реализуйте все методы в Subscriber: иначе дефолтные могут "проглотить" ошибки (onError кидает RuntimeException, если не переопределён).
Используйте Hooks: doOnSubscribe, doOnNext для логирования без изменения потока.
Отмена: dispose() не гарантирует мгновенную остановку — upstream (источник) может продолжить, но данные не дойдут.
Камень: в onNext избегайте блокировок (sleep, IO) — используйте publishOn(Schedulers.boundedElastic()) для переноса.
Тестирование: StepVerifier.create(flux).expectSubscription().expectNext(1,2).expectError().verify(); — проверяет цикл.
В практике: в WebFlux сервис возвращает Flux, клиент subscribe() в контроллере — реакции на события в реальном времени.
#Java #middle #Reactor #Reactive_Streams_API #onNext #onError #onComplet
Жизненный цикл в Reactive Streams — последовательность вызовов: onSubscribe → (onNext)* → (onError | onComplete). Это правило: после onError или onComplete ничего не будет, и поток считается завершённым.
onSubscribe(Subscription s): первый вызов после subscribe(). Здесь подписчик получает Subscription для контроля (request(n) для backpressure или cancel()). Без request() данные не потекут — это защита от перегрузки.
onNext(T item): для каждого элемента. Здесь основная логика: обработка, логирование, трансформация. Может вызываться много раз (в Flux) или раз/никогда (в Mono).
Важно: держите onNext быстрым и неблокирующим — если медленный, перенесите на отдельный планировщик (Schedulers).
onError(Throwable t): если ошибка (исключение). Поток прерывается, onComplete не сработает. Обработайте, чтобы не потерять: логируйте, retry (повторите) или fallback (запасной вариант).
onComplete(): успешное завершение. Нет элементов после, но сигнал, что всё ок.
Пример полного цикла с кастомным подписчиком (BaseSubscriber в Reactor упрощает):
import reactor.core.publisher.BaseSubscriber;
Flux<Integer> numbersFlux = Flux.range(1, 10).map(i -> {
if (i == 5) throw new RuntimeException("Ошибка на 5"); // Симулируем ошибку
return i;
});
numbersFlux.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
System.out.println("Подписка готова");
request(3); // Запрашиваем первые 3
}
@Override
protected void hookOnNext(Integer value) {
System.out.println("Элемент: " + value);
request(1); // Запрашиваем по одному дальше
}
@Override
protected void hookOnError(Throwable throwable) {
System.err.println("Ошибка: " + throwable.getMessage());
}
@Override
protected void hookOnComplete() {
System.out.println("Цикл завершён");
}
});
Вывод: "Подписка готова", "Элемент: 1", "Элемент: 2", "Элемент: 3", "Элемент: 4", "Ошибка: Ошибка на 5". onComplete не сработает, потому что ошибка. Без ошибки — все 10 элементов и "Цикл завершён".
Это демонстрирует контроль: request() управляет темпом, как в backpressure (пост 5). Если не request() — только "Подписка готова".
Обработка ошибок и завершения: стратегии для устойчивости
Ошибки — часть жизни: сеть упала, БД не ответила. В Reactor onError — ваш щит. Не игнорируйте: используйте операторы для восстановления.
doOnError(Consumer<Throwable>): дополнительная реакция перед onError подписчика.
onErrorReturn(T value): fallback — верни значение вместо ошибки.
onErrorResume(Function<Throwable, Publisher<T>>): замени на другой поток.
retry(long times): повтори попытку.
Пример с восстановлением:
Mono<String> riskyMono = Mono.fromCallable(() -> {
if (Math.random() > 0.5) throw new RuntimeException("Сбой");
return "Успех";
}).onErrorReturn("Fallback").retry(2); // Retry 2 раза
riskyMono.subscribe(
System.out::println,
error -> System.out.println("Не удалось: " + error),
() -> System.out.println("Завершено")
);
Если сбой — retry, потом fallback. onComplete сработает только при успехе или fallback.
Для onComplete: используйте doFinally(Runnable) — сработает всегда, после onComplete или onError. Полезно для закрытия ресурсов: doFinally(() -> connection.close()).
Практические советы и подводные камни
Всегда реализуйте все методы в Subscriber: иначе дефолтные могут "проглотить" ошибки (onError кидает RuntimeException, если не переопределён).
Используйте Hooks: doOnSubscribe, doOnNext для логирования без изменения потока.
Отмена: dispose() не гарантирует мгновенную остановку — upstream (источник) может продолжить, но данные не дойдут.
Камень: в onNext избегайте блокировок (sleep, IO) — используйте publishOn(Schedulers.boundedElastic()) для переноса.
Тестирование: StepVerifier.create(flux).expectSubscription().expectNext(1,2).expectError().verify(); — проверяет цикл.
В практике: в WebFlux сервис возвращает Flux, клиент subscribe() в контроллере — реакции на события в реальном времени.
#Java #middle #Reactor #Reactive_Streams_API #onNext #onError #onComplet
👍2