listeners vs advertised.listeners
В чём отличие параметров брокера Apache Kafka
Когда мы запускаем брокер на локальной машине, то подключаемся к нему по
не настраивая никаких слушателей.
Это работает, потому что параметр
а параметр
Если
Если
При подключении к
Теперь в параметр
Например,
Тогда, при подключении к
Параметр listeners определяет какие интерфейсы назначаются брокеру.
Параметр advertised.listeners определяет через какие интерфейсы клиент должен взаимодействовать с брокером.
Например, это необходимо, если взаимодействие между брокерами происходит по внутренним адресам (host/IP), а с клиентами - по внешним.
Или, если для клиентов требуется протокол безопасности
#broker #listeners
@kafka_cooker
В чём отличие параметров брокера Apache Kafka
listeners
и advertised.listeners
?Когда мы запускаем брокер на локальной машине, то подключаемся к нему по
localhost:9092
,не настраивая никаких слушателей.
Это работает, потому что параметр
listeners
по-умолчанию равен PLAINTEXT://:9092
,а параметр
advertised.listeners
по-умолчанию не задан и, тогда, использует значение listeners
.listeners
задаётся в формате <LISTENER_NAME или PROTOCOL>://<HOSTNAME>:<PORT>
.Если
<HOSTNAME>
указан 0.0.0.0
, то слушателю назначаются все интерфейсы.Если
<HOSTNAME>
не указан, то слушателю назначается интерфейс по-умолчанию, равный java.net.InetAddress.getCanonicalHostName()
.При подключении к
localhost:9092
, брокер возвращает в метадата список нод, с которыми клиент (продюсер, консьюмер и т.д.) должен взаимодействовать:MetadataResponseData(brokers=[MetadataResponseBroker(nodeId=0, host='fedora', port=9092)]Какой
host
вернётся, можно проверить, запустив в терминале команду hostname
.Теперь в параметр
advertised.listeners
установим значение, соответсвующее IP адресу нашей машины в локальной сети.Например,
PLAINTEXT://192.168.1.58:9092
.Тогда, при подключении к
localhost:9092
, на запрос метадата, брокер вернёт следующий список нод:MetadataResponseData(brokers=[MetadataResponseBroker(nodeId=0, host='192.168.1.58', port=9092)]Тем самым, мы можем управлять через какие интерфейсы клиент должен взаимодействовать с брокером.
Параметр listeners определяет какие интерфейсы назначаются брокеру.
Параметр advertised.listeners определяет через какие интерфейсы клиент должен взаимодействовать с брокером.
Например, это необходимо, если взаимодействие между брокерами происходит по внутренним адресам (host/IP), а с клиентами - по внешним.
Или, если для клиентов требуется протокол безопасности
SSL
, а для брокеров - PLAINTEXT
.#broker #listeners
@kafka_cooker
❤1
in-sync реплика vs out-of-sync реплика
Механизм репликации начинает работу при установке параметра
на уровне брокера или
Для каждой партиции топика назначается ведущая реплика. Все остальные реплики данной партиции являются ведомыми.
В случае выхода из строя ведущей реплики - её место займёт одна из ведомых in-sync реплик.
in-sync реплика (ISR) должна соответствовать двум правилам:
1. поддерживать активную сессию с контроллером кластера - в зависимости от режима работы KRaft или Zookeeper,
за это отвечает параметр
или
2. догнать последний офсет ведущей реплики не более, чем за
Реплика, которая не соответствует этим условиям, является out-of-sync репликой.
Если in-sync реплика занимает место ведущей, по причине её недоступности, то такой выбор лидера называется "чистым".
Потому что, по определению isr, зафиксированные данные есть во всех согласованных репликах, что даёт гарантии отсутствия потери данных.
Но что, если in-sync реплики отсутствуют, но при этом есть out-of-sync реплики, а ведущая реплика стала недоступна?
В этом случае есть две стратегии поведения:
1. дождаться пока ведущая реплика станет доступна
2. назначить ведущей репликой одну из out-of-sync реплик
В первом варианте мы получаем простой системы. Во втором - потерю и несогласованность данных.
По-умолчанию, Apache Kafka использует первую стратегию. За это отвечает параметр
Его можно задать не только на уровне брокера, но и на уровне конкретного топика.
#broker #isr
@kafka_cooker
Механизм репликации начинает работу при установке параметра
default.replication.factor
на уровне брокера или
replication-factor
на уровне топика большим единицы.Для каждой партиции топика назначается ведущая реплика. Все остальные реплики данной партиции являются ведомыми.
В случае выхода из строя ведущей реплики - её место займёт одна из ведомых in-sync реплик.
in-sync реплика (ISR) должна соответствовать двум правилам:
1. поддерживать активную сессию с контроллером кластера - в зависимости от режима работы KRaft или Zookeeper,
за это отвечает параметр
broker.session.timeout.ms
(9 секунд)или
zookeeper.session.timeout.ms
(18 секунд)2. догнать последний офсет ведущей реплики не более, чем за
replica.lag.time.max.ms
(30 секунд)Реплика, которая не соответствует этим условиям, является out-of-sync репликой.
Если in-sync реплика занимает место ведущей, по причине её недоступности, то такой выбор лидера называется "чистым".
Потому что, по определению isr, зафиксированные данные есть во всех согласованных репликах, что даёт гарантии отсутствия потери данных.
Но что, если in-sync реплики отсутствуют, но при этом есть out-of-sync реплики, а ведущая реплика стала недоступна?
В этом случае есть две стратегии поведения:
1. дождаться пока ведущая реплика станет доступна
2. назначить ведущей репликой одну из out-of-sync реплик
В первом варианте мы получаем простой системы. Во втором - потерю и несогласованность данных.
По-умолчанию, Apache Kafka использует первую стратегию. За это отвечает параметр
unclean.leader.election.enable
(false).Его можно задать не только на уровне брокера, но и на уровне конкретного топика.
#broker #isr
@kafka_cooker
В какую партицию отправляются сообщения?
Автоматическое определение партиций для отправки сообщений претерпело множество изменений за время существования Apache Kafka.
Начиная с простого round robin, когда сообщения распределялись равномерно по партициям, заканчивая адаптивным алгоритмом, который пытается отправить как можно больше сообщений в партиции, находящихся на более производительных брокерах.
Сейчас определение партиции выглядит так:
1. Если пользователь самостоятельно указал партицию, то сообщение отправится в неё.
2. Если пользователь реализовал интерфейс
3. Если в сообщении указан ключ, а параметр
4. Во всех остальных случаях используется адаптивный вариант sticky partitioner.
Первоначальный вариант sticky partitioner выбирал определённую партицию и отправлял в неё сообщение(-я) по достижении
На практике оказалось, что отправка в партиции, находящихся на более производительных брокерах, происходила по достижении
Тем самым, на менее производительные брокеры отправлялось больше сообщений.
Новый вариант sticky partitioner, путём сбора статистики работы, пытается отправить как можно больше сообщений в партиции, находящиеся на более производительных брокерах. За это отвечает параметр
Так же, с помощью параметра
#producer #partition
@kafka_cooker
Автоматическое определение партиций для отправки сообщений претерпело множество изменений за время существования Apache Kafka.
Начиная с простого round robin, когда сообщения распределялись равномерно по партициям, заканчивая адаптивным алгоритмом, который пытается отправить как можно больше сообщений в партиции, находящихся на более производительных брокерах.
Сейчас определение партиции выглядит так:
1. Если пользователь самостоятельно указал партицию, то сообщение отправится в неё.
2. Если пользователь реализовал интерфейс
org.apache.kafka.clients.producer.Partitioner
и указал параметр partitioner.class
, то сообщение отправится в партицию, которую вернул метод int partition(...)
указанной реализации.3. Если в сообщении указан ключ, а параметр
partitioner.ignore.keys
равен false
(значение по-умолчанию), то сообщение отправится в партицию, определённую на основе хеша ключа (алгоритм murmur2).4. Во всех остальных случаях используется адаптивный вариант sticky partitioner.
Первоначальный вариант sticky partitioner выбирал определённую партицию и отправлял в неё сообщение(-я) по достижении
batch.size
(16КБ) или linger.ms
(0мс). Далее выбирал следующую партицию и так снова и снова.На практике оказалось, что отправка в партиции, находящихся на более производительных брокерах, происходила по достижении
linger.ms
, а отправка в партиции, находящихся на менее производительных брокерах, происходила по достижении batch.size
.Тем самым, на менее производительные брокеры отправлялось больше сообщений.
Новый вариант sticky partitioner, путём сбора статистики работы, пытается отправить как можно больше сообщений в партиции, находящиеся на более производительных брокерах. За это отвечает параметр
partitioner.adaptive.partitioning.enable
(true).Так же, с помощью параметра
partitioner.availability.timeout.ms
(0мс), можно настроить задержку принятия запроса брокером, по истечении которой, партиция отмечается недоступной.#producer #partition
@kafka_cooker
🔥1
Как происходит ребалансировка потребителей?
Apache Kafka использует две стратегии ребалансировки потребителей:
1. eager
2. cooperative (incremental)
Eager стратегия применяет подход stop-the-world, когда все потребители прекращают обработку сообщений, отказываются от назначенных им партиций, снова присоединяются к группе потребителей и получают новые партиции.
Cooperative стратегия старается переназначить лишь небольшое количество партиций от одного потребителя к другому, позволяя остальным потребителям продолжать обработку сообщений из партиций, которые не были переназначены.
Этот процесс проходит в несколько этапов, поэтому удаётся избежать эффекта stop-the-world, что особенно важно для больших групп потребителей, для которых ребалансировка может занять много времени.
В Apache Kafka не существует отдельного параметра, который позволял бы указать какую стратегию ребалансировки использовать.
Стратегия ребалансировки выбирается на основе стратегии назначения патриций потребителям. За это отвечает параметр
1.
2.
3.
4.
5. пользовательская реализация интерфейса
По-умолчанию, данный параметр равен
#consumer #rebalance
@kafka_cooker
Apache Kafka использует две стратегии ребалансировки потребителей:
1. eager
2. cooperative (incremental)
Eager стратегия применяет подход stop-the-world, когда все потребители прекращают обработку сообщений, отказываются от назначенных им партиций, снова присоединяются к группе потребителей и получают новые партиции.
Cooperative стратегия старается переназначить лишь небольшое количество партиций от одного потребителя к другому, позволяя остальным потребителям продолжать обработку сообщений из партиций, которые не были переназначены.
Этот процесс проходит в несколько этапов, поэтому удаётся избежать эффекта stop-the-world, что особенно важно для больших групп потребителей, для которых ребалансировка может занять много времени.
В Apache Kafka не существует отдельного параметра, который позволял бы указать какую стратегию ребалансировки использовать.
Стратегия ребалансировки выбирается на основе стратегии назначения патриций потребителям. За это отвечает параметр
partition.assignment.strategy
, который может принимать следующие значения:1.
RangeAssignor
- eager2.
RoundRobinAssignor
- eager3.
StickyAssignor
- eager4.
CooperativeStickyAssignor
- cooperative5. пользовательская реализация интерфейса
ConsumerPartitionAssignor
По-умолчанию, данный параметр равен
[RangeAssignor, CooperativeStickyAssignor]
, что значит использование, в первую очередь, RangeAssignor
, но с возможностью перехода на CooperativeStickyAssignor
.RangeAssignor
назначает потребителям последовательные подмножества партиций.RoundRobinAssignor
назначает потребителям партиции последовательно, одна за другой.StickyAssignor
старается обеспечить максимально сбалансированное назначение партиций потребителям, но, в случае ребалансировки, оставить как можно больше уже назначенных партиций нетронутыми.CooperativeStickyAssignor
работает аналогично StickyAssignor
, но поддерживает cooperative стратегию ребалансировки.#consumer #rebalance
@kafka_cooker
Зачем нужен статический потребитель?
Apache Kafka использует два вида потребителей:
1. динамические (по-умолчанию)
2. статические
Когда динамический потребитель впервые присоединяется к группе потребителей, то происходит ребалансировка группы и ему назначаются определённые партиции.
Когда, в следующий раз, данный потребитель решит снова присоединиться к группе (например, из-за перезагрузки приложения), то он ничего не будет знать о ранее назначенных ему партициях, и, в результате ребалансировки группы, получит новые партиции.
Статический потребитель ведёт себя по другому.
Когда статический потребитель впервые присоединяется к группе потребителей, то ребалансировка группы происходит по тому же принципу, что и для динамического потребителя.
Но, когда данный потребитель выключится, то ребалансировки группы не произойдёт и назначенные ему партиции не будут перераспределены.
Координатор группы будет ждать до
Если данный потребитель успеет переподключиться к группе до истечения
Важно понимать, что пока статический потребитель неактивен, то ни один другой потребитель не будет вычитывать сообщения из данных партиций.
Чтобы превратить динамический потребитель в статический, достаточно указать уникальный параметр
Если два потребителя присоединятся к одной группе с одинаковым
Кроме уменьшения количества ребалансировок, статические потребители полезны в случае, если приложение является stateful, то есть хранит состояние, а инициализация этого состояния из конкретных партиций занимает продолжительное время.
#consumer #partition #rebalance
@kafka_cooker
Apache Kafka использует два вида потребителей:
1. динамические (по-умолчанию)
2. статические
Когда динамический потребитель впервые присоединяется к группе потребителей, то происходит ребалансировка группы и ему назначаются определённые партиции.
Когда, в следующий раз, данный потребитель решит снова присоединиться к группе (например, из-за перезагрузки приложения), то он ничего не будет знать о ранее назначенных ему партициях, и, в результате ребалансировки группы, получит новые партиции.
Статический потребитель ведёт себя по другому.
Когда статический потребитель впервые присоединяется к группе потребителей, то ребалансировка группы происходит по тому же принципу, что и для динамического потребителя.
Но, когда данный потребитель выключится, то ребалансировки группы не произойдёт и назначенные ему партиции не будут перераспределены.
Координатор группы будет ждать до
session.timeout.ms
(45 сек), прежде, чем решит перераспределить партиции, назначенные на статического потребителя.Если данный потребитель успеет переподключиться к группе до истечения
session.timeout.ms
, то ему будут назначены прежние партиции.Важно понимать, что пока статический потребитель неактивен, то ни один другой потребитель не будет вычитывать сообщения из данных партиций.
Чтобы превратить динамический потребитель в статический, достаточно указать уникальный параметр
group.instance.id
.Если два потребителя присоединятся к одной группе с одинаковым
group.instance.id
, то один из них получит ошибку.Кроме уменьшения количества ребалансировок, статические потребители полезны в случае, если приложение является stateful, то есть хранит состояние, а инициализация этого состояния из конкретных партиций занимает продолжительное время.
#consumer #partition #rebalance
@kafka_cooker
Как потребителю узнать о ребалансировке?
Если приложение является stateful, то есть хранит состояние, например, кэш, а этот кэш зависит от конкретных партиций, то, в случае ребалансировки и переназначения партиций, было бы полезно этот кэш или очищать или заново инициализировать.
Чтобы узнать, какие партиции участвовали в ребалансировке, нужно создать реализацию интерфейса
1.
Принимает в качестве аргумента список отозванных партиций.
При eager ребалансировке вызывается всегда (даже с пустым списком партиций) в начале ребалансировки и после того, как потребитель перестал вычитывать сообщения.
При cooperative ребалансировке вызывается в её конце и только с непустым списком партиций.
Так же, может вызываться при остановке потребителя (
2.
Принимает в качестве аргумента список новых партиций для потребителя или пустой список, если новых партиций для потребителя нет.
Вызывается в конце ребалансировки и до того, как потребитель начнёт вычитывать сообщения.
Если конкретная партиция была переназначена от одного потребителя к другому, то в нормальных условиях гарантируется, что вызов метода
3.
Принимает в качестве аргумента список отозванных партиций. Вызывается только с непустым списком партиций.
Вызов этого метода может быть осуществлён, когда указанные партиции уже назначены другому потребителю.
#consumer #partition #rebalance
@kafka_cooker
Если приложение является stateful, то есть хранит состояние, например, кэш, а этот кэш зависит от конкретных партиций, то, в случае ребалансировки и переназначения партиций, было бы полезно этот кэш или очищать или заново инициализировать.
Чтобы узнать, какие партиции участвовали в ребалансировке, нужно создать реализацию интерфейса
org.apache.kafka.clients.consumer.ConsumerRebalanceListener
и передать её потребителю в методе subscribe
.ConsumerRebalanceListener
предоставляет три метода.1.
onPartitionsRevoked
вызывается, когда потребитель должен отказаться от партиций, которые ему были назначены ранее.Принимает в качестве аргумента список отозванных партиций.
При eager ребалансировке вызывается всегда (даже с пустым списком партиций) в начале ребалансировки и после того, как потребитель перестал вычитывать сообщения.
При cooperative ребалансировке вызывается в её конце и только с непустым списком партиций.
Так же, может вызываться при остановке потребителя (
close
, unsubscribe
).2.
onPartitionsAssigned
вызывается при любой успешной ребалансировке.Принимает в качестве аргумента список новых партиций для потребителя или пустой список, если новых партиций для потребителя нет.
Вызывается в конце ребалансировки и до того, как потребитель начнёт вычитывать сообщения.
Если конкретная партиция была переназначена от одного потребителя к другому, то в нормальных условиях гарантируется, что вызов метода
onPartitionsRevoked
первым потребителем, будет осуществлён до вызова метода onPartitionsAssigned
вторым потребителем.3.
onPartitionsLost
вызывается в исключительных ситуациях, например, когда истекла сессия потребителя или потребитель потерял членство в группе в результате непредвиденной ошибки.Принимает в качестве аргумента список отозванных партиций. Вызывается только с непустым списком партиций.
Вызов этого метода может быть осуществлён, когда указанные партиции уже назначены другому потребителю.
#consumer #partition #rebalance
@kafka_cooker
Как брокер обрабатывает запросы?
Сетевой уровень брокера Apache Kafka представляет собой NIO сервер, который поддерживает два пайплайна обработки запросов:
1. data-plane
2. control-plane
data-plane обрабатывает запросы от клиентов и других брокеров в кластере.
Для каждого слушателя в параметре
Acceptor thread содержит несколько network threads. Их количество задаётся параметром
Каждый network thread имеет свой селектор и вычитывает запросы из сокета.
Получив запрос, network thread помещает его в очередь. Размер этой очереди задаётся параметром
Запросы из этой очереди разбирают и обрабатывают несколько io threads. Их количество задаётся параметром
Как только io thread завершил обрабатывать запрос, он кладёт результат в ответную очередь. Из этой очереди его забирает network thread и отправляет клиенту.
control-plane обрабатывает запросы от контроллера кластера. Работает только, если задан параметр
Работает аналогично data-plane, за исключением того, что содержит только один network thread и один io thread.
#broker
@kafka_cooker
Сетевой уровень брокера Apache Kafka представляет собой NIO сервер, который поддерживает два пайплайна обработки запросов:
1. data-plane
2. control-plane
data-plane обрабатывает запросы от клиентов и других брокеров в кластере.
Для каждого слушателя в параметре
listeners
создаётся один acceptor thread, который обрабатывает новые соединения.Acceptor thread содержит несколько network threads. Их количество задаётся параметром
num.network.threads
(3).Каждый network thread имеет свой селектор и вычитывает запросы из сокета.
Получив запрос, network thread помещает его в очередь. Размер этой очереди задаётся параметром
queued.max.requests
(500). Если очередь полностью заполнится, то network threads будут заблокированы.Запросы из этой очереди разбирают и обрабатывают несколько io threads. Их количество задаётся параметром
num.io.threads
(8).Как только io thread завершил обрабатывать запрос, он кладёт результат в ответную очередь. Из этой очереди его забирает network thread и отправляет клиенту.
control-plane обрабатывает запросы от контроллера кластера. Работает только, если задан параметр
control.plane.listener.name
. Если не задан, то запросы от контроллера будет обрабатывать data-plane.Работает аналогично data-plane, за исключением того, что содержит только один network thread и один io thread.
#broker
@kafka_cooker
Как партиции распределяются между брокерами?
При создании топика с несколькими партициями и несколькими репликами Apache Kafka старается достичь нескольких целей.
Две самые важные из них по приоритету:
1. распределить реплики как можно более равномерно по серверным стойкам (если кластер располагается на одной стойке, то данная цель не имеет смысла)
2. распределить реплики как можно более равномерно по брокерам
Так же, существует важное ограничение, что на одном брокере должна быть только одна реплика конкретной партиции.
Поэтому, невозможно создать топик с фактором репликации 4 в кластере из 3 брокеров.
Основная идея распределения реплик состоит в том, чтобы распределять их циклически (round-robin) по серверным стойкам и брокерам.
Например, у нас есть стойки
Мы хотим создать топик с 5 партициями и фактором репликации 3.
Для того чтобы постоянно не начинать распределение со стойки
Для простоты восприятия, предположим, что наши смещения равны 0.
Тогда распределение будет выглядеть следующим образом:
- партиция 1:
- партиция 2:
- партиция 3:
- партиция 4:
- партиция 5:
Как видно, даже если какая-либо стойка выйдет из строя, это никак не скажется на доступности какой-либо партиции.
Однако, следует учитывать, что количество брокеров в каждой стойке должно быть как можно более одинаковым.
Иначе, из-за природы циклического распределения, на брокеры в стойке с наименьшим количеством брокеров будет приходиться наибольшее количество реплик.
Более подробно ознакомиться с алгоритмом распределения можно в классе
#broker #partition
@kafka_cooker
При создании топика с несколькими партициями и несколькими репликами Apache Kafka старается достичь нескольких целей.
Две самые важные из них по приоритету:
1. распределить реплики как можно более равномерно по серверным стойкам (если кластер располагается на одной стойке, то данная цель не имеет смысла)
2. распределить реплики как можно более равномерно по брокерам
Так же, существует важное ограничение, что на одном брокере должна быть только одна реплика конкретной партиции.
Поэтому, невозможно создать топик с фактором репликации 4 в кластере из 3 брокеров.
Основная идея распределения реплик состоит в том, чтобы распределять их циклически (round-robin) по серверным стойкам и брокерам.
Например, у нас есть стойки
A
, B
, C
и D
. В каждой стойке есть по 4 брокера, обозначим их A1
, A2
, A3
, A4
и т.д.Мы хотим создать топик с 5 партициями и фактором репликации 3.
Для того чтобы постоянно не начинать распределение со стойки
A
и с первого брокера в стойке, введём случайное начальное смещение как для стойки, так и для брокера в стойке.Для простоты восприятия, предположим, что наши смещения равны 0.
Тогда распределение будет выглядеть следующим образом:
- партиция 1:
A1
B1
C1
- партиция 2:
B2
C2
D1
- партиция 3:
C3
D2
A2
- партиция 4:
D3
A3
B3
- партиция 5:
A4
B4
C4
Как видно, даже если какая-либо стойка выйдет из строя, это никак не скажется на доступности какой-либо партиции.
Однако, следует учитывать, что количество брокеров в каждой стойке должно быть как можно более одинаковым.
Иначе, из-за природы циклического распределения, на брокеры в стойке с наименьшим количеством брокеров будет приходиться наибольшее количество реплик.
Более подробно ознакомиться с алгоритмом распределения можно в классе
org.apache.kafka.metadata.placement.StripedReplicaPlacer
#broker #partition
@kafka_cooker
❤1
Как определить количество реплик для топика?
Репликация предназначена для обеспечения отказоустойчивости и доступности данных не только в случае аварийных ситуаций, но и в случае проведения технического обслуживания серверов.
Партиции могут реплицироваться на брокеры в разных серверных стойках, в разных дата-центрах и даже в разных географических регионах.
Очевидно, если мы будем использовать коэффициент репликации равный 1, то при сбое не сможем ни читать, ни писать данные.
С другой стороны, если укажем его равным 10, то никто не поблагодарит нас за увеличивающиеся затраты на хранение.
Чем больше коэффициент репликации, тем:
1. Надёжнее наша система. Неважно, откажет диск у конкретного брокера или вся стойка, мы продолжим чтение и запись.
2. Дольше происходит запись. При
3. Больше трафика между брокерами.
4. Дороже хранение.
Общепринятым для production среды является коэффициент репликации равный 3.
Он обеспечивает наилучшее соотношение между надёжностью системы и затратами на её поддержание.
Допустим мы запускаем кластер с коэффициентом репликации 3 в облачной среде, в которой есть 3 зоны доступности. Зоны доступности в облаке принято рассматривать как отдельные стойки. Значит для брокеров в конкретной зоне мы должны указать одинаковый
У любого облачного провайдера есть окна обслуживания и соглашение об уровне обслуживания (SLA). Например, у одного отечественного провайдера SLA на запись в кластер Apache Kafka составляет 99,95%.
Это значит, что в определённый момент какая-либо зона может быть недоступна из-за обслуживания. Но у нас остаётся ещё две зоны. И даже если одна из них выйдет из строя уже из-за какой-либо аварии, то мы всё равно сможем продолжить работу.
Конечно, никто не гарантирует, что и третья зона не выйдет из строя. Но как, и в любых распределённых системах, в Apache Kafka мы работаем с вероятностями и компромиссами.
#topic #partition #replica
@kafka_cooker
Репликация предназначена для обеспечения отказоустойчивости и доступности данных не только в случае аварийных ситуаций, но и в случае проведения технического обслуживания серверов.
Партиции могут реплицироваться на брокеры в разных серверных стойках, в разных дата-центрах и даже в разных географических регионах.
Очевидно, если мы будем использовать коэффициент репликации равный 1, то при сбое не сможем ни читать, ни писать данные.
С другой стороны, если укажем его равным 10, то никто не поблагодарит нас за увеличивающиеся затраты на хранение.
Чем больше коэффициент репликации, тем:
1. Надёжнее наша система. Неважно, откажет диск у конкретного брокера или вся стойка, мы продолжим чтение и запись.
2. Дольше происходит запись. При
acks=all
мы должны получить подтверждение от всех реплик, что влияет на задержку продюсеров.3. Больше трафика между брокерами.
4. Дороже хранение.
Общепринятым для production среды является коэффициент репликации равный 3.
Он обеспечивает наилучшее соотношение между надёжностью системы и затратами на её поддержание.
Допустим мы запускаем кластер с коэффициентом репликации 3 в облачной среде, в которой есть 3 зоны доступности. Зоны доступности в облаке принято рассматривать как отдельные стойки. Значит для брокеров в конкретной зоне мы должны указать одинаковый
broker.rack
.У любого облачного провайдера есть окна обслуживания и соглашение об уровне обслуживания (SLA). Например, у одного отечественного провайдера SLA на запись в кластер Apache Kafka составляет 99,95%.
Это значит, что в определённый момент какая-либо зона может быть недоступна из-за обслуживания. Но у нас остаётся ещё две зоны. И даже если одна из них выйдет из строя уже из-за какой-либо аварии, то мы всё равно сможем продолжить работу.
Конечно, никто не гарантирует, что и третья зона не выйдет из строя. Но как, и в любых распределённых системах, в Apache Kafka мы работаем с вероятностями и компромиссами.
#topic #partition #replica
@kafka_cooker
❤2
Зачем брокер использует zero-copy? Часть 1
Apache Kafka хранит сообщения в файлах на жёстком диске.
Если потребитель хочет получить сообщения с определённого офсета, то брокер должен сначала прочитать эти сообщения из файла, а затем отправить их потребителю.
Как бы мы это реализовали? Получили
С точки зрения операционной системы такой подход выглядит немного сложнее:
1. Системный вызов
2. Данные из kernel space (page cache) копируются в user space, завершается вызов
3. Системный вызов
4. Завершается вызов
В итоге мы имеем 4 переключения контекста и 4 копии данных.
kernel space выглядит лишним звеном, но это не так. При чтении он значительно повышает производительность, если запрашиваемый объем данных меньше его размера. А при записи - позволяет завершить её асинхронно.
Но если размер запрашиваемых данных значительно превышает размер kernel space, то это может стать узким местом в производительности. Данные копируются несколько раз между диском, kernel space и user space.
#broker #internal
@kafka_cooker
Apache Kafka хранит сообщения в файлах на жёстком диске.
Если потребитель хочет получить сообщения с определённого офсета, то брокер должен сначала прочитать эти сообщения из файла, а затем отправить их потребителю.
Как бы мы это реализовали? Получили
InputStream
для файла и OutputStream
для сокета, создали временный массив байт, прочитали в него данные из файла InputStream.read(...)
и записали из него данные в сокет OutputStream.write(...)
.С точки зрения операционной системы такой подход выглядит немного сложнее:
1. Системный вызов
read(...)
переключает контекст с user mode в kernel mode. DMA копирует данные с диска в kernel space (page cache).2. Данные из kernel space (page cache) копируются в user space, завершается вызов
read(...)
и контекст переключается обратно с kernel mode в user mode.3. Системный вызов
write(...)
вновь переключает контекст с user mode в kernel mode. Данные копируются из user space в kernel space (socket buffer).4. Завершается вызов
write(...)
и контекст переключается обратно с kernel mode в user mode. Асинхронно DMA копирует данные из kernel space (socket buffer) в network interface controller.В итоге мы имеем 4 переключения контекста и 4 копии данных.
kernel space выглядит лишним звеном, но это не так. При чтении он значительно повышает производительность, если запрашиваемый объем данных меньше его размера. А при записи - позволяет завершить её асинхронно.
Но если размер запрашиваемых данных значительно превышает размер kernel space, то это может стать узким местом в производительности. Данные копируются несколько раз между диском, kernel space и user space.
#broker #internal
@kafka_cooker
👍1
Зачем брокер использует zero-copy? Часть 2
Часть 1
Подход zero-copy стремится решить эту проблему и повысить производительность за счет устранения избыточных копий данных.
Вместо двух системных вызовов
Ядро Linux < 2.4:
1. DMA копирует данные с диска в page cache.
2. Данные из page cache копируются в socket buffer.
3. DMA копирует данные из socket buffer в network interface controller.
Ядро Linux >= 2.4 и network interface controller поддерживает gather operations:
1. DMA копирует данные с диска в page cache.
2. В socket buffer записываются не данные, а только дескрипторы с информацией о расположении и размере данных.
3. DMA на основе полученных дескрипторов копирует данные из page cache в network interface controller.
В итоге мы имеем 2 переключения контекста и 2 копии данных.
Если рассмотреть вариант когда данные уже были в page cache (например, несколько групп потребителей вычитывают одни и те же сообщения), то такой подход позволяет потребителям вычитывать сообщения со скоростью, приближающейся к пределу скорости сетевого подключения.
В java мы можем применять такой подход путём использования nio и, в частности, метода
Kafka использует его в классе
Возможность применения zero-copy достигается тем, что Kafka использует единый формат сообщений для брокеров и клиентов. Иначе преобразование сообщений приходилось бы делать в user space, выходя за рамки kernel space.
Поэтому если мы настроим SSL, то будет использоваться
#broker #internal
@kafka_cooker
Часть 1
Подход zero-copy стремится решить эту проблему и повысить производительность за счет устранения избыточных копий данных.
Вместо двух системных вызовов
read(...)
и write(...)
мы сделаем один системный вызов sendfile(...)
.sendfile(...)
не будет копировать данные в user space, а ограничится только kernel space.Ядро Linux < 2.4:
1. DMA копирует данные с диска в page cache.
2. Данные из page cache копируются в socket buffer.
3. DMA копирует данные из socket buffer в network interface controller.
Ядро Linux >= 2.4 и network interface controller поддерживает gather operations:
1. DMA копирует данные с диска в page cache.
2. В socket buffer записываются не данные, а только дескрипторы с информацией о расположении и размере данных.
3. DMA на основе полученных дескрипторов копирует данные из page cache в network interface controller.
В итоге мы имеем 2 переключения контекста и 2 копии данных.
Если рассмотреть вариант когда данные уже были в page cache (например, несколько групп потребителей вычитывают одни и те же сообщения), то такой подход позволяет потребителям вычитывать сообщения со скоростью, приближающейся к пределу скорости сетевого подключения.
В java мы можем применять такой подход путём использования nio и, в частности, метода
FileChannel.transferTo(...)
.Kafka использует его в классе
org.apache.kafka.common.network.PlaintextTransportLayer
.Возможность применения zero-copy достигается тем, что Kafka использует единый формат сообщений для брокеров и клиентов. Иначе преобразование сообщений приходилось бы делать в user space, выходя за рамки kernel space.
Поэтому если мы настроим SSL, то будет использоваться
org.apache.kafka.common.network.SslTransportLayer
, который будет шифровать данные в user space. Тем самым мы лишимся преимуществ zero-copy. На самом деле существует системный вызов SSL_sendfile(...)
, но текущая версия Kafka его не поддерживает.#broker #internal
@kafka_cooker
Асинхронный ли send на самом деле?
Когда мы смотрим на сигнатуру метода и видим, что он возвращает
Прежде чем будет создан объект
1. Получение metadata кластера
2. Сериализация key и value
3. Определение партиции (здесь мы подробнее рассматривали этот шаг)
4. Выделение памяти под новый batch, если существующий уже полностью заполнен
5. Добавление сообщения в batch
Все эти шаги выполняются не только синхронно, но и, в некоторых случаях, могут блокировать thread, на котором был вызван метод
Metadata кластера берётся из кэша, но может выполниться запрос на её обновление.
При использовании schema registry для сериализации, схемы тоже берутся из кэша, но могут выполняться запросы на их получение. Библиотека от confluent использует блокирующий http-клиент.
Если пользователь реализовал свою стратегию определения партиций, то, в зависимости от его фантазии, не исключены запросы во внешние системы.
Если доступной памяти под новый batch недостаточно, то будет ожидание её освобождения.
Существует несколько параметров, влияющих на работу
В большинстве случаев, синхронные шаги, выполняемые до старта асинхронной логики, пройдут без блокировок. Но мы от них не застрахованы, поэтому лучше не вызывать метод
#producer
@kafka_cooker
Когда мы смотрим на сигнатуру метода и видим, что он возвращает
Future
, то считаем его асинхронным и, возможно, даже неблокирующим. Но с методом Producer.send(...)
всё обстоит немного сложнее.Прежде чем будет создан объект
Future
и произойдёт отправка данных брокеру, будут выполнены следующие шаги:1. Получение metadata кластера
2. Сериализация key и value
3. Определение партиции (здесь мы подробнее рассматривали этот шаг)
4. Выделение памяти под новый batch, если существующий уже полностью заполнен
5. Добавление сообщения в batch
Все эти шаги выполняются не только синхронно, но и, в некоторых случаях, могут блокировать thread, на котором был вызван метод
send
.Metadata кластера берётся из кэша, но может выполниться запрос на её обновление.
При использовании schema registry для сериализации, схемы тоже берутся из кэша, но могут выполняться запросы на их получение. Библиотека от confluent использует блокирующий http-клиент.
Если пользователь реализовал свою стратегию определения партиций, то, в зависимости от его фантазии, не исключены запросы во внешние системы.
Если доступной памяти под новый batch недостаточно, то будет ожидание её освобождения.
Существует несколько параметров, влияющих на работу
send
.max.block.ms
(60сек) - общее время ожидания получения metadata кластера и выделения памяти под новый batch. Если получение metadata заняло X, то выделение памяти должно занять не более max.block.ms - X. Время затраченное на сериализацию и определение партиции не учитывается.buffer.memory
(32МБ) - размер памяти, используемой для хранения сообщений, прежде чем они будут отправлены брокеру. Если памяти недостаточно, то будет ожидание max.block.ms
её освобождения.delivery.timeout.ms
(120сек) - максимальный интервал времени между возвратом из метода send
и вызовом callback-функции (переданной в этот метод) с сообщением об успехе или ошибке. Включает время, затраченное на повторные попытки отправки сообщения. Не должен превышать сумму параметров request.timeout.ms
и linger.ms
.request.timeout.ms
(30сек) - время ожидания ответа от брокера.retry.backoff.ms
(100мс) - время между повторными попытками отправки сообщения.В большинстве случаев, синхронные шаги, выполняемые до старта асинхронной логики, пройдут без блокировок. Но мы от них не застрахованы, поэтому лучше не вызывать метод
send
на epoll.#producer
@kafka_cooker
👍2
Где брокер хранит сообщения и как их удалить?
При создании топика партиции распределяются между брокерам (здесь мы рассматривали это подробнее). Брокер разбивает каждую партицию на сегменты. Каждый сегмент хранится в отдельном файле с расширением
Параметр
Для каждой партиции брокер держит открытые дескрипторы в режиме чтения для всех сегментов и только один дескриптор в режиме записи. Этот файл, открытый в режиме записи, является активным сегментом, то есть сегментом, в который производится запись сообщений. В один момент времени может быть только один активный сегмент. Он не может быть удалён.
По умолчанию каждый сегмент содержит или 1ГБ данных
Формат данных в log файлах соответствует формату сообщений, передаваемых между брокерами и клиентами. Поэтому брокер может применять оптимизацию zero-copy (здесь мы рассматривали это подробнее).
Потребители могут вычитывать сообщения с любого офсета и с любого времени. Чтобы брокер мог быстро находить эти сообщения во множестве сегментов, существует два типа индексов.
Первый задает соответствие офсета файлу сегмента и месту в этом файле. Хранится в файле с расширением
Второй - сопоставляет временные метки с офсетами. Хранится в файле с расширением
Существует три политики очистки, задаваемых параметром
1. delete. Является политикой по умолчанию. Удаляет неактивные сегменты или по времени
При настройке этих параметров нужно обращать внимание на настройки сегмента
2. compact. Удаляет сообщения в неактивных сегментах с одинаковыми ключами за исключением самого последнего. Отсюда и название - сжатие. Нельзя использовать null ключи.
Выполняется в фоне на
Позволяет полностью удалить все сообщения по определённому ключу, если в качестве value передать null. Это особое сообщение, называемое tombstone, будет храниться в течении
Политика compact бывает полезна, например, когда приложение хранит своё состояние в kafka и при перезапуске его восстанавливает. В этом случае приложению не нужна вся история состояний, а нужно только последнее актуальное состояние.
3. compact,delete. Объединяет в себе две политики. Сегменты будут сжиматься по политике compact, а после удаляться по политике delete. В какой-то мере страхует нас, если мы ошиблись и стали отправлять все уникальные ключи.
#broker #log
@kafka_cooker
При создании топика партиции распределяются между брокерам (здесь мы рассматривали это подробнее). Брокер разбивает каждую партицию на сегменты. Каждый сегмент хранится в отдельном файле с расширением
.log
.Параметр
log.dirs
задаёт список директорий для хранения этих файлов.Для каждой партиции брокер держит открытые дескрипторы в режиме чтения для всех сегментов и только один дескриптор в режиме записи. Этот файл, открытый в режиме записи, является активным сегментом, то есть сегментом, в который производится запись сообщений. В один момент времени может быть только один активный сегмент. Он не может быть удалён.
По умолчанию каждый сегмент содержит или 1ГБ данных
log.segment.bytes
, или данные за одну неделю log.roll.ms
, в зависимости от того, что наступит раньше. При достижении этого лимита текущий сегмент перестаёт быть активным, его файл переоткрывается в режиме чтения, создаётся новый файл, который соответствует новому активному сегменту.Формат данных в log файлах соответствует формату сообщений, передаваемых между брокерами и клиентами. Поэтому брокер может применять оптимизацию zero-copy (здесь мы рассматривали это подробнее).
Потребители могут вычитывать сообщения с любого офсета и с любого времени. Чтобы брокер мог быстро находить эти сообщения во множестве сегментов, существует два типа индексов.
Первый задает соответствие офсета файлу сегмента и месту в этом файле. Хранится в файле с расширением
.index
.Второй - сопоставляет временные метки с офсетами. Хранится в файле с расширением
.timeindex
.Существует три политики очистки, задаваемых параметром
log.cleanup.policy
.1. delete. Является политикой по умолчанию. Удаляет неактивные сегменты или по времени
log.retention.ms
, или по размеру log.retention.bytes
, в зависимости от того, что наступит раньше.log.retention.ms
(7 дней) удаляет неактивные сегменты, когда разница между текущим временем и временем последнего изменения сегмента (обычно соответствует времени закрытия сегмента) достигло указанного предела.log.retention.bytes
(-1) удаляет неактивные сегменты, когда общий размер сообщений в партиции достиг указанного предела.При настройке этих параметров нужно обращать внимание на настройки сегмента
log.segment.bytes
, log.roll.ms
и т.п. Иначе может получиться ситуация, когда сегмент всё ещё активный, в нём присутствуют сообщения за 5 дней, а мы настроили удаление за 1 день. В этом случае никакого удаления не произойдёт.2. compact. Удаляет сообщения в неактивных сегментах с одинаковыми ключами за исключением самого последнего. Отсюда и название - сжатие. Нельзя использовать null ключи.
Выполняется в фоне на
log.cleaner.threads
по достижении min.cleanable.dirty.ratio
(отношение несжатых сообщений к общему размеру всех сообщений).Позволяет полностью удалить все сообщения по определённому ключу, если в качестве value передать null. Это особое сообщение, называемое tombstone, будет храниться в течении
log.cleaner.delete.retention.ms
(24 часа).Политика compact бывает полезна, например, когда приложение хранит своё состояние в kafka и при перезапуске его восстанавливает. В этом случае приложению не нужна вся история состояний, а нужно только последнее актуальное состояние.
3. compact,delete. Объединяет в себе две политики. Сегменты будут сжиматься по политике compact, а после удаляться по политике delete. В какой-то мере страхует нас, если мы ошиблись и стали отправлять все уникальные ключи.
#broker #log
@kafka_cooker
🔥5
GUI для Apache Kafka
Во время разработки, отладки и тестирования часто возникает необходимость быстро посмотреть, а что же там вообще происходит в топиках, какие, откуда и куда сообщения отправляются.
Как бы ни было богато api библиотеки kafka-clients, делать это через код неудобно и немасштабируемо в рамках команды.
Существует немало продуктов, решающих эту проблему. Например, консольный kcat, web kafka-ui или десктопный conduktor.
Разработчики могут использовать и kcat, но для аналитиков и тестировщиков это не очень удобно.
При выборе GUI для использования в своей команде, мы отталкивались в первую очередь от удобства.
По этой причине мы отказались не только от консольных утилит, но и от разных web ui, запускаемых в docker.
Долгое время мы использовали conduktor и нас всё устраивало. После известных ограничений, мы попытались перейти на offset explorer, но, объективно, он уже устарел.
Поэтому мы решили сделать свой GUI для Apache Kafka, который мог бы подойти всем участникам команды.
Atom KTool, как мы его назвали, представляет собой desktop приложение, распространяемое для Linux (deb, rpm, AppImage, tar.gz), Windows (exe, msi, portable) и macOS (dmg, pkg, tar.gz).
Позволяет:
- настраивать подключение к Apache Kafka, Schema Registry и Kafka Connect
- фильтровать сообщения по времени, офсетам и партициям
- искать сообщения по ключу и телу с помощью JavaScript функций
- публиковать сообщения
- использовать Avro, Protobuf и JSON схемы
- создавать топики и редактировать конфиги
- отслеживать потребителей
- устанавливать acl и квоты
- управлять схемами и коннекторами
- и многое другое
Ознакомиться подробнее и загрузить можно здесь https://atomone.tech/ru/ktool
По всем вопросам и проблемам можно писать @aleksey_kc
#ktool
@kafka_cooker
Во время разработки, отладки и тестирования часто возникает необходимость быстро посмотреть, а что же там вообще происходит в топиках, какие, откуда и куда сообщения отправляются.
Как бы ни было богато api библиотеки kafka-clients, делать это через код неудобно и немасштабируемо в рамках команды.
Существует немало продуктов, решающих эту проблему. Например, консольный kcat, web kafka-ui или десктопный conduktor.
Разработчики могут использовать и kcat, но для аналитиков и тестировщиков это не очень удобно.
При выборе GUI для использования в своей команде, мы отталкивались в первую очередь от удобства.
По этой причине мы отказались не только от консольных утилит, но и от разных web ui, запускаемых в docker.
Долгое время мы использовали conduktor и нас всё устраивало. После известных ограничений, мы попытались перейти на offset explorer, но, объективно, он уже устарел.
Поэтому мы решили сделать свой GUI для Apache Kafka, который мог бы подойти всем участникам команды.
Atom KTool, как мы его назвали, представляет собой desktop приложение, распространяемое для Linux (deb, rpm, AppImage, tar.gz), Windows (exe, msi, portable) и macOS (dmg, pkg, tar.gz).
Позволяет:
- настраивать подключение к Apache Kafka, Schema Registry и Kafka Connect
- фильтровать сообщения по времени, офсетам и партициям
- искать сообщения по ключу и телу с помощью JavaScript функций
- публиковать сообщения
- использовать Avro, Protobuf и JSON схемы
- создавать топики и редактировать конфиги
- отслеживать потребителей
- устанавливать acl и квоты
- управлять схемами и коннекторами
- и многое другое
Ознакомиться подробнее и загрузить можно здесь https://atomone.tech/ru/ktool
По всем вопросам и проблемам можно писать @aleksey_kc
#ktool
@kafka_cooker
🔥10👍1