Java for Beginner
742 subscribers
708 photos
196 videos
12 files
1.14K links
Канал от новичков для новичков!
Изучайте Java вместе с нами!
Здесь мы обмениваемся опытом и постоянно изучаем что-то новое!

Наш YouTube канал - https://www.youtube.com/@Java_Beginner-Dev

Наш канал на RUTube - https://rutube.ru/channel/37896292/
Download Telegram
Минимизация attack surface (удаление ненужных слоёв)

Техники:
1. Многоступенчатая сборка (multi-stage build):
   # Stage 1: Сборка
FROM maven:3.8.6 AS builder
COPY . .
RUN mvn package

# Stage 2: Финальный образ
FROM openjdk:17-jre-slim
COPY --from=builder target/myapp.jar /app.jar

Промежуточные слои (Maven, исходники) не попадают в финальный образ.

2. Очистка кэша в том же слое:
   RUN apt-get update && \
apt-get install -y curl && \
rm -rf /var/lib/apt/lists/*

- Если очистка в отдельном слое, размер образа не уменьшится (слой сохранится в истории).

3. Удаление ненужных файлов:
   RUN rm -rf $JAVA_HOME/jmods/java.corba.jmod

- Некоторые модули (CORBA, RMI) не используются в современных приложениях.


Performance tuning: настройка под контейнеры

CPU/Memory ограничения (--cpus, --memory)
Как это работает:
- Docker использует cgroups v2 для ограничения ресурсов:
  docker run -m 512m --cpus=1.5 myapp

- -m 512m — лимит памяти (записывается в /sys/fs/cgroup/memory.max),
- --cpus=1.5 — лимит CPU (эквивалентно --cpu-period=100000 --cpu-quota=150000).


Нюансы для JVM:
- По умолчанию JVM не видит лимиты cgroups и выделяет память, равную объему хоста.
- Решение:
  environment:
JAVA_TOOL_OPTIONS: "-XX:MaxRAMPercentage=75.0" //указывает JVM использовать 75% от лимита cgroups.


Проверка лимитов
:
docker exec myapp cat /sys/fs/cgroup/memory.max
docker exec myapp cat /sys/fs/cgroup/cpu.max



Thread-pool tuning под контейнеры

Проблема:
- Стандартные пулы потоков (например, ForkJoinPool) используют Runtime.getRuntime().availableProcessors(),
- В контейнере это значение равно количеству CPU хоста, а не лимиту (--cpus).


Решение:
1. Для Spring Boot:
   # application.properties
spring.task.execution.pool.core-size=4
spring.task.execution.pool.max-size=8


2. Для ванильного Java:
   int availableCpus = (int) Math.min(
Runtime.getRuntime().availableProcessors(),
Double.parseDouble(System.getenv("CPU_LIMIT"))
);
ExecutorService executor = Executors.newFixedThreadPool(availableCpus * 2);


Как получить лимит CPU:
public static int getContainerCpuLimit() {
try (BufferedReader reader = new BufferedReader(
new FileReader("/sys/fs/cgroup/cpu.max"))) {
String line = reader.readLine();
String[] parts = line.split(" ");
long quota = Long.parseLong(parts[0]);
long period = Long.parseLong(parts[1]);
return (int) (quota / period); // Например, 1.5 → 1
}
}



JVM-параметры для контейнеров

Критические параметры:
environment:
JAVA_TOOL_OPTIONS: >-
-XX:+UseContainerSupport
-XX:MaxRAMPercentage=75.0
-XX:+UseZGC
-Djava.security.egd=file:/dev/./urandom


Объяснение:
- -XX:+UseContainerSupport (включено по умолчанию в JDK 8u191+):
- Позволяет JVM читать лимиты из cgroups.
- Без него -XX:MaxRAMPercentage игнорируется.

- -XX:MaxRAMPercentage=75.0:
- Heap = 75% от лимита памяти контейнера.
- Оставшиеся 25% — для Metaspace, native-памяти, стеков потоков.

- -XX:+UseZGC:
- Низколатентный GC (макс. задержки ~1 мс).
- Альтернативы: Shenandoah (JDK 12+), G1GC (по умолчанию).

- -Djava.security.egd=file:/dev/./urandom:
- Ускоряет старт приложений, заменяя блокирующий /dev/random на неблокирующий /dev/urandom.


Проверка работы GC:
docker exec myapp jstat -gcutil 1 1000

- Столбец ZGC покажет использование памяти и паузы сборки мусора.

#Java #middle #Docker #Debug
👍4
Debug и мониторинг: production-ready диагностика

docker exec, attach к процессу

Стандартные команды:
# Стэк-трейс
docker exec myapp jstack 1 > thread_dump.txt

# Мониторинг GC
docker exec myapp jstat -gcutil 1 1000

# Heap-дамп
docker exec myapp jcmd 1 GC.heap_dump /tmp/heap.hprof

Нюансы:
- В distroless-образах нет jstack/jstat.

Решение:
- Используйте sidecar-контейнер с OpenJDK:

    debug-tools:
image: openjdk:17
volumes:
- /var/run/docker.sock:/var/run/docker.sock
command: ["jstack", "myapp"]


- Или добавьте инструменты в образ через jlink:
    RUN jlink \
--add-modules jdk.jfr,jdk.management \
--output /jlinked



JMX/JFR в контейнере

Настройка JMX:
environment:
JAVA_TOOL_OPTIONS: >-
-Dcom.sun.management.jmxremote.port=9090
-Dcom.sun.management.jmxremote.rmi.port=9090
-Dcom.sun.management.jmxremote.ssl=false
-Dcom.sun.management.jmxremote.authenticate=false
-Djava.rmi.server.hostname=myapp
ports:
- "9090:9090"


Проблемы:
- RMI-порт: JMX использует динамические порты для RMI. Чтобы зафиксировать их:
  -Dcom.sun.management.jmxremote.rmi.port=9090


- DNS-имя:
-Djava.rmi.server.hostname=myapp — имя сервиса в Docker Compose.

JFR (Java Flight Recorder):
# Запуск записи
docker exec myapp jcmd 1 JFR.start name=recording duration=60s filename=/tmp/recording.jfr

# Экспорт метрик в Prometheus
RUN jcmd 1 JFR.configure stackdepth=128


Интеграция с Prometheus/Grafana


Шаги:
1. Добавьте Micrometer в приложение:
   <dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
</dependency>


2. Настройте эндпоинт:
   management.endpoints.web.exposure.include=prometheus


3. Docker Compose:
   services:
app:
ports:
- "8080:8080"
prometheus:
image: prom/prometheus
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
ports:
- "9090:9090"


4. prometheus.yml:
   scrape_configs:
- job_name: 'java'
metrics_path: '/actuator/prometheus'
static_configs:
- targets: ['app:8080']


Ключевые метрики:
- jvm_memory_used_bytes — использование heap,
- http_server_requests_seconds_count — количество HTTP-запросов,
- jvm_gc_pause_seconds — длительность GC-пауз.


#Java #middle #Docker #Debug
👍4
CI/CD и финальный проект: production-ready Java-приложение с Docker

Финальный проект: архитектура и реализация


Сборка Java-приложения с Kafka и PostgreSQL
Требования к системе:
- Java 17 (сборка через Maven),
- PostgreSQL 15 (для хранения данных),
- Kafka 3.5+ (в режиме KRaft, без ZooKeeper),
- Безопасность: non-root пользователь, distroless-образы,
- Production-ready: healthcheck, лимиты ресурсов, мониторинг.


Финальный Dockerfile: multi-stage, non-root, distroless (не тестировался, возможно содержит ошибки, написан для визуализации)
# Stage 1: Сборка с Maven
FROM maven:3.8.6-openjdk-17 AS builder
WORKDIR /app
COPY pom.xml .
RUN mvn dependency:go-offline -B # Загружаем зависимости без исходников
COPY src ./src
RUN mvn package -DskipTests

# Stage 2: Минимальная JVM через jlink
FROM openjdk:17 AS jlink
ARG TARGETARCH
RUN jlink \
--add-modules java.base,java.logging,java.xml,java.management,java.naming \
--output /jlinked \
--strip-debug \
--compress 2 \
--no-header-files \
--no-man-pages

# Stage 3: Distroless-образ
FROM gcr.io/distroless/java17-debian11
COPY --from=jlink /jlinked /jlinked
COPY --from=builder /app/target/myapp.jar /app.jar

# Создаем non-root пользователя
RUN addgroup --gid 1001 appgroup && \
adduser --uid 1001 --gid 1001 --disabled-password --gecos "" appuser
USER 1001

# Настройка JVM под контейнеры
ENV JAVA_OPTS="\
-XX:+UseContainerSupport \
-XX:MaxRAMPercentage=75.0 \
-XX:+UseZGC \
-Djava.security.egd=file:/dev/./urandom"
ENTRYPOINT ["/jlinked/bin/java", "${JAVA_OPTS}", "-jar", "/app.jar"]


Объяснение

1. Multi-stage build:
- Stage 1 (builder):
- Использует официальный образ Maven для загрузки зависимостей (dependency:go-offline ускоряет сборку в CI).
- Сборка происходит в изолированном окружении — зависимости не попадут в финальный образ.


- Stage 2 (jlink):
- Создает минимальную JVM через jlink, включая только необходимые модули (анализируйте зависимости через jdeps --print-module-deps).
- --compress 2 сжимает классы (уменьшает размер на 30%).


- Stage 3 (distroless):
- Базовый образ без ОС — только JVM и приложение.
- Нет shell, curl, apt — нулевой attack surface.


2. Non-root пользователь:
- adduser --uid 1001 создает пользователя с фиксированным UID для совместимости с томами.
- Почему это важно: Если контейнер скомпрометирован, злоумышленник не получит root-доступ к хосту.


3. JVM-параметры:
- -XX:MaxRAMPercentage=75.0 — ограничивает heap 75% от лимита контейнера (оставшиеся 25% — для Metaspace и native-памяти).
- -XX:+UseZGC — низколатентный GC (макс. паузы ~1 мс).


Критические нюансы:
- В distroless нет jcmd/jstack.

Для дампов используйте:
  docker cp myapp:/tmp/heap.hprof .  # Если heap_dump настроен в JAVA_TOOL_OPTIONS


- Проверяйте лимиты памяти через:
  docker exec myapp cat /sys/fs/cgroup/memory.max



#Java #middle #Docker
👍3
Финальный docker-compose.yaml: Java + Kafka (KRaft) + PostgreSQL (не тестировался, возможно содержит ошибки, написан для визуализации)
version: '3.8'
services:
app:
image: myapp:latest
ports:
- "8080:8080"
environment:
DB_URL: jdbc:postgresql://db:5432/mydb
DB_PASSWORD: ${DB_PASSWORD}
KAFKA_BOOTSTRAP_SERVERS: kafka:9092
depends_on:
db:
condition: service_healthy
kafka:
condition: service_healthy
networks:
- app_net
deploy:
resources:
limits:
cpus: '1.5'
memory: 512M

db:
image: postgres:15
environment:
POSTGRES_DB: mydb
POSTGRES_PASSWORD: ${DB_PASSWORD}
volumes:
- pg_data:/var/lib/postgresql/data
healthcheck:
test: ["CMD-SHELL", "pg_isready -U postgres"]
interval: 5s
timeout: 5s
retries: 10
networks:
- app_net
deploy:
resources:
limits:
memory: 256M

kafka:
image: bitnami/kafka:3.5.1
container_name: kafka
ports:
- "9092:9092"
environment:
KAFKA_CFG_NODE_ID: 0
KAFKA_CFG_PROCESS_ROLES: "broker,controller"
KAFKA_CFG_LISTENERS: "PLAINTEXT://:9092,CONTROLLER://:9093"
KAFKA_CFG_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:9092"
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT"
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
volumes:
- kafka_data:/bitnami/kafka
healthcheck:
test: ["CMD", "kafka-broker-api-versions.sh", "--bootstrap-server", "localhost:9092"]
interval: 10s
timeout: 10s
retries: 20
networks:
- app_net
deploy:
resources:
limits:
memory: 512M

volumes:
pg_data:
driver: local
kafka_data:
driver: local

networks:
app_net:
driver: bridge


Ключевые решения

1. KRaft вместо ZooKeeper:
- KAFKA_CFG_PROCESS_ROLES: "broker,controller" — единый процесс для метаданных (упрощает настройку).
- Важно: KAFKA_CFG_ADVERTISED_LISTENERS должен указывать на имя сервиса (kafka), а не на localhost.


2. Healthcheck для всех сервисов:
- Для PostgreSQL: pg_isready проверяет готовность принимать подключения.
- Для Kafka:
kafka-broker-api-versions.sh убеждается, что брокер принимает запросы.
- Почему это критично: depends_on без healthcheck не предотвращает race condition.


3. Лимиты ресурсов:
- deploy.resources.limits — ограничивает использование CPU/memory через cgroups.
- Без этого JVM может выделить память, превышающую лимит контейнера (падение с OutOfMemoryError).


4. Сеть:
- Все сервисы в одной сети app_net — общаются по именам (db, kafka).
- Встроенный DNS Docker резолвит имена в IP-адреса контейнеров.



#Java #middle #Docker
👍2
CI/CD pipeline: от коммита до production

Сборка образа в GitHub Actions
name: Build and Push Docker Image
on:
push:
branches: [ main ]
tags: [ 'v*' ]

jobs:
build:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4

- name: Set up QEMU for multi-arch
uses: docker/setup-qemu-action@v2

- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2

- name: Login to DockerHub
uses: docker/login-action@v2
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}

- name: Cache Docker layers
uses: actions/cache@v3
with:
path: /tmp/.buildx-cache
key: ${{ runner.os }}-buildx-${{ github.sha }}
restore-keys: ${{ runner.os }}-buildx-

- name: Build and push
uses: docker/build-push-action@v4
with:
context: .
platforms: linux/amd64,linux/arm64
push: true
tags: myapp:latest,myapp:${{ github.sha }}
cache-from: type=local,src=/tmp/.buildx-cache
cache-to: type=local,dest=/tmp/.buildx-cache-new
build-args: |
BUILDKIT_INLINE_CACHE=1



Как это работает


1. Кэширование слоев:
- actions/cache сохраняет результаты сборки в /tmp/.buildx-cache.
- При следующем запуске Buildx использует кэш через cache-from, пропуская этапы с неизмененными инструкциями (например, загрузку зависимостей Maven).
- Экономия времени: Сборка с кэшем — 2 минуты вместо 10.


2. Multi-arch сборка:
- platforms: linux/amd64,linux/arm64 — собирает образы для x86 и ARM.
- Использует QEMU для эмуляции архитектур (установлен через docker/setup-qemu-action).


3. Тегирование:
- myapp:latest — для dev-окружения (не рекомендуется для production!),
- myapp:${{ github.sha }} — уникальный тег на коммит (для отката),
- При тегировании релиза (v1.0.0) — myapp:1.0.0.


Нюансы:
- Для production никогда не используйте latest — это нарушает идемпотентность.
- Вместо latest применяйте семантическое версионирование: major.minor.patch.



#Java #middle #Docker
👍4
Сканирование образа на уязвимости

Добавьте в пайплайн после сборки:
- name: Scan with Trivy
uses: aquasecurity/trivy-action@master
with:
image-ref: 'myapp:${{ github.sha }}'
format: 'table'
exit-code: '1'
ignore-unfixed: true
severity: 'CRITICAL,HIGH'


Как это работает:
- Trivy сканирует образ на наличие уязвимостей в:
- Базовом образе (distroless — минимум пакетов),
- Зависимостях Java (через анализ JAR-файлов).
- ignore-unfixed: true — игнорирует уязвимости без патчей (чтобы не блокировать сборку).
- severity: CRITICAL,HIGH — падает при критических уязвимостях.


Альтернатива: Snyk
- name: Snyk Container scan
uses: snyk/actions/container@master
env:
SNYK_TOKEN: ${{ secrets.SNYK_TOKEN }}
with:
image: myapp:${{ github.sha }}
args: --severity-threshold=high --fail-on=all



Организация production-ready релиза

Политики хранения образов в registry

1. Для DockerHub/приватного registry:
- Удаляйте образы старше 30 дней (кроме tagged релизов)
- Оставляйте последние 5 образов для каждого major-версии (например, v1.*).


2. Как автоматизировать:
- В GitLab CI используйте cleanup policy для registry,
- В AWS ECR — Lifecycle Policy:

     {
"rules": [
{
"rulePriority": 1,
"description": "Удалять образы старше 30 дней",
"selection": {
"tagStatus": "untagged",
"countType": "sinceImagePushed",
"countUnit": "days",
"countNumber": 30
},
"action": { "type": "expire" }
}
]
}


Multi-environment setup через override.yml

Структура проекта:

├── docker-compose.yml        # Базовая конфигурация
├── docker-compose.dev.yml # Dev-окружение
├── docker-compose.prod.yml # Production
└── .env


Пример docker-compose.prod.yml:
services:
app:
environment:
SPRING_PROFILES_ACTIVE: prod
JAVA_TOOL_OPTIONS: >-
-XX:MaxRAMPercentage=75.0
-XX:+UseZGC
deploy:
replicas: 3
update_config:
parallelism: 1
order: start-first
networks:
- monitoring_net

db:
environment:
POSTGRES_PASSWORD: ${PROD_DB_PASSWORD}
volumes:
- /mnt/prod/pg_data:/var/lib/postgresql/data


Запуск для production:
docker-compose -f docker-compose.yml -f docker-compose.prod.yml up -d


Ключевые отличия:
- Dev:
- Горячая перезагрузка кода через bind mounts,
- Отключенные лимиты ресурсов.

- Production:
- Фиксированные теги образов (не latest),
- Подключение к сети мониторинга (monitoring_net),
- Приватные тома на выделенном диске (/mnt/prod/pg_data).



Будущее: переход к Kubernetes/Helm

Почему Docker Compose не для production?
- Нет оркестрации на нескольких нодах,
- Отсутствует self-healing (автовосстановление упавших сервисов),
- Нет встроенного балансировщика нагрузки.


Как мигрировать
:
1. Замените `docker-compose.yml` на Helm chart:
helm create myapp


2. Настройте values.yaml:

   app:
replicaCount: 3
image:
repository: myapp
tag: "v1.0.0"
resources:
limits:
memory: 512Mi
cpu: "1.5"


3. Интегрируйте Kafka через Strimzi Operator:
   # kafka.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
spec:
kafka:
version: 3.5.0
replicas: 3
listeners:
- name: plain
port: 9092
type: internal
tls: false


Преимущества Kubernetes:
- Автомасштабирование (HPA),
- Сетевая изоляция через Network Policies,
- Управление секретами через kubectl create secret.



#Java #middle #Docker
👍3
Реактивное программирование

Вступление

В современном мире разработки программного обеспечения мы всё чаще сталкиваемся с задачами, где простая последовательная логика уже не справляется. Нагрузки на системы растут экспоненциально: миллионы пользователей ожидают мгновенных ответов, микросервисы обмениваются сотнями запросов в секунду, а данные льются рекой из баз данных, API и внешних сервисов.
Традиционный подход — "запрос приходит, запускается поток, ждём ответа" — начинает давать сбои: система тонет в блокировках, лишних потоках и перерасходе ресурсов.


Представьте: сервер обрабатывает тысячи подключений, но каждый запрос занимает отдельный поток, который висит в ожидании, пока не придёт ответ от сети или диска. Это как пытаться пропустить толпу через узкий коридор — пробки неизбежны.

Чтобы выжать максимум из аппаратных ресурсов и сделать приложения более отзывчивыми, нужно менять способ мышления. Один из ключевых инструментов для этого — реактивное программирование, подход, который фокусируется на обработке потоков данных и событий в асинхронном режиме, без лишних блокировок.


Асинхронщина и её проблемы: от основ к пределам

Что такое асинхронность в программировании?
Это когда программа не стоит на месте, ожидая завершения одной задачи, а продолжает работать с другими. В Java это реализуется через механизмы, позволяющие запускать код параллельно. Но на пути к идеалу было много подводных камней.



Потоки: базовый, но тяжёлый инструмент

Многопоточность в Java появилась ещё в ранних версиях языка. Идея проста: вы создаёте новый поток — это как отдельный "рабочий", который выполняет код независимо от основного.


Код выглядит так:
new Thread(() -> { /* ваш код */ }).start();. 


