Java for Beginner
743 subscribers
709 photos
200 videos
12 files
1.15K links
Канал от новичков для новичков!
Изучайте Java вместе с нами!
Здесь мы обмениваемся опытом и постоянно изучаем что-то новое!

Наш YouTube канал - https://www.youtube.com/@Java_Beginner-Dev

Наш канал на RUTube - https://rutube.ru/channel/37896292/
Download Telegram
Интерфейс Consumer<T> и метод accept

Consumer<T> — это функциональный интерфейс, представленный в Java 8 в пакете java.util.function. Он используется для выполнения действий над объектом типа T, не возвращая никакого результата. Интерфейс имеет один абстрактный метод accept(T t), который принимает объект типа T и выполняет над ним некоторое действие.

@FunctionalInterface
public interface Consumer<T> {
void accept(T t);
}


Как работает метод accept?

Метод accept — это основной метод интерфейса Consumer. Он принимает объект типа T и выполняет над ним некоторое действие. Результатом выполнения метода является void, то есть метод не возвращает никакого значения.

Пример:
Consumer<String> printUpperCase = str -> System.out.println(str.toUpperCase());
printUpperCase.accept("hello"); // HELLO
Здесь мы создали Consumer, который принимает строку и выводит ее в верхнем регистре. Метод accept применяется к строке "hello", и результат выводится на экран.


Плюсы и минусы использования Consumer

Плюсы:
Упрощает код, делая его более читаемым и выразительным.
Позволяет легко выполнять действия над объектами, особенно в сочетании с Stream API.
Поддерживает лямбда-выражения, что делает код более компактным.


Минусы:
Может быть избыточным для простых действий, где можно обойтись обычным методом.
Требует понимания функционального программирования для эффективного использования.


Пример использования Consumer для выполнения действий над объектами

Один из самых распространенных сценариев использования Consumer — это выполнение действий над элементами коллекции.
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class ConsumerExample {
public static void main(String[] args) {
List<String> names = Arrays.asList("Alice", "Bob", "Charlie", "David", "Eve");

// Создаем Consumer для вывода имен в верхнем регистре
Consumer<String> printUpperCase = name -> System.out.println(name.toUpperCase());

// Применяем Consumer к каждому элементу списка
names.forEach(printUpperCase);
}
}
В этом примере мы используем Consumer для вывода каждого имени из списка в верхнем регистре. Метод forEach принимает Consumer и применяет его к каждому элементу списка.


#Java #Training #Medium #Functional_programming #Consumer #accept
2👌1
Более сложные сценарии использования Consumer

Метод andThen

Метод andThen позволяет комбинировать два Consumer таким образом, что сначала выполняется первый Consumer, а затем второй. Это полезно, когда нужно выполнить несколько действий над одним объектом.

Пример:
Consumer<String> printUpperCase = str -> System.out.println(str.toUpperCase());
Consumer<String> printLowerCase = str -> System.out.println(str.toLowerCase());

// Комбинируем два Consumer
Consumer<String> printBoth = printUpperCase.andThen(printLowerCase);

printBoth.accept("Hello"); // HELLO hello
В этом примере мы создали два Consumer: один выводит строку в верхнем регистре, а другой — в нижнем. Метод andThen объединяет их, и результат выполнения printBoth — это сначала вывод строки в верхнем регистре, а затем в нижнем.


Пример использования Consumer в Stream API

Consumer часто используется в Stream API для выполнения действий над элементами потока.
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class StreamConsumerExample {
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);

// Создаем Consumer для вывода квадратов чисел
Consumer<Integer> printSquare = number -> System.out.println(number * number);

// Применяем Consumer к каждому элементу потока
numbers.stream().forEach(printSquare);
}
}
В этом примере мы используем Consumer для вывода квадратов чисел из списка. Метод forEach в Stream API принимает Consumer и применяет его к каждому элементу потока.


Пример использования Consumer с другими функциональными интерфейсами

Consumer можно комбинировать с другими функциональными интерфейсами, такими как Predicate, для создания более сложных сценариев.
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

