Java for Beginner
743 subscribers
709 photos
199 videos
12 files
1.15K links
Канал от новичков для новичков!
Изучайте Java вместе с нами!
Здесь мы обмениваемся опытом и постоянно изучаем что-то новое!

Наш YouTube канал - https://www.youtube.com/@Java_Beginner-Dev

Наш канал на RUTube - https://rutube.ru/channel/37896292/
Download Telegram
Реестр схем: именование субъектов, версионность, режимы совместимости (назад, вперёд, полная), формат передачи: магический байт + идентификатор схемы

Реестр схем (Schema Registry от Confluent, с открытым кодом) — централизованный сервис для хранения и проверки схем, интегрированный с Kafka. Он обеспечивает контракт: отправители регистрируют схемы, получатели загружают по идентификатору.

- Именование субъектов: Субъект — ключ для схемы, обычно имя темы с суффиксом -value или -key (стратегия именования: по умолчанию, по теме, по имени записи). Например, "orders-value" для значения в теме orders. Версионность: каждая схема под субъектом имеет версии (1, 2, ...), автоматически увеличивающиеся при регистрации, если совместима.

- Режимы совместимости: Правила проверки новой схемы по отношению к существующим.
- Назад (BACKWARD): Новая схема может читать старые данные (добавление необязательных полей нормально, удаление — нет). Для развёртки сначала потребителей.
- Вперёд (FORWARD): Старые схемы могут читать новые данные (добавление обязательных — нет, удаление необязательных — нормально). Для развёртки сначала отправителей.
- Полная (FULL): И назад, и вперёд (транзитивно: проверка со всеми предыдущими).
Режим настраивается по субъекту (по умолчанию назад с транзитивностью). В реестре: при отправке на /subjects/{subject}/versions сервер разбирает схему и проверяет совместимость через библиотеки (для Avro — валидатор схем, для Protobuf — аналогично).


- Формат передачи: Сообщение = магический байт (0 для Confluent) + идентификатор схемы (4-байтовое целое) + полезная нагрузка. В памяти сериализатора: идентификатор загружается из реестра (кэшируется локально в клиенте реестра: карта субъектов на версии и схемы), затем кодируется нагрузка. Десериализатор: читает магический байт и идентификатор, загружает схему (из кэша), декодирует. Кэш снижает задержку (срок жизни настраивается), но устаревший кэш может вызвать ошибку несовместимой схемы.

В памяти реестра (REST-сервис на Java): схемы хранятся в бэкенде (тема Kafka _schemas или база данных), с кэшем в памяти для быстрого доступа. Нюанс: высокая доступность через несколько экземпляров с выбором лидера.


Сериализаторы в Java: Avro (от Confluent), сериализатор/десериализатор Protobuf

В клиентах Kafka сериализаторы — реализации интерфейсов для сериализации и десериализации.

- Avro (от Confluent): io.confluent.kafka.serializers.KafkaAvroSerializer. Интегрирован с реестром: в методе сериализации загружает или регистрирует схему, пишет магический байт, идентификатор и данные. Для общих данных — общая запись; для конкретных — классы с генерированным кодом через плагин avro-maven. Десериализатор: KafkaAvroDeserializer, с настройкой автоматической регистрации схем false для производства. В памяти: использует писатели и читатели данных, пул буферов байтов для повторного использования.

- Protobuf: io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer. Аналогично: сообщения Protobuf генерируются из .proto, сериализатор регистрирует схему (Protobuf преобразуется во внутреннюю JSON-схему). В памяти: динамическое сообщение Protobuf, но предпочтительны сгенерированные для безопасности типов. Десериализатор разбирает с загруженной схемой.

Нюанс: кастомные сериализаторы расширяют абстрактный класс; затраты — загрузка схемы при инициализации (блокирующая), потом асинхронная.


#Java #middle #Kafka #Kafka_serializers #Kafka_deserializers
👍3
Пример producer с Avro

Вот базовый пример отправителя с сериализацией Avro, интегрированным с реестром схем:
import org.apache.kafka.clients.producer.*;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import java.util.Properties;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // Адреса серверов Kafka
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // Сериализатор для ключей
props.put("value.serializer", KafkaAvroSerializer.class.getName()); // Сериализатор для значений Avro
props.put("schema.registry.url", "http://schema-registry:8081"); // URL реестра схем

KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props);

// Пример схемы и записи
String schemaStr = "{\"type\":\"record\",\"name\":\"Order\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"amount\",\"type\":\"double\"}]}";
Schema schema = new Schema.Parser().parse(schemaStr); // Разбор схемы
GenericRecord record = new GenericData.Record(schema); // Создание записи
record.put("id", 1); // Заполнение поля id
record.put("amount", 100.5); // Заполнение поля amount

ProducerRecord<String, GenericRecord> producerRecord = new ProducerRecord<>("orders", "key", record); // Создание записи для отправки
producer.send(producerRecord); // Отправка
producer.close(); // Закрытие отправителя


В реальных системах: используйте конкретные классы с генерированным кодом, добавьте обработку ошибок для исключений реестра схем.


Нюансы