Внутри метода run() вы пишете, что нужно сделать. На первый взгляд, это решает проблему: пока один поток ждёт ответа от сервера, другой может обрабатывать новый запрос.

Но практика показывает минусы. Каждый поток — это тяжёлый объект: он требует выделения памяти (обычно от 1 МБ на стек), системных ресурсов операционной системы и времени на создание. Если нагрузка растёт — скажем, 10 тысяч одновременных запросов, — вы не сможете создать 10 тысяч потоков: система просто исчерпает ресурсы, и всё упадёт. Ещё одна боль — контекстные переключения: когда процессор переключается между потоками, это стоит CPU-времени, иногда до микросекунд на переключение, что накапливается в большие задержки.

Чтобы смягчить это, в Java ввели ExecutorService — сервис-исполнитель, который управляет пулом потоков. Вы создаёте фиксированный пул, например, Executors.newFixedThreadPool(10), и подаёте задачи:
executor.execute(() -> { /* код */ });


Теперь потоки переиспользуются: закончил задачу — берёт следующую. Это экономит ресурсы, но не решает корень проблемы. Если в задаче есть блокирующий код — например, чтение из файла или ожидание сети, — поток всё равно "зависает" в ожидании, блокируя место в пуле. Другие задачи ждут в очереди, и под высокой нагрузкой пул исчерпывается. В итоге, асинхронность есть, но она неэффективна: ресурсы тратятся на ожидание, а не на полезную работу.



