Apache Kafka.
Введение и архитектура
Apache Kafka представляет собой распределенную платформу для обработки потоков данных в реальном времени, которая сочетает в себе функции очереди сообщений, хранилища данных и системы обработки событий. Разработанная изначально в LinkedIn для решения задач высоконагруженных систем, Kafka эволюционировала в мощный инструмент для построения масштабируемых конвейеров данных.
Основные концепции: topic, partition, offset, segment, log, leader/follower, ISR
Kafka строится вокруг понятия Topic — логическая категория для потоков сообщений.
Topic — это не монолитная структура, а распределенная очередь, разделенная на партиции (partitions). Каждая партиция представляет собой упорядоченную, неизменяемую последовательность записей (records), которая хранится как append-only лог. Это значит, что данные в партиции добавляются только в конец, без возможности модификации существующих записей. Партиции позволяют параллелизовать обработку: разные партиции могут обрабатываться независимо, что обеспечивает масштабируемость.
Внутри партиции каждая запись идентифицируется offset'ом — это монотонно возрастающее целое число, начиная с 0, которое указывает позицию записи в логе. Offset уникален только в пределах партиции; для разных партиций offset'ы независимы. Когда потребитель (consumer) читает данные, он отслеживает текущий offset, чтобы знать, с какой позиции продолжить чтение. В памяти потребителя offset хранится локально, но для надежности Kafka предоставляет механизм коммита offset'ов в специальную внутреннюю тему
Партиция физически хранится как лог (log) — последовательность файлов на диске брокера. Лог разбивается на сегменты (segments) для управления размером: каждый сегмент — это файл с записями, начиная с определенного offset'а (base offset). Когда сегмент достигает заданного размера (по умолчанию 1 ГБ, создается новый. Старые сегменты могут удаляться по политикам retention. В памяти брокера сегменты не загружаются целиком; вместо этого Kafka использует memory-mapped files (mmap) для доступа к диску, что позволяет ОС кэшировать горячие данные в page cache, минимизируя реальные I/O-операции.
Для репликации партиции имеют лидера (leader) и фолловеров (followers). Лидер — это реплика партиции на одном брокере, которая принимает все записи от продюсеров (producers) и обслуживает чтение от потребителей. Фолловеры — реплики на других брокерах, которые синхронизируют данные с лидером. ISR (In-Sync Replicas) — это подмножество реплик (включая лидера), которые полностью синхронизированы с лидером. ISR определяется по отставанию фолловеров: если фолловер не запрашивает данные в течение
В памяти брокера для каждой партиции лидер хранит в RAM метаданные, такие как текущий high-watermark (максимальный offset, закоммиченный на всех ISR), а также буферы для входящих запросов. Фолловеры используют отдельные потоки (replica fetchers) для pull-запросов к лидеру, копируя данные в свои логи.
Архитектура брокера: роль брокера, контроллера, брокерная конфигурация
Брокер (broker) — это основной узел (сервер) Kafka-кластера, отвечающий за хранение и обслуживание данных. Каждый брокер управляет подмножеством партиций: для каждой партиции один брокер является лидером, а другие — фолловерами. Брокеры образуют кластер, координируемый через ZooKeeper (до Kafka 2.8) или встроенный KRaft (Kafka Raft) в новых версиях, который устраняет зависимость от ZooKeeper.
#Java #middle #Kafka
Введение и архитектура
Apache Kafka представляет собой распределенную платформу для обработки потоков данных в реальном времени, которая сочетает в себе функции очереди сообщений, хранилища данных и системы обработки событий. Разработанная изначально в LinkedIn для решения задач высоконагруженных систем, Kafka эволюционировала в мощный инструмент для построения масштабируемых конвейеров данных.
Основные концепции: topic, partition, offset, segment, log, leader/follower, ISR
Kafka строится вокруг понятия Topic — логическая категория для потоков сообщений.
Topic — это не монолитная структура, а распределенная очередь, разделенная на партиции (partitions). Каждая партиция представляет собой упорядоченную, неизменяемую последовательность записей (records), которая хранится как append-only лог. Это значит, что данные в партиции добавляются только в конец, без возможности модификации существующих записей. Партиции позволяют параллелизовать обработку: разные партиции могут обрабатываться независимо, что обеспечивает масштабируемость.
Внутри партиции каждая запись идентифицируется offset'ом — это монотонно возрастающее целое число, начиная с 0, которое указывает позицию записи в логе. Offset уникален только в пределах партиции; для разных партиций offset'ы независимы. Когда потребитель (consumer) читает данные, он отслеживает текущий offset, чтобы знать, с какой позиции продолжить чтение. В памяти потребителя offset хранится локально, но для надежности Kafka предоставляет механизм коммита offset'ов в специальную внутреннюю тему
__consumer_offsets
, где они реплицируются как обычные записи.Партиция физически хранится как лог (log) — последовательность файлов на диске брокера. Лог разбивается на сегменты (segments) для управления размером: каждый сегмент — это файл с записями, начиная с определенного offset'а (base offset). Когда сегмент достигает заданного размера (по умолчанию 1 ГБ, создается новый. Старые сегменты могут удаляться по политикам retention. В памяти брокера сегменты не загружаются целиком; вместо этого Kafka использует memory-mapped files (mmap) для доступа к диску, что позволяет ОС кэшировать горячие данные в page cache, минимизируя реальные I/O-операции.
Для репликации партиции имеют лидера (leader) и фолловеров (followers). Лидер — это реплика партиции на одном брокере, которая принимает все записи от продюсеров (producers) и обслуживает чтение от потребителей. Фолловеры — реплики на других брокерах, которые синхронизируют данные с лидером. ISR (In-Sync Replicas) — это подмножество реплик (включая лидера), которые полностью синхронизированы с лидером. ISR определяется по отставанию фолловеров: если фолловер не запрашивает данные в течение
replica.lag.time.max.ms
(по умолчанию 30 секунд), он исключается из ISR. Это обеспечивает баланс между доступностью и consistency: записи считаются закоммиченными, когда они реплицированы на все реплики в ISR (min.insync.replicas
).В памяти брокера для каждой партиции лидер хранит в RAM метаданные, такие как текущий high-watermark (максимальный offset, закоммиченный на всех ISR), а также буферы для входящих запросов. Фолловеры используют отдельные потоки (replica fetchers) для pull-запросов к лидеру, копируя данные в свои логи.
Архитектура брокера: роль брокера, контроллера, брокерная конфигурация
Брокер (broker) — это основной узел (сервер) Kafka-кластера, отвечающий за хранение и обслуживание данных. Каждый брокер управляет подмножеством партиций: для каждой партиции один брокер является лидером, а другие — фолловерами. Брокеры образуют кластер, координируемый через ZooKeeper (до Kafka 2.8) или встроенный KRaft (Kafka Raft) в новых версиях, который устраняет зависимость от ZooKeeper.
#Java #middle #Kafka
👍6
Контроллер (controller) — это специальный брокер, избираемый кластером для управления метаданными: распределение партиций, лидер-элекшн, обработка изменений в топиках. Контроллер мониторит состояние брокеров через heartbeat'ы и перераспределяет партиции при сбоях. В памяти контроллера хранится глобальное состояние кластера: mapping партиций к брокерам, ISR для каждой партиции. При сбое контроллера избирается новый, что занимает миллисекунды благодаря репликации метаданных в
Брокерная конфигурация определяет поведение:
В памяти брокера значительная часть heap'а (до 50% по умолчанию) выделяется под off-heap буферы для сетевых операций, чтобы избежать GC-пауз. Конфигурация влияет на производительность: слишком малое
Хранение данных: лог-сегменты, индексные файлы, retention vs compaction
Данные в партиции хранятся в лог-сегментах: каждый сегмент состоит из двух файлов —
Запись в
Retention — это политика удаления старых данных:
Trade-offs: retention подходит для временных данных (например, логи), но тратит диск на устаревшие записи; compaction экономит место для stateful данных (например, конфиги), но увеличивает CPU/IO на cleanup. Влияние
Репликация: лидеры, ISR, replica fetcher, лидер-элекшн
Репликация обеспечивает надежность: каждая партиция имеет
Лидер-элекшн происходит при сбое лидера: контроллер выбирает нового лидера из ISR (предпочтительно unclean.leader.election.enable=false, чтобы избежать data loss). Элекшн использует epoch для предотвращения split-brain: новый лидер увеличивает epoch, уведомляя фолловеров. В памяти брокера реплика хранит log end offset (LEO) — максимальный offset в логе, и high-watermark.
#Java #middle #Kafka
__controller_epoch
теме.Брокерная конфигурация определяет поведение:
broker.id
— уникальный ID, log.dirs
— директории для логов, num.network.threads
— количество потоков для сетевых запросов (по умолчанию 3), num.io.threads
— для дисковых операций (по умолчанию 8). В памяти брокера значительная часть heap'а (до 50% по умолчанию) выделяется под off-heap буферы для сетевых операций, чтобы избежать GC-пауз. Конфигурация влияет на производительность: слишком малое
num.io.threads
может привести к bottleneck'у на диске, а большое message.max.bytes
увеличивает потребление памяти для батчинга.Хранение данных: лог-сегменты, индексные файлы, retention vs compaction
Данные в партиции хранятся в лог-сегментах: каждый сегмент состоит из двух файлов —
.log
(сами записи) и .index
(спарс-индекс для быстрого поиска по offset'у). Запись в
.log
— это последовательность байт: заголовок (magic byte, attributes, timestamp), ключ, значение, headers. Индексный файл содержит пары (offset, position), где position — байтовое смещение в .log
. Индекс спарсный (по умолчанию каждые 4 КБ, configurable via index.interval.bytes
), чтобы минимизировать размер: поиск offset'а начинается с ближайшей индексной записи, за которой следует линейный скан.Retention — это политика удаления старых данных:
log.retention.hours
(по умолчанию 168) удаляет сегменты по времени, log.retention.bytes
— по размеру. Compaction — альтернатива для key-based тем: сохраняет только последнюю запись для каждого ключа, удаляя дубликаты. Compaction работает в фоне: cleaner thread сканирует сегменты, строит в памяти map ключей к последним offset'ам, затем перезаписывает сегмент. В памяти это требует heap'а пропорционально количеству уникальных ключей в сегменте.Trade-offs: retention подходит для временных данных (например, логи), но тратит диск на устаревшие записи; compaction экономит место для stateful данных (например, конфиги), но увеличивает CPU/IO на cleanup. Влияние
segment.bytes
: маленькие сегменты (например, 100 МБ) ускоряют deletion/compaction, снижая GC (меньше объектов в heap во время cleanup), но увеличивают overhead на открытые файлы и индексы. Большие сегменты минимизируют фрагментацию, но замедляют GC/IO при compaction, так как требуют больше памяти для временных структур.Репликация: лидеры, ISR, replica fetcher, лидер-элекшн
Репликация обеспечивает надежность: каждая партиция имеет
replication.factor
(по умолчанию 1-3) реплик. Продюсеры пишут только в лидера, который реплицирует данные в ISR. Replica fetcher — это поток на фолловере, который периодически посылает FetchRequest к лидеру, запрашивая данные с последнего fetched offset'а. Лидер проверяет, в ISR ли фолловер, и отправляет данные. High-watermark продвигается только когда все ISR зареплицировали запись, обеспечивая durability.Лидер-элекшн происходит при сбое лидера: контроллер выбирает нового лидера из ISR (предпочтительно unclean.leader.election.enable=false, чтобы избежать data loss). Элекшн использует epoch для предотвращения split-brain: новый лидер увеличивает epoch, уведомляя фолловеров. В памяти брокера реплика хранит log end offset (LEO) — максимальный offset в логе, и high-watermark.
#Java #middle #Kafka
👍6
Сетевая модель: request/response, metadata, fetch/produce
Kafka использует асинхронный, бинарный протокол на TCP: клиенты посылают requests (ProduceRequest, FetchRequest), брокеры отвечают responses. MetadataRequest запрашивает топик-метаданные (партиции, лидеры) от любого брокера, который перенаправляет к контроллеру если нужно. Produce — для записи: продюсер батчит записи по партициям, посылая в лидера. Fetch — для чтения: потребитель запрашивает с offset'а, получая chunk данных.
В памяти: брокер использует NIO selectors для multiplexing соединений, буферы (ByteBuffer) для zero-copy передачи. Zero-copy с sendfile() позволяет передавать данные из page cache напрямую в socket, без копирования в user space.
Обзор API-моделей
- Producer API: Для отправки записей. Создает ProducerRecord (topic, partition, key, value), использует KafkaProducer с конфигами (acks=all для durability, batch.size для батчинга).
- Consumer API: Для чтения. KafkaConsumer с poll(), который возвращает ConsumerRecords. Поддерживает группы для балансировки партиций.
- Admin API: Для управления топиками (create, delete, describe).
- Streams API: Для обработки потоков (KStream, KTable) с state stores.
- Connect API: Для интеграции с внешними системами (sources/sinks).
Ordering guarantees, масштабирование vs ordering
Kafka гарантирует ordering только внутри партиции: записи с одним ключом (если key-based partitioning) идут в порядке отправки. Нет глобального ordering по топику. Масштабирование добавлением партиций улучшает throughput, но жертвует ordering: для строгого ordering используйте одну партицию, что лимитирует parallelism.
Почему Kafka быстрая
- Sequential I/O: Запись/чтение в append-only лог — последовательные операции на диске, эффективные для HDD/SSD (миллионы IOPS vs random access).
- Zero-copy: Sendfile() копирует данные из kernel cache в socket без user space, снижая CPU и latency.
- Batching: Продюсеры/потребители батчат записи (linger.ms), амортизируя overhead сети/диска. В памяти батчи сжимаются (compression.type).
Минимальный producer/consumer
Producer (Java):
Consumer (Java):
#Java #middle #Kafka
Kafka использует асинхронный, бинарный протокол на TCP: клиенты посылают requests (ProduceRequest, FetchRequest), брокеры отвечают responses. MetadataRequest запрашивает топик-метаданные (партиции, лидеры) от любого брокера, который перенаправляет к контроллеру если нужно. Produce — для записи: продюсер батчит записи по партициям, посылая в лидера. Fetch — для чтения: потребитель запрашивает с offset'а, получая chunk данных.
В памяти: брокер использует NIO selectors для multiplexing соединений, буферы (ByteBuffer) для zero-copy передачи. Zero-copy с sendfile() позволяет передавать данные из page cache напрямую в socket, без копирования в user space.
Обзор API-моделей
- Producer API: Для отправки записей. Создает ProducerRecord (topic, partition, key, value), использует KafkaProducer с конфигами (acks=all для durability, batch.size для батчинга).
- Consumer API: Для чтения. KafkaConsumer с poll(), который возвращает ConsumerRecords. Поддерживает группы для балансировки партиций.
- Admin API: Для управления топиками (create, delete, describe).
- Streams API: Для обработки потоков (KStream, KTable) с state stores.
- Connect API: Для интеграции с внешними системами (sources/sinks).
Ordering guarantees, масштабирование vs ordering
Kafka гарантирует ordering только внутри партиции: записи с одним ключом (если key-based partitioning) идут в порядке отправки. Нет глобального ordering по топику. Масштабирование добавлением партиций улучшает throughput, но жертвует ordering: для строгого ordering используйте одну партицию, что лимитирует parallelism.
Почему Kafka быстрая
- Sequential I/O: Запись/чтение в append-only лог — последовательные операции на диске, эффективные для HDD/SSD (миллионы IOPS vs random access).
- Zero-copy: Sendfile() копирует данные из kernel cache в socket без user space, снижая CPU и latency.
- Batching: Продюсеры/потребители батчат записи (linger.ms), амортизируя overhead сети/диска. В памяти батчи сжимаются (compression.type).
Минимальный producer/consumer
Producer (Java):
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key", "value");
producer.send(record);
producer.close();
Consumer (Java):
import org.apache.kafka.clients.consumer.*;
import java.util.Properties;
import java.util.Collections;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singleton("my-topic"));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
consumer.close();
#Java #middle #Kafka
🔥3👍2