- Тестирование эволюции схем: Создайте интеграционные тесты с имитацией реестра (встроенный Kafka плюс mock-клиент реестра). Шаги: зарегистрируйте первую версию схемы, отправьте данные; эволюционируйте ко второй (добавьте поле), проверьте режим совместимости; получите данные с десериализатором первой версии на данных второй (совместимость вперёд) и наоборот (назад). Используйте инструменты: avro-tools для сравнения, или тесты JUnit с проверкой совместимости схем. Нюанс: тестируйте транзитивную совместимость (третья версия с первой); имитируйте сброс кэша. В непрерывной интеграции: автоматизируйте с плагинами Gradle или Maven для валидации схем.

- Управление схемами (процесс одобрения): В крупных организациях внедрите рабочий процесс: схемы в репозитории Git, запросы на слияние с проверками совместимости (через плагин schema-registry-maven). Одобрение: ревью коллег плюс автоматические тесты; развёртка только после слияния. Используйте политику совместимости реестра по субъекту; для строгого контроля — полная транзитивная. Нюанс: семантика версионности (семантическое версионирование: мажорная для breaking changes), журнал аудита в реестре. Интегрируйте с процессами непрерывной интеграции и доставки: блокируйте развёртку при несовместимости.

- Сложности миграции с одного формата на другой: Миграция (например, с JSON на Avro) требует фазы двойной записи и чтения: отправители пишут в новую тему с новым форматом, получатели мигрируют постепенно. Сложности: преобразование данных (кастомный трансформер в Streams), обеспечение согласованности (атомарный переключатель невозможен из-за смещений). Затраты: двойное хранение во время перехода; конфликты схем при смешанных данных. Нюанс: для миграции в реальном времени используйте MirrorMaker с кастомными сериализаторами; риски — ошибки десериализации при коллизиях идентификаторов. Тестируйте с канареечными развёртками; время миграции — недели или месяцы для больших наборов данных.


#Java #middle #Kafka #Kafka_serializers #Kafka_deserializers
👍3
Apache Kafka

Безопасность, мониторинг и эксплуатация


Apache Kafka в производственной среде требует тщательного подхода к безопасности, мониторингу и эксплуатации для обеспечения стабильности, масштабируемости и восстановления после сбоев.


Безопасность: TLS/SSL, SASL (SCRAM, OAUTHBEARER), ACLs

Безопасность в Kafka строится на шифровании трафика, аутентификации и авторизации для предотвращения утечек данных и атак.

- TLS/SSL: Шифрование соединений для защиты данных в полёте. Настраивается через listeners (прослушиватели), где указывается протокол SASL_SSL или SSL. Брокеры используют keystore (хранилище ключей) для серверных сертификатов и truststore для проверки клиентов.
В памяти брокера: SSL контекст загружается при старте, handshake происходит на сетевом уровне с использованием Java Secure Socket Extension (JSSE). Нюанс: overhead на CPU для шифрования (до 20% при высокой нагрузке), используйте hardware acceleration если возможно.


- SASL (Simple Authentication and Security Layer): Аутентификация клиентов.

Поддерживает механизмы:
- SCRAM (Salted Challenge Response Authentication Mechanism): Безопасный парольный механизм (SHA-256 или SHA-512). Пароли хранятся в ZooKeeper или KRaft в зашифрованном виде. В процессе: клиент посылает challenge-response, брокер проверяет без передачи пароля.
- OAUTHBEARER: Интеграция с OAuth 2.0 для токенов (JWT). Полезно для интеграции с внешними системами (например, Keycloak).

В памяти: токены валидируются через callback handler, overhead на верификацию сигнатуры.


- ACLs (Access Control Lists): Авторизация операций (READ, WRITE, CREATE и т.д.) по пользователям и ресурсам (темы, группы потребителей). Настраивается через authorizer.class.name (KafkaAuthorizer). ACL хранятся в ZooKeeper/KRaft, загружаются в память брокера как Map<Principal, Set<AclEntry>>. При запросе брокер проверяет ACL перед выполнением, добавляя минимальный latency (миллисекунды).

Компромиссы: полная безопасность (TLS + SASL + ACL) снижает производительность на 10-15%, но обязательна для compliance (GDPR, PCI DSS).


Мониторинг: JMX, Prometheus, key metrics, consumer lag tracking


Мониторинг позволяет выявлять узкие места и предотвращать сбои.

- JMX (Java Management Extensions): Встроенный в Kafka для экспорта метрик (MBeans). Брокеры экспортируют метрики вроде kafka.server:type=BrokerTopicMetrics,name=TotalProduceRequestsPerSec.
В памяти: JMX registry держит beans, polling через JmxExporter.


- Prometheus: Интеграция через JMX Exporter (sidecar) или Kafka Exporter. Собирает метрики в формате для Grafana.

Ключевые метрики:
- Брокеры: under-replicated-partitions (недореплицированные разделы), active-controller-count (активный контроллер, должен быть 1), request-queue-size (очередь запросов, >100 — bottleneck).
- Продюсеры: produce-throttle-time-avg (задержка из-за квот), record-error-rate (ошибки отправки).
- Потребители: records-lag-max (максимальное отставание).