#Java #middle #Reactor
👍3
Future: обещание результата, но с подвохом

В Java 5 ввели Future — это как "чек" на будущий результат.
Вы подаёте задачу в executor и получаете объект Future, который обещает: "когда-нибудь я дам тебе ответ".


Пример:
ExecutorService executor = Executors.newFixedThreadPool(10); Future<String> future = executor.submit(() -> { Thread.sleep(1000); return "Привет"; });.


Плюс в том, что вы можете продолжать работу, не дожидаясь: пока задача крутится в фоне, основной код идёт дальше. Но чтобы забрать результат, нужно вызвать future.get(). И вот здесь засада: get() блокирует текущий поток до тех пор, пока задача не завершится. Если задача задерживается — скажем, из-за сети, — ваш поток тоже висит в ожидании. Получается, асинхронность иллюзорна: да, запуск асинхронный, но использование результата синхронное и блокирующее. Это как заказать еду по доставке, но стоять у двери, не отходя, пока курьер не приедет. Выигрыш минимален, особенно в веб-приложениях, где запросы должны обрабатываться быстро.


Ещё Future неудобен в композиции: если нужно объединить результаты нескольких задач, приходится вручную ждать каждого get(), что приводит к спагетти-коду с try-catch для ошибок и таймаутами.


CompletableFuture: цепочки действий, но без избавления от ада