public class CombinedExample {
public static void main(String[] args) {
List<String> names = Arrays.asList("Alice", "Bob", "Charlie", "David", "Eve");

// Создаем Predicate для фильтрации имен, начинающихся на "A"
Predicate<String> startsWithA = name -> name.startsWith("A");

// Создаем Consumer для вывода имен
Consumer<String> printName = name -> System.out.println(name);

// Фильтруем и выводим имена, начинающиеся на "A"
names.stream()
.filter(startsWithA)
.forEach(printName); // Alice
}
}
В этом примере мы используем Predicate для фильтрации имен, начинающихся на "A", и Consumer для вывода отфильтрованных имен.


#Java #Training #Medium #Functional_programming #Consumer #andThen
👍3
Apache Kafka

Consumer — группы, оффсеты, ребалансинг

Apache Kafka Consumer — это клиент, ответственный за чтение и обработку сообщений из топиков.


Group coordinator, heartbeat, session.timeout.ms

Consumer groups позволяют нескольким потребителям параллельно обрабатывать топик, распределяя партиции между ними для load balancing. Группа идентифицируется по group.id; каждый потребитель в группе — member с уникальным member.id (генерируется при join).

Group coordinator это брокер, выбранный для управления группой (hash(group.id) % num_brokers определяет координатора).

Координатор хранит состояние группы: members, assignments партиций, committed offsets (в внутренней теме __consumer_offsets).

В памяти брокера: состояние в GroupMetadataManager, с off-heap структурами для efficiency; offsets реплицированы как обычные записи.


Потребители поддерживают связь через heartbeat'ы: отдельный Heartbeat thread в KafkaConsumer посылает HeartbeatRequest каждые heartbeat.interval.ms (по умолчанию 3 сек) к координатору. Координатор отвечает, подтверждая membership. Если heartbeat не приходит в течение session.timeout.ms (по умолчанию 45 сек), member считается dead, триггеря rebalance.

В памяти потребителя: Coordinator хранит generation (epoch группы), member.id. Нюанс: высокое session.timeout.ms позволяет пережить GC-паузы или network glitches, но замедляет detection failures; низкое — приводит к frequent rebalances. При poll() heartbeat отправляется implicitly, если interval истек.


Offset management: авто-коммит vs ручной (commitSync, commitAsync)

Offsets тракают прогресс чтения: для каждой партиции — committed offset, с которого группа начнет после restart/rebalance.

Авто-коммит: Включается enable.auto.commit=true (default), с auto.commit.interval.ms (5 сек). При poll() потребитель аккумулирует processed offsets в памяти (OffsetAndMetadata), и каждые interval commit'ит их async. В памяти: Map<TopicPartition, OffsetAndMetadata> в KafkaConsumer, flushed periodically.

Trade-offs: удобно, но риск at-most-once (если crash после обработки, но до commit — потеря) или at-least-once (duplicates если commit после обработки, но перед full ack). Не используйте для critical processing.

Ручной коммит: enable.auto.commit=false. commitSync() — synchronous, блокирует до подтверждения от координатора (via OffsetCommitRequest), обеспечивает durability, но увеличивает latency. commitAsync() — asynchronous, с optional callback для error handling; faster, но требует manual retry на failures.

В памяти: offsets буферизуются до commit; при commitSync все in-flight commits ждут. Нюанс: commit только assigned партиций; при rebalance revoked offsets commit'ятся в onPartitionsRevoked. Для EOS используйте с transactions (read-process-write pattern).


#Java #middle #Kafka #Consumer
👍6
Rebalancing: Range, RoundRobin, Cooperative Sticky

Rebalancing — процесс перераспределения партиций при changes в группе (join/leave, failures). Триггерится coordinator'ом: все members посылают JoinGroupRequest, coordinator выбирает leader (первый joiner), который вычисляет assignments via assignor.

Assignors (partition.assignor.class):
- Range: Дефолт до 2.4. Партиции сортируются, делятся по range (например, для 10 partitions, 3 consumers: 0-3,4-6,7-9). Skew если num_partitions не делится evenly.
- RoundRobin: Распределяет round-robin для fairness, минимизируя skew.
- Cooperative Sticky (default с 2.4): Эволюция Sticky (минимизирует перемещения партиций). Cooperative — incremental: вместо full revoke-assign, использует несколько раундов (Eager vs Cooperative protocol). В первом раунде revoke только conflicting partitions, затем assign. Снижает downtime: consumers продолжают process revoked-later.