- Отслеживание отставания потребителей (consumer lag): Lag = log-end-offset - committed-offset. Мониторится через KafkaConsumer.metrics() или Burrow/LinkedIn's Kafka Monitor.
В памяти потребителя: fetch metadata включает LEO, lag вычисляется локально. Нюанс: высокий lag (>1M) указывает на slow processing; alerting на threshold.


В эксплуатации: настройте dashboards в Grafana для визуализации, с retention метрик 15-30 дней.


#Java #middle #Kafka #Kafka_securiy
👍1
Тюнинг: параметры брокеров (num.partitions, min.insync.replicas, log.retention.hours), GC tuning для Java 21

Тюнинг оптимизирует под нагрузку.

- Параметры брокеров:
- num.partitions: Количество разделов по умолчанию для новых топиков (default 1). Больше — выше parallelism, но overhead на метаданные (каждый раздел — ~1 МБ RAM на брокере).
- min.insync.replicas: Минимальное количество синхронизированных реплик для acks=all (default 1). Установите 2 для durability, но снижает availability при сбоях.
- log.retention.hours: Время хранения логов (default 168 часов). Баланс: дольше — больше диск, короче — риск потери для replay.


- GC tuning для Java 21: Kafka работает на JVM, GC (сборка мусора) влияет на latency. Используйте Shenandoah или ZGC для low-pause (sub-ms).
Параметры: -XX:+UseZGC -XX:ZCollectionInterval=10 (интервал 10 сек), -XX:MaxGCPauseMillis=5.
В памяти: ZGC использует colored pointers для concurrent marking, снижая паузы. Нюанс: для heap >64GB добавьте -XX:+ZGenerational для generational mode. Тестируйте с GC logs (-Xlog:gc*).


Компромиссы: агрессивный тюнинг снижает latency, но требует benchmark (k6 или Kafka's perf tools).


Восстановление после катастроф: MirrorMaker 2, репликация между регионами

Disaster recovery обеспечивает continuity.


- MirrorMaker 2 (MM2): Инструмент для зеркалирования тем между кластерами (active-passive или active-active). Работает как consumer-producer: читает из source, пишет в target. Поддерживает offset translation, topic renaming. В памяти: MM2 использует Connect framework, с tasks для parallelism.

- Репликация между регионами: Настройте MM2 с remote clusters, используя topics like mm2-offset-syncs для синхронизации offset'ов. Для cross-DC: настройте replication.factor>1, с geo-aware rack.aware.mode. Нюанс: latency добавляет 100-500 мс, мониторьте replication lag.

Стратегия: регулярные drills, с failover скриптами (изменение bootstrap.servers в клиентах).


Пример конфигурации безопасности (ZooKeeper и KRaft)

Вот пример конфигурации брокера для SASL_SSL с SCRAM (работает как с ZooKeeper, так и KRaft):
# Прослушиватели
listeners=SASL_SSL://:9092
advertised.listeners=SASL_SSL://broker-host:9092

# SSL настройки
ssl.keystore.location=/etc/kafka/secrets/kafka.keystore.jks
ssl.keystore.password=keystore-pass
ssl.truststore.location=/etc/kafka/secrets/kafka.truststore.jks
ssl.truststore.password=truststore-pass
ssl.client.auth=required # Требовать клиентские сертификаты

# SASL
sasl.enabled.mechanisms=SCRAM-SHA-512
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512

# Для KRaft добавьте:
process.roles=broker,controller
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,SASL_SSL:SASL_SSL

# ACL
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
super.users=User:admin
allow.everyone.if.no.acl.found=false


В production: генерируйте сертификаты с CA (Let's Encrypt или self-signed), храните секреты в Vault.


#Java #middle #Kafka #Kafka_securiy
👍1
Нюансы

- Планирование ёмкости по количеству разделов: Разделы — единица parallelism, но лимит ~4K на брокер (из-за открытых файлов, RAM ~10 КБ на раздел). Планируйте: throughput / partition-throughput (например, 10 МБ/с на раздел). Для 100K TPS — 100 разделов. Нюанс: слишком много (>1M в кластере) замедляет контроллер (leader election до минут); используйте формулу: num_partitions = (desired_throughput / partition_throughput) * replication_factor.

- Оповещения по недореплицированным разделам: Метрика kafka.server:ReplicaManager:UnderReplicatedPartitions >0 сигнализирует о проблемах (сбой фолловера). Alerting в Prometheus: alert если >0 >5 мин, с playbook (проверить network, restart брокер). Нюанс: chronic under-replication приводит к data loss при unclean election.

- Типичные антипаттерны:
- Слишком много разделов: Приводит к overhead (метаданные, ZK load в старых версиях). Решение: consolidate темы, используйте compaction.
- Unclean election (unclean.leader.election.enable=true): Позволяет non-ISR лидеру, рискуя потерей данных. Держите false, жертвуйте availability.
- Отключенный мониторинг: Без alerting на lag или CPU>80% сбои незаметны. Всегда интегрируйте с ELK/Prometheus, с on-call ротацией.


#Java #middle #Kafka #Kafka_securiy
👍3