Java 8 принесла CompletableFuture — улучшенную версию Future, которая позволяет строить цепочки асинхронных операций без блокировок на get(). Теперь результат можно обрабатывать через "колбэки" — функции, которые вызываются автоматически по завершении.

Пример:
CompletableFuture.supplyAsync(() -> { return "Данные"; }).thenApply(data -> { return data.toUpperCase(); }).thenAccept(System.out::println);.

Здесь supplyAsync запускает задачу асинхронно, thenApply преобразует результат (например, переводит в верхний регистр), thenAccept выводит его.


Есть методы для комбинации: thenCompose для последовательных цепочек, thenCombine для параллельного объединения результатов, handle для обработки ошибок. Это шаг вперёд: код становится declarative (описательным), вы фокусируетесь на "что сделать", а не "как ждать". Нет нужды в ручном get() — всё течёт само.

Но радость недолговечна. Когда приложение усложняется — например, нужно асинхронно запросить данные из базы, потом из внешнего API, обработать ошибки и объединить, — цепочки лямбд растут в "callback-ад" (ад колбэков): вложенные функции, которые трудно читать, отлаживать и тестировать. Один уровень — ок, но пять-шесть — и код превращается в пирамиду, где сложно отследить поток выполнения.

Ещё хуже: под капотом блокировки никуда не делись. Если в цепочке есть блокирующий вызов — например, Thread.sleep() для симуляции задержки или JDBC-драйвер, который ждёт ответа от базы, блокируя поток, — весь CompletableFuture теряет преимущество. Поток из пула всё равно занят ожиданием, и под нагрузкой система снова захлёбывается. Плюс, управление ошибками в цепочках требует осторожности: одна ошибка может сломать всю последовательность, если не обработать timely.