В памяти coordinator'а: хранит previous assignments для sticky. Нюанс: custom assignor implement ConsumerPartitionAssignor; для large groups (>100) rebalance может занять секунды из-за sync.


Static membership


Static membership избегает rebalance при restarts: group.instance.id (уникальный static ID per instance). При join с тем же ID, coordinator распознает как restart, не триггеря rebalance (если assignments unchanged).

В памяти: coordinator тракает instances в GroupMetadata. Нюанс: полезно для stateful consumers (с local state); но если instance меняет host, rebalance все равно. Комбинируйте с cooperative для minimal disruption.


Pause/Resume и backpressure

Для контроля flow: consumer.pause(Collection<TopicPartition>) останавливает fetch для партиций, но poll() продолжает heartbeat. resume() возобновляет. Используйте для backpressure: если downstream slow, pause до обработки backlog.

В памяти: paused partitions в Set<TopicPartition> в Fetcher; fetch requests skip them. Нюанс: paused не влияет на rebalance; при длительном pause lag растет, рискуя eviction из группы если max.poll.interval.ms истекает.

Backpressure: мониторьте lag, dynamically pause если queue full. В Streams это built-in via task pausing.


Метрики: lag, rebalance-latency

Consumer экспортирует метрики via KafkaConsumer.metrics():
- consumer-lag: Records-lag-max — максимальный lag (LEO - committed offset) по партициям. Вычисляется via FetchRequest с metadata. Высокий lag указывает на slow processing; мониторьте per-partition.
- rebalance-latency-avg: Среднее время rebalance (от trigger до completion). Включает join, sync, assign. Высокое — из-за large groups или slow assignors.

Другие: poll-time, commit-latency. Нюанс: используйте ConsumerMetrics для JMX; в production alert на lag > threshold или frequent rebalances.


#Java #middle #Kafka #Consumer
👍5
Пример кода

Пример на Java с subscribe и RebalanceListener для handling revoke/assign:
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.*;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", "false"); // Manual commit

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("topic"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// Commit offsets before losing partitions
commitOffsets(); // Custom method to commit processed offsets
}

@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// Restore state or seek to offsets
/* warm-up: e.g., load local state for partitions */
}
});

try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// Process records
consumer.commitAsync(); // Or commitSync for sync
}
} finally {
consumer.close();
}


В production: implement commitOffsets() с Map<TopicPartition, OffsetAndMetadata>; handle exceptions в listener.


Нюансы

- Избегание frequent rebalancing: Frequent rebalances (от scaling, GC, network) приводят к downtime (poll blocks во время). Static membership стабилизирует группу при restarts. CooperativeStickyAssignor минимизирует перемещения (до 80% меньше чем Range). Увеличьте session.timeout.ms до 300 сек для tolerance; heartbeat.interval.ms= session/3. Мониторьте rebalance-rate; если high — investigate app stability.

- max.poll.interval.ms и долгие операции: По умолчанию 5 мин — максимальное время между poll(). Если processing в poll() превышает (например, heavy computation), coordinator считает dead, триггеря rebalance. Решение: разбейте работу на chunks, poll() frequently; используйте pause() для long ops, но resume timely. Для очень долгих — offload в separate thread, но sync с poll(). Нюанс: в Streams это processing.guarantee=at_least_once handles.

- Обработка с сохранением ordering: Consumer гарантирует order только внутри партиции, но rebalance может нарушить если state не сохранен. В onPartitionsRevoked commit offsets и persist state (например, в external store). В onPartitionsAssigned seek(committed offset) и restore state. Для strict ordering: single-threaded per partition, или assign manually (без subscribe, используйте assign()). Нюанс: в groups с multiple consumers ordering cross-partition не гарантировано; для global order — single partition или external sorting.


#Java #middle #Kafka #Consumer
👍3🤯2