В итоге, CompletableFuture дал выразительный синтаксис и удобство для простых сценариев, но не решил системные проблемы: ресурсы тратятся впустую на блокировки, сложность растёт, а масштабируемость под вопросом.


#Java #middle #Reactor
👍3
Callback-ад и блокировки: кульминация проблем

Callback-ад — это когда колбэки (функции обратного вызова) наслаиваются друг на друга, делая код нечитаемым. В CompletableFuture это проявляется в глубоких цепочках: thenApply внутри thenCompose, с handle для ошибок. Отладка — кошмар: где именно сломалось? Тестирование — тоже, потому что асинхронность добавляет неопределённость в порядок выполнения.

Блокировки — это когда код "зависает" в ожидании внешнего события, не давая потоку работать с другими задачами. В Java многие библиотеки (как старые IO или JDBC) блокирующие по природе: они используют системные вызовы, которые стопорят поток. Даже в асинхронных конструкциях, если внутри лямбды такая блокировка, весь пул потоков может исчерпаться. Представьте сервер с 100 потоками: 100 запросов с задержкой — и новые ждут в очереди, вызывая таймауты.

Это приводит к неэффективности: CPU простаивает, память тратится на "спящие" потоки, а под пиковой нагрузкой система не масштабируется горизонтально.


Почему нужен новый подход: реактивное программирование

Мы дошли до предела традиционных моделей. Потоки хороши для CPU-bound задач (расчёты), но тяжёлые для IO-bound (сеть, диски). Future дал обещания, но не избавил от блокировок. CompletableFuture улучшил код, но оставил callback-ад и зависимость от неблокирующих библиотек.


Здесь на сцену выходит реактивное программирование — подход, где мы думаем в терминах потоков данных и событий, а не отдельных задач. Вместо "запрос → блокировка в потоке → ответ" мы строим конвейеры: данные приходят асинхронно по мере готовности, обработка идёт реактивно, без выделения потока на каждое ожидание. Это как перейти от конвейера с паузами к непрерывному потоку. В следующих постах разберём Reactive Streams, Flux/Mono в Project Reactor и как это решает проблемы.


#Java #middle #Reactor
👍4
Реактивное программирование

Что такое потоки данных в реактивном мире?

В реактивном программировании данные — это не статичный объект, который вы запрашиваете и ждёте. Это динамичный поток: последовательность элементов (событий), которые могут приходить в любое время, в любом количестве. Поток может быть бесконечным (как лента новостей) или конечным (как результаты поиска). Главное — обработка идёт реактивно: программа "подписывается" на поток и реагирует на каждый элемент по мере его появления, без блокировок.

Это решает боли из предыдущего поста: вместо выделения потока на ожидание, мы используем неблокирующий механизм. Если данных нет — ничего не происходит, ресурсы свободны. Когда данные приходят — срабатывают реакции. Это как подписка на уведомления: телефон не висит в ожидании, а просто пиликает при новом сообщении.

В основе лежит спецификация Reactive Streams — стандарт, который определяет, как строить такие потоки.

Он включает четыре ключевых интерфейса:
Издатель (Publisher): источник данных. Он "публикует" элементы потока. Например, это может быть база данных, генерирующая записи по запросу.
Подписчик (Subscriber): получатель данных. Он "подписывается" на издателя и реагирует на элементы: onNext (получил элемент), onError (ошибка), onComplete (поток завершён).
Подписка (Subscription): связь между издателем и подписчиком. Через неё подписчик может запросить больше данных (request(n)) или отменить подписку (cancel). Это вводит "обратное давление" — механизм, чтобы подписчик не захлёбывался данными, если не успевает их обрабатывать.
Процессор (Processor): комбинация издателя и подписчика, для промежуточной обработки (как фильтр в конвейере).


Эти интерфейсы — основа. Они обеспечивают асинхронность без блокировок: всё работает на основе событий, а не ожиданий.


События как река: метафора и практика

Представьте реку событий: вода (данные) течёт непрерывно, иногда бурно (пиковая нагрузка), иногда спокойно. Ваша программа — не плотина, которая блокирует поток, а турбина, которая генерирует энергию по мере течения. Если река слишком быстрая — турбина сигнализирует "замедлить" (обратное давление), чтобы не перегрузиться.

В Project Reactor это реализовано через два типа потоков:
Mono: поток для нуля или одного элемента. Идеален для одиночных операций, как HTTP-запрос, возвращающий один ответ.

Пример:
Mono<string> mono = Mono.just("Привет из реки");  // Создаём простой Mono с одним элементом
mono.subscribe(
value -> System.out.println("Получено: " + value), // onNext: реакция на элемент
error -> System.err.println("Ошибка: " + error), // onError
() -> System.out.println("Завершено") // onComplete
);

Здесь subscribe — это подписка. Mono "течёт" асинхронно: если элемент готов — срабатывает onNext, потом onComplete. Нет блокировок: код continue после subscribe.


Flux: поток для нуля, одного или многих элементов, возможно бесконечных. Для последовательностей, как стриминг данных.

Пример:
Flux<integer> flux = Flux.range(1, 5);   // Поток чисел от 1 до 5
flux.subscribe(
value -> System.out.println("Элемент: " + value),
error -> System.err.println("Ошибка: " + error),
() -> System.out.println("Поток завершён")
);


Flux "выдаёт" элементы по одному: 1, 2, 3... Это как река: элементы приходят последовательно, но асинхронно.

Почему это лучше традиционных подходов?
В CompletableFuture вы строите цепочки, но рискуете блокировками внутри.
В реактивном стиле всё неблокирующее: используйте операторы вроде map (преобразовать элемент), filter (отфильтровать), flatMap (развернуть в подпотоки).


Пример цепочки:
Flux.fromIterable(List.of("яблоко", "банан", "вишня"))
.filter(fruit -> fruit.startsWith("б")) // Фильтруем
.map(String::toUpperCase) // Преобразуем
.subscribe(System.out::println); // Подписываемся

Результат: "БАНАН". Всё течёт как конвейер, без callback-ада: код читаем, как последовательный, но работает асинхронно.



#Java #middle #Reactor #data_stream
👍3