#spark #статьи
Что такое checkpoint в Apache Spark и зачем он нужен
Чтобы приложение потоковой передачи было устойчиво к сбоям по внешним причинам, например, отказ JVM, Spark Streaming сохраняет промежуточные данные в отказоустойчивой системе хранения, чтобы использовать их для восстановления приложения, т.е. перезапуска драйвера. Для этого фреймворк использует механизм контрольных точек (checkpoint), который сохраняет состояние приложения в надежном хранилище, например, в распределенной файловой системе Hadoop (HDFS).
Контрольные точки – это механизм Spark Core, ядра фреймворка, которое используется для распределенных вычислений. Он позволяет перезапустить драйвер в случае сбоя с ранее вычисленным состоянием распределенных вычислений, описанным как файл RDD. Этот подход применялся в Spark Streaming — устаревшем модуле Spark для потоковой обработки на основе RDD API. Установление контрольных точек усекает происхождение RDD, подлежащего проверке, что использует Spark MLlib в итеративных алгоритмах машинного обучения, таких как ALS.
Таким образом, контрольные точки можно использовать для усечения логического плана датафрейма в итерационных алгоритмах, где план выполнения запроса растет экспоненциально. Через создание контрольной точки план разбивается на участки и сохраняется в файлах внутри каталога, установленного с помощью SparkContext.setCheckpointDir(). Контрольная точка набора данных в Spark SQL использует контрольную точку для усечения происхождения базового RDD для рассматриваемого Dataset или датафрейма.
Различают 2 вида контрольных точек в Spark:
✔️контрольная точка метаданных
✔️контрольная точка данных
Таким образом, локальная установка контрольных точек использует хранилище исполнителя для записи файлов контрольных точек, поэтому жизненный цикл исполнителя считается ненадежным. Надежная контрольная точка использует надежное хранилище данных (HDFS).
Таким образом, контрольные точки метаданных необходимы для восстановления после сбоев драйверов, а контрольные точки данных нужны для stateful-приложений, где выполняются преобразования с сохранением состояния.
Помимо этой классификации по объектам сохранения, контрольные точки также можно разделить по степени надежности каталога контрольных точек:
✔️Надежная контрольная точка, когда фактический RDD сохраняется в надежной распределенной файловой системе в каталоге, установленном с помощью метода setCheckpointDir(directory: String).
✔️Локальная контрольная точка, когда усеченный (неполный) граф происхождения RDD сохраняется в локальном хранилище исполнителя.
Однако, в этом случае восстановление после сбоев драйверов будет частичным, поскольку некоторые полученные, но необработанные данные могут быть утеряны. Впрочем, для многих сценариев это приемлемо.
Поскольку контрольная точка сохраняет данные на диске, усекая план выполнения запроса, ее можно применять для увеличения скорости его выполнения. Когда план запроса становится огромным, производительность резко снижается, поэтому можно добавить контрольные точки в некоторых стратегических точках конвейера данных. Например, при выполнении JOIN-операций: HashAggregate, ShuffleHashJoin, BroadcastHashJoin и SortMergeJoin.
Контрольная точка может быть активной или отложенной в зависимости от флага оператора eager. По умолчанию контрольная точка является активной и выполняется немедленно по запросу. Отложенная контрольная точка выполняется только при вызове действия.
Для использования контрольных точек необходимо указать ее каталог с помощью команды SparkContext.setCheckpointDir.
@BigDataSchool_ru
https://bigdataschool.ru/blog/checkpoints-in-spark-streaming.html
Что такое checkpoint в Apache Spark и зачем он нужен
Чтобы приложение потоковой передачи было устойчиво к сбоям по внешним причинам, например, отказ JVM, Spark Streaming сохраняет промежуточные данные в отказоустойчивой системе хранения, чтобы использовать их для восстановления приложения, т.е. перезапуска драйвера. Для этого фреймворк использует механизм контрольных точек (checkpoint), который сохраняет состояние приложения в надежном хранилище, например, в распределенной файловой системе Hadoop (HDFS).
Контрольные точки – это механизм Spark Core, ядра фреймворка, которое используется для распределенных вычислений. Он позволяет перезапустить драйвер в случае сбоя с ранее вычисленным состоянием распределенных вычислений, описанным как файл RDD. Этот подход применялся в Spark Streaming — устаревшем модуле Spark для потоковой обработки на основе RDD API. Установление контрольных точек усекает происхождение RDD, подлежащего проверке, что использует Spark MLlib в итеративных алгоритмах машинного обучения, таких как ALS.
Таким образом, контрольные точки можно использовать для усечения логического плана датафрейма в итерационных алгоритмах, где план выполнения запроса растет экспоненциально. Через создание контрольной точки план разбивается на участки и сохраняется в файлах внутри каталога, установленного с помощью SparkContext.setCheckpointDir(). Контрольная точка набора данных в Spark SQL использует контрольную точку для усечения происхождения базового RDD для рассматриваемого Dataset или датафрейма.
Различают 2 вида контрольных точек в Spark:
✔️контрольная точка метаданных
✔️контрольная точка данных
Таким образом, локальная установка контрольных точек использует хранилище исполнителя для записи файлов контрольных точек, поэтому жизненный цикл исполнителя считается ненадежным. Надежная контрольная точка использует надежное хранилище данных (HDFS).
Таким образом, контрольные точки метаданных необходимы для восстановления после сбоев драйверов, а контрольные точки данных нужны для stateful-приложений, где выполняются преобразования с сохранением состояния.
Помимо этой классификации по объектам сохранения, контрольные точки также можно разделить по степени надежности каталога контрольных точек:
✔️Надежная контрольная точка, когда фактический RDD сохраняется в надежной распределенной файловой системе в каталоге, установленном с помощью метода setCheckpointDir(directory: String).
✔️Локальная контрольная точка, когда усеченный (неполный) граф происхождения RDD сохраняется в локальном хранилище исполнителя.
Однако, в этом случае восстановление после сбоев драйверов будет частичным, поскольку некоторые полученные, но необработанные данные могут быть утеряны. Впрочем, для многих сценариев это приемлемо.
Поскольку контрольная точка сохраняет данные на диске, усекая план выполнения запроса, ее можно применять для увеличения скорости его выполнения. Когда план запроса становится огромным, производительность резко снижается, поэтому можно добавить контрольные точки в некоторых стратегических точках конвейера данных. Например, при выполнении JOIN-операций: HashAggregate, ShuffleHashJoin, BroadcastHashJoin и SortMergeJoin.
Контрольная точка может быть активной или отложенной в зависимости от флага оператора eager. По умолчанию контрольная точка является активной и выполняется немедленно по запросу. Отложенная контрольная точка выполняется только при вызове действия.
Для использования контрольных точек необходимо указать ее каталог с помощью команды SparkContext.setCheckpointDir.
@BigDataSchool_ru
https://bigdataschool.ru/blog/checkpoints-in-spark-streaming.html
Курсы Big Data,Arenadata,Greenplum, Kafka и Spark
Контрольные точки в Apache Spark Streaming
Чтобы обеспечить отказоустойчивость потоковых приложений, Apache Spark использует механизм
#Flink #статьи
Зачем Apache Flink нужны сетевые буферы
Каждая запись в Flink отправляется следующей подзадаче вместе с другими записями в сетевом буфере — наименьшей единице связи между подзадачами.
Чтобы поддерживать стабильно высокую пропускную способность, Flink использует очереди сетевых буферов (текущих данных) на входной и выходной стороне процесса передачи.
Каждая подзадача имеет входную очередь, ожидающую потребления данных, и выходную очередь, ожидающую отправки данных следующей подзадаче.
Наличие большего объема передаваемых данных означает, что Flink может обеспечить более высокую и устойчивую пропускную способность конвейера. Однако, это приведет к увеличению времени прохождения контрольной точки.
Контрольные точки во Flink могут завершиться только после того, как все подзадачи пройдут все заданные барьеры. В согласованных (выровненных) контрольных точках эти барьеры перемещаются по графу задания вместе с сетевыми буферами.
Чем больше объем данных, тем дольше время распространения барьера контрольной точки. Размер невыровненных контрольных точек прямо пропорционален объему передаваемых данных, поскольку все захваченные текущие данные должны сохраняться как часть контрольной точки.
Поэтому, чтобы предупредить чрезмерное увеличение сетевого буфера из-за большого объема передаваемых данных, ранее нужно было жестко задать объем буфера с учетом модели памяти Flink-приложений.
Напомним, процесс диспетчера задач Flink представляет собой типичный JVM-процесс, память которого состоит из кучи JVM и памяти вне кучи. Эти типы памяти используются Flink напрямую или JVM для своих конкретных целей, например, метапространства.
У Flink есть два основных потребителя памяти: пользовательский код задач оператора заданий и сам фреймворк, потребляющий память для внутренних структур данных, сетевых буферов и пр.
Пользовательский код имеет прямой доступ ко всем типам памяти: JVM Heap, Direct и Native Memory. Поэтому Flink не может контролировать его распределение и использование.
Однако, существует два типа памяти вне кучи, которые используются задачами и явно контролируются Flink: управляемая память Off-Heap и сетевые буферы, которые являются частью прямой памяти JVM, выделенной для обмена данными пользовательских записей между задачами оператора.
Можно настроить распределение памяти, пропорционально разбив ее общий объем между управляемой памятью и сетевыми буферами. Оставшаяся память затем назначается куче задач, если она не задана явно, и другим фиксированным компонентам кучи JVM и компонентам вне кучи.
О том, как настроить размер сетевого буфера, читаем по ссылка далее.
@BigDataSchool_ru
https://bigdataschool.ru/blog/network-buffers-in-apache-flink.html
Зачем Apache Flink нужны сетевые буферы
Каждая запись в Flink отправляется следующей подзадаче вместе с другими записями в сетевом буфере — наименьшей единице связи между подзадачами.
Чтобы поддерживать стабильно высокую пропускную способность, Flink использует очереди сетевых буферов (текущих данных) на входной и выходной стороне процесса передачи.
Каждая подзадача имеет входную очередь, ожидающую потребления данных, и выходную очередь, ожидающую отправки данных следующей подзадаче.
Наличие большего объема передаваемых данных означает, что Flink может обеспечить более высокую и устойчивую пропускную способность конвейера. Однако, это приведет к увеличению времени прохождения контрольной точки.
Контрольные точки во Flink могут завершиться только после того, как все подзадачи пройдут все заданные барьеры. В согласованных (выровненных) контрольных точках эти барьеры перемещаются по графу задания вместе с сетевыми буферами.
Чем больше объем данных, тем дольше время распространения барьера контрольной точки. Размер невыровненных контрольных точек прямо пропорционален объему передаваемых данных, поскольку все захваченные текущие данные должны сохраняться как часть контрольной точки.
Поэтому, чтобы предупредить чрезмерное увеличение сетевого буфера из-за большого объема передаваемых данных, ранее нужно было жестко задать объем буфера с учетом модели памяти Flink-приложений.
Напомним, процесс диспетчера задач Flink представляет собой типичный JVM-процесс, память которого состоит из кучи JVM и памяти вне кучи. Эти типы памяти используются Flink напрямую или JVM для своих конкретных целей, например, метапространства.
У Flink есть два основных потребителя памяти: пользовательский код задач оператора заданий и сам фреймворк, потребляющий память для внутренних структур данных, сетевых буферов и пр.
Пользовательский код имеет прямой доступ ко всем типам памяти: JVM Heap, Direct и Native Memory. Поэтому Flink не может контролировать его распределение и использование.
Однако, существует два типа памяти вне кучи, которые используются задачами и явно контролируются Flink: управляемая память Off-Heap и сетевые буферы, которые являются частью прямой памяти JVM, выделенной для обмена данными пользовательских записей между задачами оператора.
Можно настроить распределение памяти, пропорционально разбив ее общий объем между управляемой памятью и сетевыми буферами. Оставшаяся память затем назначается куче задач, если она не задана явно, и другим фиксированным компонентам кучи JVM и компонентам вне кучи.
О том, как настроить размер сетевого буфера, читаем по ссылка далее.
@BigDataSchool_ru
https://bigdataschool.ru/blog/network-buffers-in-apache-flink.html
Курсы Big Data,Arenadata,Greenplum, Kafka и Spark
Сетевые буферы в Apache Flink: что это такое и при чем здесь контрольные точки
Как Apache Flink обеспечивает стабильно высокую пропускную способность потоковой обработки
#kafka #статьи @BigDataSchool_ru Тестирование по Kafka.
В каком виде выполняется Kafka Connect?
В каком виде выполняется Kafka Connect?
Anonymous Quiz
29%
в виде независимых потоков
29%
в виде кластера процессов-исполнителей
29%
в виде специальных Stream-объектов
14%
в виде связанных между собой Stream-объектов
🔔Коллеги, мы - Школа Больших Данных проводим бесплатный митап по Clickhouse.
Митап рассчитан на инженеров данных, разработчиков, архитекторов БД, аналитиков.
Зарегистрироваться можно по ссылке:
https://shkola-bolshih-dannyh.timepad.ru/event/2706210/
Митап рассчитан на инженеров данных, разработчиков, архитекторов БД, аналитиков.
Зарегистрироваться можно по ссылке:
https://shkola-bolshih-dannyh.timepad.ru/event/2706210/
shkola-bolshih-dannyh.timepad.ru
Бесплатный митап "Clickhouse как основа DWH" / События на TimePad.ru
ClickHouse — это одна из самых быстро развивающихся СУБД и платформ для работы с данными.
В рамках митапа вы познакомитесь с данным инструментом: изучите основную терминологию, тенденции развития, почему Clickhouse популярен, а также познакомитесь с программой…
В рамках митапа вы познакомитесь с данным инструментом: изучите основную терминологию, тенденции развития, почему Clickhouse популярен, а также познакомитесь с программой…
#Greenplum #статьи
Как превратить Greenplum в векторную базу данных с расширением pgvector
Будучи вариацией PostgreSQL с механизмами массово-параллельной загрузки, Greenplum отлично справляется с огромным объемом данных. Однако, к хранилищам данных, используемым в ML-проектах, предъявляются особые требования. В частности, современные ИИ-решения активно используют векторные СУБД для реализации встраиваний.
Встраивание в ML представляет собой сложный объект, преобразованный в вектор — список чисел, который отражает семантические и синтаксические отношения данных. Обычно для работы со выстраиваниями используются специализированные векторные СУБД (Pinecone, Milvus, Weaviate, Enterprise-редакция key-value базы Redis, SingleStore, Relevance AI, Qdrant, Vespa и др.), однако, Greenplum тоже можно использовать для этого, если дополнить ее возможности модулем pgvector.
Модуль pgvector предоставляет возможности поиска векторного сходства, которые позволяют искать, хранить и запрашивать встраивания в больших масштабах. Этот модуль для Greenplum эквивалентен версии модуля pgvector 0.5.0, используемого с PostgreSQL. Модуль pgvector предоставляет тип данных vector и методы доступа к индексу ivfflat и hnsw.
Тип, методы, а также вспомогательные функции и операторы, предоставляемые модулем, позволяют выполнять точный и приблизительный поиск соседей и определять L2, внутренний продукт и косинусное расстояние между вложениями. Также можно использовать модуль для хранения и запроса вложений.
Тип данных vector представляет собой n-мерную координату. Каждый vector занимает 4-х кратное измерение и 8 байт памяти. Каждый элемент представляет собой число одинарной точности с плавающей запятой аналогично типу real в Greenplum, и все элементы должны быть конечными (без NaN, Infinity или -Infinity). Векторы могут иметь до 16 000 измерений.
Модуль pgvector для Greenplum имеет следующие ограничения:
✔️оптимизатор запросов GPORCA не поддерживает методы доступа к индексу ivfflat и hnsw. Запросы к таблицам, использующим эти типы индексов, возвращаются к планировщику на основе PostgreSQL.
✔️AO-таблицы, оптимизированные для добавления, не могут использовать векторные индексы;
✔️Размер индекса (вектора) может быть больше размера таблицы.
Модуль pgvector устанавливается при установке базы данных Greenplum. Но, прежде чем использовать его типы данных и метод доступа к индексу, надо зарегистрировать расширение vector в каждой базе данных.
Далее: MLOps с MPP-СУБД. Расширение PostgresML.
@BigDataSchool_ru
https://bigdataschool.ru/blog/ml-pluggins-for-greenplum.html
Как превратить Greenplum в векторную базу данных с расширением pgvector
Будучи вариацией PostgreSQL с механизмами массово-параллельной загрузки, Greenplum отлично справляется с огромным объемом данных. Однако, к хранилищам данных, используемым в ML-проектах, предъявляются особые требования. В частности, современные ИИ-решения активно используют векторные СУБД для реализации встраиваний.
Встраивание в ML представляет собой сложный объект, преобразованный в вектор — список чисел, который отражает семантические и синтаксические отношения данных. Обычно для работы со выстраиваниями используются специализированные векторные СУБД (Pinecone, Milvus, Weaviate, Enterprise-редакция key-value базы Redis, SingleStore, Relevance AI, Qdrant, Vespa и др.), однако, Greenplum тоже можно использовать для этого, если дополнить ее возможности модулем pgvector.
Модуль pgvector предоставляет возможности поиска векторного сходства, которые позволяют искать, хранить и запрашивать встраивания в больших масштабах. Этот модуль для Greenplum эквивалентен версии модуля pgvector 0.5.0, используемого с PostgreSQL. Модуль pgvector предоставляет тип данных vector и методы доступа к индексу ivfflat и hnsw.
Тип, методы, а также вспомогательные функции и операторы, предоставляемые модулем, позволяют выполнять точный и приблизительный поиск соседей и определять L2, внутренний продукт и косинусное расстояние между вложениями. Также можно использовать модуль для хранения и запроса вложений.
Тип данных vector представляет собой n-мерную координату. Каждый vector занимает 4-х кратное измерение и 8 байт памяти. Каждый элемент представляет собой число одинарной точности с плавающей запятой аналогично типу real в Greenplum, и все элементы должны быть конечными (без NaN, Infinity или -Infinity). Векторы могут иметь до 16 000 измерений.
Модуль pgvector для Greenplum имеет следующие ограничения:
✔️оптимизатор запросов GPORCA не поддерживает методы доступа к индексу ivfflat и hnsw. Запросы к таблицам, использующим эти типы индексов, возвращаются к планировщику на основе PostgreSQL.
✔️AO-таблицы, оптимизированные для добавления, не могут использовать векторные индексы;
✔️Размер индекса (вектора) может быть больше размера таблицы.
Модуль pgvector устанавливается при установке базы данных Greenplum. Но, прежде чем использовать его типы данных и метод доступа к индексу, надо зарегистрировать расширение vector в каждой базе данных.
Далее: MLOps с MPP-СУБД. Расширение PostgresML.
@BigDataSchool_ru
https://bigdataschool.ru/blog/ml-pluggins-for-greenplum.html
Курсы Big Data,Arenadata,Greenplum, Kafka и Spark
Машинное обучение с Greenplum: обзор ML-расширений
Как использовать Greenplum в проектах машинного обучения: знакомимся с расширением PostgresML и
#Greenplum #статьи
MLOps с MPP-СУБД: расширение PostgresML
Продолжая разговор про ИИ и машинное обучение при использовании Greenplum также следует сказать про PostgresML — ML-расширение для PostgreSQL, которое позволяет выполнять обучение и вывод текстовых и табличных данных с помощью SQL-запросов.
С помощью PostgresML можно интегрировать ML-модели в базу данных VMware Greenplum и использовать возможности передовых алгоритмов для эффективной обработки данных. Модуль postgresml предоставляет функции PostgresML для использования десятков тысяч предварительно обученных ИИ-моделей с открытым исходным кодом, представленные платформой обработки данных Hugging Face AI.
Можно сказать, что PostgresML превращает PostgreSQL и Greenplum в комплексную MLOps-платформу, интегрируя в обработку данных ключевые компоненты рабочего процесса машинного обучения. Без привлечения внешних систем, т.е. не перемещая данные за пределы PostgreSQL и Greenplum, PostgresML позволяет этим СУБД работать как хранилище фичей и/или моделей, а также играть роль механизма машинного обучения и службы вывода.
Такая консолидация упрощает создание и развертывание высокопроизводительных ИИ-приложений, работающих в режиме реального времени. PostgresML поддерживает контролируемые и неконтролируемые алгоритмы, такие как регрессия, кластеризация, глубокие нейронные сети и пр.
Можно строить ML-модели, используя SQL на данных, хранящихся в PostgreSQL и Greenplum. Модели сохраняются обратно в исходную базу данных для последующего вывода с малой задержкой.
PostgresML также можно применять для работы с большими языковыми моделями, такими как GPT-3 и другими генеративными нейросетями. В частности, всего с помощью нескольких строк SQL-запросов можно использовать современные NLP-технологии для семантического поиска, анализа текста, извлечения информации, обобщения документов, перевода текста и прочих задач обработки естественного языка.
PostgresML имеет открытый исходный код, но также предлагается как полностью управляемый облачный сервис. В дополнение к API SQL он предоставляет пакеты SDK для Javascript, Python и Rust для быстрого создания векторного поиска, чат-ботов и других ML-приложений.
Например, далее (на сайте) рассмотрим код, который загружает в Greenplum набор данных и создает таблицу для их хранения, создает вложение для текста, а также загружает и запускает предварительно обученные модели.
В заключение отметим, что расширение postgresml пока поддерживает не все функции PostgresML, а лишь часть из них:
✔️load dataset() – загружает набор данных в таблицы в VMware Greenplum с помощью INSERTкоманды SQL;
✔️embed() – создает внедрение для набора данных;
✔️transform() – применяет предварительно обученный преобразователь для обработки данных.
@BigDataSchool_ru
https://bigdataschool.ru/blog/ml-pluggins-for-greenplum.html
MLOps с MPP-СУБД: расширение PostgresML
Продолжая разговор про ИИ и машинное обучение при использовании Greenplum также следует сказать про PostgresML — ML-расширение для PostgreSQL, которое позволяет выполнять обучение и вывод текстовых и табличных данных с помощью SQL-запросов.
С помощью PostgresML можно интегрировать ML-модели в базу данных VMware Greenplum и использовать возможности передовых алгоритмов для эффективной обработки данных. Модуль postgresml предоставляет функции PostgresML для использования десятков тысяч предварительно обученных ИИ-моделей с открытым исходным кодом, представленные платформой обработки данных Hugging Face AI.
Можно сказать, что PostgresML превращает PostgreSQL и Greenplum в комплексную MLOps-платформу, интегрируя в обработку данных ключевые компоненты рабочего процесса машинного обучения. Без привлечения внешних систем, т.е. не перемещая данные за пределы PostgreSQL и Greenplum, PostgresML позволяет этим СУБД работать как хранилище фичей и/или моделей, а также играть роль механизма машинного обучения и службы вывода.
Такая консолидация упрощает создание и развертывание высокопроизводительных ИИ-приложений, работающих в режиме реального времени. PostgresML поддерживает контролируемые и неконтролируемые алгоритмы, такие как регрессия, кластеризация, глубокие нейронные сети и пр.
Можно строить ML-модели, используя SQL на данных, хранящихся в PostgreSQL и Greenplum. Модели сохраняются обратно в исходную базу данных для последующего вывода с малой задержкой.
PostgresML также можно применять для работы с большими языковыми моделями, такими как GPT-3 и другими генеративными нейросетями. В частности, всего с помощью нескольких строк SQL-запросов можно использовать современные NLP-технологии для семантического поиска, анализа текста, извлечения информации, обобщения документов, перевода текста и прочих задач обработки естественного языка.
PostgresML имеет открытый исходный код, но также предлагается как полностью управляемый облачный сервис. В дополнение к API SQL он предоставляет пакеты SDK для Javascript, Python и Rust для быстрого создания векторного поиска, чат-ботов и других ML-приложений.
Например, далее (на сайте) рассмотрим код, который загружает в Greenplum набор данных и создает таблицу для их хранения, создает вложение для текста, а также загружает и запускает предварительно обученные модели.
В заключение отметим, что расширение postgresml пока поддерживает не все функции PostgresML, а лишь часть из них:
✔️load dataset() – загружает набор данных в таблицы в VMware Greenplum с помощью INSERTкоманды SQL;
✔️embed() – создает внедрение для набора данных;
✔️transform() – применяет предварительно обученный преобразователь для обработки данных.
@BigDataSchool_ru
https://bigdataschool.ru/blog/ml-pluggins-for-greenplum.html
Курсы Big Data,Arenadata,Greenplum, Kafka и Spark
Машинное обучение с Greenplum: обзор ML-расширений
Как использовать Greenplum в проектах машинного обучения: знакомимся с расширением PostgresML и
#kafka @BigDataSchool_ru
Тест на основы Kafka
Какая ошибка может остановить синхронную фиксацию?
Тест на основы Kafka
Какая ошибка может остановить синхронную фиксацию?
Anonymous Quiz
48%
CommitOffsetException
24%
FailedCommitOffset
19%
CommitException
10%
CommitFailedException
#архитектура #статьи
Особенности и ограничения Stateless-движка
Напомним, классический NiFi предназначен для запуска большого многопользовательского приложения, в полной мере использует все предоставленные ресурсы, обеспечивая надежность потокового конвейера за счет сохранения данных на каждом этапе.
Stateless-движок поддерживает тот же API, позволяя работать со всеми процессорами и определениями потоков базового фремворка, но выполняет фиксацию данных только после успешного завершения всего потока.
Поэтому Stateless-механизм требует, чтобы источник данных был надежным и воспроизводимым, что гарантируют не все системы. Кроме того, каждый поток данных, запускаемый в Stateless-режиме, должен храниться в одном источнике и одном приемнике или пункте назначения, чтобы избежать дублирования данных. Поскольку данные в NiFi Stateless проходят через поток данных синхронно от начала до конца, использовать процессоры, которым требуется несколько FlowFile, не получится.
Например, MergeContent и MergeRecord, которым нужны все данные для выполнения слияния. Если процессор имеет данные в очереди и запускается, но не может добиться какого-либо прогресса, Stateless-механизм снова запускает исходный процессор, чтобы ему дополнительные данные. Иногда это может привести к ситуации, когда данные будут поступать постоянно, в зависимости от поведения процессора. Чтобы избежать этого, объем данных, которые могут быть переданы при одном вызове потока данных, ограничивается с помощью настроек конфигурации.
Например, если конфигурация потока данных ограничивает объем данных на один вызов, а процессор MergeContent настроен так, что ожидает определенного количества данных, поток будет продолжать запускать MergeContent без какого-либо прогресса до тех пор, пока истечет максимальный срок хранения или время потока данных.
Кроме того, в зависимости от контекста, в котором выполняется Stateless, запуск исходных компонентов может не предоставить дополнительные данные. Например, если Stateless запускается в среде, где данные ставятся в очередь во входном порту, а затем запускается поток данных, последующий запуск входного порта не приведет к созданию дополнительных данных.
Поэтому дата-инженер должен убедиться, что все потоки данных, содержащие логику для объединения FlowFiles, настроены с использованием максимального срока хранения для MergeContent и MergeRecord. В стандартном развертывании NiFi в этой ситуации обычно происходит зацикливание ошибочного соединения от исходного процессора обратно к нему же. Это приводит к тому, что процессор постоянно пытается обработать FlowFile, пока не добьется успеха. В классическом приложении NiFi получает данные и отвечает за владение ими, храня их до тех пор, пока нижестоящие службы не смогут получить эти данные. Однако, в случае с Stateless NiFi хранение не реализуется, а источник данных считается надежным и воспроизводимым, что не всегда соответствует действительности.
Кроме того, Stateless-движок не сохраняет данные после перезапуска, поэтому алгоритмы обработки сбоев могут быть разными. При использовании Stateless-механизма в случае невозможности доставить данные в нижестоящую систему следует направить FlowFile на выходной порт, а затем пометить его как порт сбоя.
Наконец, при использовании механизма без сохранения состояния потоки не должны загружать большие файлы, поскольку, в отличие от классического NiFi, содержимое FlowFile хранится не на диске, а в памяти, в куче JVM. А память не предназначена для долговременного хранения больших объемов данных.
Поэтому не рекомендуется загружать большие файлы, такие как набор данных размером 100 ГБ, в NiFi Stateless. Это приводит к ошибке OutOfMemoryError или к значительной сборке мусора, что сильно снижает производительность. Впрочем, частично обойти это ограничение можно, настроив Stateless-движок на использование репозитория контента с диска.
В следующий раз рассмотрим отличия классического механизма Apache NiFi от Statless-движка в таблице.
@BigDataSchool_ru
https://bigdataschool.ru/blog/apache-nifi-statefull-vs-stateless.html
Особенности и ограничения Stateless-движка
Напомним, классический NiFi предназначен для запуска большого многопользовательского приложения, в полной мере использует все предоставленные ресурсы, обеспечивая надежность потокового конвейера за счет сохранения данных на каждом этапе.
Stateless-движок поддерживает тот же API, позволяя работать со всеми процессорами и определениями потоков базового фремворка, но выполняет фиксацию данных только после успешного завершения всего потока.
Поэтому Stateless-механизм требует, чтобы источник данных был надежным и воспроизводимым, что гарантируют не все системы. Кроме того, каждый поток данных, запускаемый в Stateless-режиме, должен храниться в одном источнике и одном приемнике или пункте назначения, чтобы избежать дублирования данных. Поскольку данные в NiFi Stateless проходят через поток данных синхронно от начала до конца, использовать процессоры, которым требуется несколько FlowFile, не получится.
Например, MergeContent и MergeRecord, которым нужны все данные для выполнения слияния. Если процессор имеет данные в очереди и запускается, но не может добиться какого-либо прогресса, Stateless-механизм снова запускает исходный процессор, чтобы ему дополнительные данные. Иногда это может привести к ситуации, когда данные будут поступать постоянно, в зависимости от поведения процессора. Чтобы избежать этого, объем данных, которые могут быть переданы при одном вызове потока данных, ограничивается с помощью настроек конфигурации.
Например, если конфигурация потока данных ограничивает объем данных на один вызов, а процессор MergeContent настроен так, что ожидает определенного количества данных, поток будет продолжать запускать MergeContent без какого-либо прогресса до тех пор, пока истечет максимальный срок хранения или время потока данных.
Кроме того, в зависимости от контекста, в котором выполняется Stateless, запуск исходных компонентов может не предоставить дополнительные данные. Например, если Stateless запускается в среде, где данные ставятся в очередь во входном порту, а затем запускается поток данных, последующий запуск входного порта не приведет к созданию дополнительных данных.
Поэтому дата-инженер должен убедиться, что все потоки данных, содержащие логику для объединения FlowFiles, настроены с использованием максимального срока хранения для MergeContent и MergeRecord. В стандартном развертывании NiFi в этой ситуации обычно происходит зацикливание ошибочного соединения от исходного процессора обратно к нему же. Это приводит к тому, что процессор постоянно пытается обработать FlowFile, пока не добьется успеха. В классическом приложении NiFi получает данные и отвечает за владение ими, храня их до тех пор, пока нижестоящие службы не смогут получить эти данные. Однако, в случае с Stateless NiFi хранение не реализуется, а источник данных считается надежным и воспроизводимым, что не всегда соответствует действительности.
Кроме того, Stateless-движок не сохраняет данные после перезапуска, поэтому алгоритмы обработки сбоев могут быть разными. При использовании Stateless-механизма в случае невозможности доставить данные в нижестоящую систему следует направить FlowFile на выходной порт, а затем пометить его как порт сбоя.
Наконец, при использовании механизма без сохранения состояния потоки не должны загружать большие файлы, поскольку, в отличие от классического NiFi, содержимое FlowFile хранится не на диске, а в памяти, в куче JVM. А память не предназначена для долговременного хранения больших объемов данных.
Поэтому не рекомендуется загружать большие файлы, такие как набор данных размером 100 ГБ, в NiFi Stateless. Это приводит к ошибке OutOfMemoryError или к значительной сборке мусора, что сильно снижает производительность. Впрочем, частично обойти это ограничение можно, настроив Stateless-движок на использование репозитория контента с диска.
В следующий раз рассмотрим отличия классического механизма Apache NiFi от Statless-движка в таблице.
@BigDataSchool_ru
https://bigdataschool.ru/blog/apache-nifi-statefull-vs-stateless.html
Курсы Big Data,Arenadata,Greenplum, Kafka и Spark
Классический Apache NiFi vs Stateless-движок: что и когда выбирать
Недавно мы писали, что такое Apache NiFi без сохранения состояния и чем он отличается от клас
#архитектура #статьи
Чтобы резюмировать вышеописанные отличия классического механизма Apache NiFi от Statless-движка, сравним их по следующим критериям:
✔️Ключевое назначение;
✔️Долговечность хранения данных;
✔️Порядок обработки данных;
✔️Работа с памятью (кучей JVM);
✔️Работа в режиме клиента или сервера;
✔️Особенности потребления ресурсов;
✔️Надежность происхождения данных;
✔️Варианты использования.
@BigDataSchool_ru
https://bigdataschool.ru/blog/apache-nifi-statefull-vs-stateless.html
Чтобы резюмировать вышеописанные отличия классического механизма Apache NiFi от Statless-движка, сравним их по следующим критериям:
✔️Ключевое назначение;
✔️Долговечность хранения данных;
✔️Порядок обработки данных;
✔️Работа с памятью (кучей JVM);
✔️Работа в режиме клиента или сервера;
✔️Особенности потребления ресурсов;
✔️Надежность происхождения данных;
✔️Варианты использования.
@BigDataSchool_ru
https://bigdataschool.ru/blog/apache-nifi-statefull-vs-stateless.html
#kafka @BigDataSchool_ru
Тест на основы Kafka
Что происходит при синхронной фиксации?
Тест на основы Kafka
Что происходит при синхронной фиксации?
Anonymous Quiz
38%
создается отдельный поток для параллельной работы приложения
38%
доступ к топикам блокируется из других приложений
13%
приложение полностью блокируется
13%
приложение продолжает свою работу
#kafka #статьи
Проблема сквозного шифрования в Kafka
Apache Kafka часто используется в качестве middleware-слоя для асинхронной интеграции множества приложений и для взаимодействия компонентов микросервисной архитектуры. Каждая часть такой сложной системы является независимым элементом развертывания и обычно реализована с помощью уникального стека технологий. Поэтому наивный подход с шифрованием чувствительных данных в полезной нагрузке на стороне продюсера и расшифровкой на потребителе, становится неудобным. Например, данные, зашифрованные с помощью Python-библиотеки, проблематично расшифровать с помощью Java.
Помимо обеспечения языковой совместимости, необходимо также обеспечить управление ротацией ключей. Это требует более сложного контракта интерфейса или хранения ключей в заголовках записей, проходящих через библиотеку шифрования. По мере включения в потоковый конвейер новых приложений с разными библиотеками шифрации/дешифрации данных возникает потребность в матрице совместимости, чтобы каждая команда, ответственная за разработку нового сервиса знала об ограничениях стека технологий.
Кроме постоянной поддержки такой матрицы совместимости и развития инструмента сквозного шифрования, необходимо также определять политики доступа к операции шифрования, что усложняет развертывание и интеграционное тестирование. Сквозное шифрование требует много скоординированных усилий, общего понимания, общих целей и ответственности, чего сложно достичь в крупной распределенной экосистеме.
Упростить проблему шифрования данных в Apache Kafka можно с помощью Java-приложения Conductor Gateway, которое полностью совместимо с протоколом Kafka и не зависит от провайдера платформы.
Что оно собой представляет, рассмотрим далее.
@BigDataSchool_ru
https://bigdataschool.ru/blog/what-is-conduktor-gateway-for-apache-kafka.html
Проблема сквозного шифрования в Kafka
Apache Kafka часто используется в качестве middleware-слоя для асинхронной интеграции множества приложений и для взаимодействия компонентов микросервисной архитектуры. Каждая часть такой сложной системы является независимым элементом развертывания и обычно реализована с помощью уникального стека технологий. Поэтому наивный подход с шифрованием чувствительных данных в полезной нагрузке на стороне продюсера и расшифровкой на потребителе, становится неудобным. Например, данные, зашифрованные с помощью Python-библиотеки, проблематично расшифровать с помощью Java.
Помимо обеспечения языковой совместимости, необходимо также обеспечить управление ротацией ключей. Это требует более сложного контракта интерфейса или хранения ключей в заголовках записей, проходящих через библиотеку шифрования. По мере включения в потоковый конвейер новых приложений с разными библиотеками шифрации/дешифрации данных возникает потребность в матрице совместимости, чтобы каждая команда, ответственная за разработку нового сервиса знала об ограничениях стека технологий.
Кроме постоянной поддержки такой матрицы совместимости и развития инструмента сквозного шифрования, необходимо также определять политики доступа к операции шифрования, что усложняет развертывание и интеграционное тестирование. Сквозное шифрование требует много скоординированных усилий, общего понимания, общих целей и ответственности, чего сложно достичь в крупной распределенной экосистеме.
Упростить проблему шифрования данных в Apache Kafka можно с помощью Java-приложения Conductor Gateway, которое полностью совместимо с протоколом Kafka и не зависит от провайдера платформы.
Что оно собой представляет, рассмотрим далее.
@BigDataSchool_ru
https://bigdataschool.ru/blog/what-is-conduktor-gateway-for-apache-kafka.html
Курсы Big Data,Arenadata,Greenplum, Kafka и Spark
Зачем вам Conduktor Gateway для Apache Kafka
Как улучшить управляемость шифрования конфиденциальных данных в Apache Kafka с Conduktor Gateway: разбор Java-приложения для EDA-систем
#kafka #статьи
Что такое Conduktor Gateway
Ядро Conduktor Gateway используется в качестве транспортного уровня между клиентскими приложениями Kafka и кластерами Kafka.
Этот транспортный уровень расширяется за счет взаимодействия с Kafka, изменения данных или выполнения логических операций для повышения ценности.
Сам шлюз состоит из двух концептуальных частей: ядра и перехватчиков, обеспечивая следующие функции:
✔️виртуализация кластеров для клиентов посредством мультиарендности;
✔️шифрование на уровне полей в записях Kafka;
✔️хаос-инжиниринг для тестирования сбоев по различным сценариям;
✔️защита структуры и правильного использования среды Kafka.
Продолжим разбирать на сайте.
@BigDataSchool_ru
https://bigdataschool.ru/blog/what-is-conduktor-gateway-for-apache-kafka.html
Что такое Conduktor Gateway
Ядро Conduktor Gateway используется в качестве транспортного уровня между клиентскими приложениями Kafka и кластерами Kafka.
Этот транспортный уровень расширяется за счет взаимодействия с Kafka, изменения данных или выполнения логических операций для повышения ценности.
Сам шлюз состоит из двух концептуальных частей: ядра и перехватчиков, обеспечивая следующие функции:
✔️виртуализация кластеров для клиентов посредством мультиарендности;
✔️шифрование на уровне полей в записях Kafka;
✔️хаос-инжиниринг для тестирования сбоев по различным сценариям;
✔️защита структуры и правильного использования среды Kafka.
Продолжим разбирать на сайте.
@BigDataSchool_ru
https://bigdataschool.ru/blog/what-is-conduktor-gateway-for-apache-kafka.html
#kafka @BigDataSchool_ru
Тест на основы Kafka
В каком виде хранятся данные потоков в KSQL?
Тест на основы Kafka
В каком виде хранятся данные потоков в KSQL?
Anonymous Quiz
29%
в виде упорядоченных коллекций
59%
в виде пар ключ/значение
0%
в виде массивов
12%
в виде списков
#kafka #статьи
Шифрование и дешифрация полезной нагрузки в Apache Kafka с Conduktor Gateway
Поскольку Conduktor Gateway полностью соответствует протоколу Kafka, cквозное шифрование с этим шлюзом не требует никаких изменений в клиентских приложениях.
При этом обеспечивается полная независимость от языка и даже поддерживается работа с kafka-console-consumer.sh.
Таким образом, разработчики контролируют свои данные, не беспокоясь о безопасности, версий и совместимости на всех языках. Инженеры по безопасности могут выявлять, маркировать и применять адаптированные правила шифрования, поскольку все инструменты для этого консолидированы в одном месте. Инженеры по эксплуатации способствуют плавной интеграции усилий разработчиков и команды безопасности, делая сложный механизм операций с данными невидимым для постороннего глаза.
Такое эффективное разграничение ролей повышает эффективность каждой команды и приводит к слаженной работе, обеспечивающей безопасную и бесперебойную эксплуатацию Apache Kafka.
Рассмотрим пример, когда инженеры по безопасности определили правила шифрования/расшифровки полей password и visa в топике customers (продолжим по ссылке).
@BigDataSchool_ru
https://bigdataschool.ru/blog/what-is-conduktor-gateway-for-apache-kafka.html
Шифрование и дешифрация полезной нагрузки в Apache Kafka с Conduktor Gateway
Поскольку Conduktor Gateway полностью соответствует протоколу Kafka, cквозное шифрование с этим шлюзом не требует никаких изменений в клиентских приложениях.
При этом обеспечивается полная независимость от языка и даже поддерживается работа с kafka-console-consumer.sh.
Таким образом, разработчики контролируют свои данные, не беспокоясь о безопасности, версий и совместимости на всех языках. Инженеры по безопасности могут выявлять, маркировать и применять адаптированные правила шифрования, поскольку все инструменты для этого консолидированы в одном месте. Инженеры по эксплуатации способствуют плавной интеграции усилий разработчиков и команды безопасности, делая сложный механизм операций с данными невидимым для постороннего глаза.
Такое эффективное разграничение ролей повышает эффективность каждой команды и приводит к слаженной работе, обеспечивающей безопасную и бесперебойную эксплуатацию Apache Kafka.
Рассмотрим пример, когда инженеры по безопасности определили правила шифрования/расшифровки полей password и visa в топике customers (продолжим по ссылке).
@BigDataSchool_ru
https://bigdataschool.ru/blog/what-is-conduktor-gateway-for-apache-kafka.html
Курсы Big Data,Arenadata,Greenplum, Kafka и Spark
Зачем вам Conduktor Gateway для Apache Kafka
Как улучшить управляемость шифрования конфиденциальных данных в Apache Kafka с Conduktor Gateway: разбор Java-приложения для EDA-систем
#архитектура #статьи
Что такое Databricks SQL
Платформа Databricks Lakehouse предоставляет комплексное решение для хранения данных. Она построена на открытых стандартах и API. Эта архитектура данных сочетает ACID-транзакции и управление данными корпоративных хранилищ данных с гибкостью и экономичностью озер данных. Databricks SQL описывает реляционное хранилище корпоративных данных, встроенное в платформу Databricks Lakehouse, которая предоставляет общие вычислительные ресурсы для бизнес-аналитики.
Платформа Databricks Lakehouse организует данные, хранящиеся в Delta Lake, в облачном объектном хранилище с помощью знакомых взаимосвязей, таких как схемы базы данных, таблицы и представления. Databricks рекомендует использовать многоуровневый подход к проверке, очистке и преобразованию данных для аналитики.
Databricks SQL предоставляет общие вычислительные ресурсы для SQL- запросов, визуализаций и панелей мониторинга, которые выполняются с таблицами в Lakehouse.
В Databricks SQL эти запросы, визуализации и дэшборды разрабатываются и выполняются с помощью встроенного редактора SQL. Регулярно используемый код SQL можно сохранить в виде фрагментов для быстрого повторного использования, а результаты запросов можно кэшировать, чтобы сократить время выполнения.
Кроме того, можно запланировать автоматическое обновление обновлений запросов, а также выдачу предупреждений при возникновении значимых изменений в данных.
Databricks SQL также позволяет аналитикам анализировать данные с помощью визуализаций и дэшбордов с возможностью перетаскивания для быстрого специального исследовательского анализа.
Databricks SQL поддерживает три типа хранилищ, каждый из которых имеет разные уровни производительности и поддержки функций:
✔️Бессерверное хранилище поддерживает все функции хранилища SQL Pro, а также расширенные функции производительности Databricks SQL. Хранилища SQL запускаются в учетной записи Databricks клиента с использованием бессерверных вычислений. Хранилища SQL не поддерживают сквозную передачу учетных данных. Databricks рекомендует использовать Unity Catalog для управления данными, который обеспечивает централизованный контроль доступа, аудит, происхождение и возможности обнаружения данных в рабочих пространствах платформы.
✔️Хранилище Pro поддерживает дополнительные функции производительности Databricks SQL и все функции Databricks SQL.
✔️Классическое хранилище поддерживает функции производительности начального уровня и ограниченный набор функций Databricks SQL.
Как и в любом реляционном хранилище, Databricks SQL поддерживается кэширование — метод повышения производительности, позволяющий избежать необходимости многократного повторного вычисления или извлечения одних и тех же данных.
В Databricks SQL кэширование может значительно ускорить выполнение запросов и минимизировать использование хранилища, что приводит к снижению затрат и более эффективному использованию ресурсов.
Далее рассмотрим: Кэширование в Lakehouse, преимущества и принципы работы.
@BigDataSchool_ru
https://bigdataschool.ru/blog/cashing-in-databricks-sql.html
Что такое Databricks SQL
Платформа Databricks Lakehouse предоставляет комплексное решение для хранения данных. Она построена на открытых стандартах и API. Эта архитектура данных сочетает ACID-транзакции и управление данными корпоративных хранилищ данных с гибкостью и экономичностью озер данных. Databricks SQL описывает реляционное хранилище корпоративных данных, встроенное в платформу Databricks Lakehouse, которая предоставляет общие вычислительные ресурсы для бизнес-аналитики.
Платформа Databricks Lakehouse организует данные, хранящиеся в Delta Lake, в облачном объектном хранилище с помощью знакомых взаимосвязей, таких как схемы базы данных, таблицы и представления. Databricks рекомендует использовать многоуровневый подход к проверке, очистке и преобразованию данных для аналитики.
Databricks SQL предоставляет общие вычислительные ресурсы для SQL- запросов, визуализаций и панелей мониторинга, которые выполняются с таблицами в Lakehouse.
В Databricks SQL эти запросы, визуализации и дэшборды разрабатываются и выполняются с помощью встроенного редактора SQL. Регулярно используемый код SQL можно сохранить в виде фрагментов для быстрого повторного использования, а результаты запросов можно кэшировать, чтобы сократить время выполнения.
Кроме того, можно запланировать автоматическое обновление обновлений запросов, а также выдачу предупреждений при возникновении значимых изменений в данных.
Databricks SQL также позволяет аналитикам анализировать данные с помощью визуализаций и дэшбордов с возможностью перетаскивания для быстрого специального исследовательского анализа.
Databricks SQL поддерживает три типа хранилищ, каждый из которых имеет разные уровни производительности и поддержки функций:
✔️Бессерверное хранилище поддерживает все функции хранилища SQL Pro, а также расширенные функции производительности Databricks SQL. Хранилища SQL запускаются в учетной записи Databricks клиента с использованием бессерверных вычислений. Хранилища SQL не поддерживают сквозную передачу учетных данных. Databricks рекомендует использовать Unity Catalog для управления данными, который обеспечивает централизованный контроль доступа, аудит, происхождение и возможности обнаружения данных в рабочих пространствах платформы.
✔️Хранилище Pro поддерживает дополнительные функции производительности Databricks SQL и все функции Databricks SQL.
✔️Классическое хранилище поддерживает функции производительности начального уровня и ограниченный набор функций Databricks SQL.
Как и в любом реляционном хранилище, Databricks SQL поддерживается кэширование — метод повышения производительности, позволяющий избежать необходимости многократного повторного вычисления или извлечения одних и тех же данных.
В Databricks SQL кэширование может значительно ускорить выполнение запросов и минимизировать использование хранилища, что приводит к снижению затрат и более эффективному использованию ресурсов.
Далее рассмотрим: Кэширование в Lakehouse, преимущества и принципы работы.
@BigDataSchool_ru
https://bigdataschool.ru/blog/cashing-in-databricks-sql.html
Курсы Big Data,Arenadata,Greenplum, Kafka и Spark
Кэширование в Databricks SQL
Что такое Databricks SQL и как его ускорить, используя кэширование данных: типы хранилищ данны
#kafka @BigDataSchool_ru
Тест на основы Kafka.
Что такое перебалансировка данных в Kafka?
Тест на основы Kafka.
Что такое перебалансировка данных в Kafka?
Anonymous Quiz
20%
передача раздела от неактивного подписчика активному
45%
распределение данных по брокерам
10%
распределение новых подписчиков по топикам
25%
перераспределение данных по топикам
#архитектура #статьи
Кэширование в Lakehouse: преимущества и принципы работы
Кэширование ускоряет выполнение запросов благодаря сохранению их результатов или часто используемых данных в памяти или других быстрых носителях. Это особенно полезно для повторяющихся запросов, поскольку система может быстро получать кэшированные результаты вместо их повторного вычисления.
Кроме того, кэширование сводит к минимуму потребность в дополнительных вычислительных ресурсах за счет повторного использования ранее вычисленных результатов. Сокращение общего времени безотказной работы склада и потребности в дополнительных вычислительных кластерах, что приводит к экономии средств и улучшению качества ресурсов.
Databricks SQL поддерживает несколько уровней кэширования:
✔️кэш пользовательского интерфейса для оптимизации взаимодействия с пользователем в GUI путем быстрого предоставления доступа к самым последним результатам запросов и информационной панели.
Когда пользователи впервые открывают панель мониторинга или запрос SQL, в кэше отображается самый последний результат запроса, что снижает нагрузку на вычислительные ресурсы.
Это приводит к сокращению времени отклика и более удобному использованию пользовательского интерфейса.
Кэш пользовательского интерфейса важен для управления запланированными выполнениями. Когда запланировано обновление, в кэше сохраняются обновленные данные, гарантируя немедленный доступ к самой последней информации в дэшборде.
Жизненный цикл кэша составляет не более 7 дней, и кэш становится недействительным после обновления базовых таблиц.
✔️кэширование результатов включает в себя локальный и удаленный кэш результатов, которые совместно повышают производительность запросов, сохраняя результаты запросов в памяти или на удаленных носителях данных.
Локальный кэш находится в памяти, где сохраняются результаты запросов в течение всего срока службы кластера или при заполнении кэша, в зависимости от того, что наступит раньше. Этот кэш полезен для ускорения повторяющихся запросов, устраняя необходимость повторного вычисления одних и тех же результатов.
Однако, после остановки или перезапуска кластера кэш очищается и все результаты запросов удаляются.
Удаленный кэш результатов появился в в первом квартале 2023 года и представляет собой бессерверную систему кэширования, которая сохраняет результаты запросов, сохраняя их в облачном хранилище.
Удаленный кэш результатов решает распространенную проблему кэширования результатов запросов в памяти, которая остается доступной только до тех пор, пока работают вычислительные ресурсы. Удаленный кэш является постоянным общим для всех хранилищ в рабочей области Databricks. Для доступа к удаленному кэшу требуется работающее хранилище.
При обработке запроса кластер сначала просматривает свой локальный кэш, а затем при необходимости — удаленный. Только если результат запроса не кэшируется, он будет выполнен. Пока удаленный кэш результатов доступен для запросов с использованием клиентов ODBC/JDBC и API операторов SQL. Локальный и удаленный кэш после обновления базовых таблиц становится недействительным.
В противном случае максимальный жизненный цикл локального и удаленного кэша составляет 24 часа, который начинается с момента записи в кэш.
✔️Дисковый кэш (дельта-кэш) предназначен для повышения производительности запросов за счет хранения данных на диске, что позволяет ускорить чтение данных.
Данные автоматически кэшируются при извлечении файлов с использованием быстрого промежуточного формата. Сохраняя копии файлов в локальном хранилище, подключенном к вычислительным узлам, дисковый кэш обеспечивает расположение данных ближе к рабочим процессам, что приводит к повышению производительности запросов.
Также дисковый кэш автоматически обнаруживает изменения в базовых файлах данных, гарантируя актуальность.
Дисковый кэш имеет те же характеристики жизненного цикла, что и локальный кэш результатов, т.е. при остановке или перезапуске кластера кэш очищается и его необходимо заполнить заново.
@BigDataSchool_ru
https://bigdataschool.ru/blog/cashing-in-databricks-sql.html
Кэширование в Lakehouse: преимущества и принципы работы
Кэширование ускоряет выполнение запросов благодаря сохранению их результатов или часто используемых данных в памяти или других быстрых носителях. Это особенно полезно для повторяющихся запросов, поскольку система может быстро получать кэшированные результаты вместо их повторного вычисления.
Кроме того, кэширование сводит к минимуму потребность в дополнительных вычислительных ресурсах за счет повторного использования ранее вычисленных результатов. Сокращение общего времени безотказной работы склада и потребности в дополнительных вычислительных кластерах, что приводит к экономии средств и улучшению качества ресурсов.
Databricks SQL поддерживает несколько уровней кэширования:
✔️кэш пользовательского интерфейса для оптимизации взаимодействия с пользователем в GUI путем быстрого предоставления доступа к самым последним результатам запросов и информационной панели.
Когда пользователи впервые открывают панель мониторинга или запрос SQL, в кэше отображается самый последний результат запроса, что снижает нагрузку на вычислительные ресурсы.
Это приводит к сокращению времени отклика и более удобному использованию пользовательского интерфейса.
Кэш пользовательского интерфейса важен для управления запланированными выполнениями. Когда запланировано обновление, в кэше сохраняются обновленные данные, гарантируя немедленный доступ к самой последней информации в дэшборде.
Жизненный цикл кэша составляет не более 7 дней, и кэш становится недействительным после обновления базовых таблиц.
✔️кэширование результатов включает в себя локальный и удаленный кэш результатов, которые совместно повышают производительность запросов, сохраняя результаты запросов в памяти или на удаленных носителях данных.
Локальный кэш находится в памяти, где сохраняются результаты запросов в течение всего срока службы кластера или при заполнении кэша, в зависимости от того, что наступит раньше. Этот кэш полезен для ускорения повторяющихся запросов, устраняя необходимость повторного вычисления одних и тех же результатов.
Однако, после остановки или перезапуска кластера кэш очищается и все результаты запросов удаляются.
Удаленный кэш результатов появился в в первом квартале 2023 года и представляет собой бессерверную систему кэширования, которая сохраняет результаты запросов, сохраняя их в облачном хранилище.
Удаленный кэш результатов решает распространенную проблему кэширования результатов запросов в памяти, которая остается доступной только до тех пор, пока работают вычислительные ресурсы. Удаленный кэш является постоянным общим для всех хранилищ в рабочей области Databricks. Для доступа к удаленному кэшу требуется работающее хранилище.
При обработке запроса кластер сначала просматривает свой локальный кэш, а затем при необходимости — удаленный. Только если результат запроса не кэшируется, он будет выполнен. Пока удаленный кэш результатов доступен для запросов с использованием клиентов ODBC/JDBC и API операторов SQL. Локальный и удаленный кэш после обновления базовых таблиц становится недействительным.
В противном случае максимальный жизненный цикл локального и удаленного кэша составляет 24 часа, который начинается с момента записи в кэш.
✔️Дисковый кэш (дельта-кэш) предназначен для повышения производительности запросов за счет хранения данных на диске, что позволяет ускорить чтение данных.
Данные автоматически кэшируются при извлечении файлов с использованием быстрого промежуточного формата. Сохраняя копии файлов в локальном хранилище, подключенном к вычислительным узлам, дисковый кэш обеспечивает расположение данных ближе к рабочим процессам, что приводит к повышению производительности запросов.
Также дисковый кэш автоматически обнаруживает изменения в базовых файлах данных, гарантируя актуальность.
Дисковый кэш имеет те же характеристики жизненного цикла, что и локальный кэш результатов, т.е. при остановке или перезапуске кластера кэш очищается и его необходимо заполнить заново.
@BigDataSchool_ru
https://bigdataschool.ru/blog/cashing-in-databricks-sql.html
Курсы Big Data,Arenadata,Greenplum, Kafka и Spark
Кэширование в Databricks SQL
Что такое Databricks SQL и как его ускорить, используя кэширование данных: типы хранилищ данны
#Flink #статьи
3 этапа преобразования задания Apache Flink
Задание Apache Flink проходит несколько этапов перед своим физическим выполнением:
✔️сперва пользовательский код преобразуется в потоковый граф (Stream Graph);
✔️затем узлы потокового графа преобразуются в узлы графа задания (JobGraph), который представляет собой низкоуровневое представление потока данных, принимаемое JobManager, где выполняется объединение нескольких операторов в один.
✔️наконец, в кластере с помощью JobManager запускается граф выполнения ExecutionGraph, где физической единицей является Задача.
Ресурсы выполнения во Flink определяются через слоты задач (Task Slots). Каждый менеджер задач (TaskManager) имеет один или несколько слотов задач, каждый из которых может запускать один конвейер параллельных задач.
Конвейер состоит из нескольких последовательных задач, таких как n-й параллельный экземпляр MapFunction вместе с n-м параллельным экземпляром DownloadFunction.
Flink часто выполняет последовательные задачи одновременно: для потоковых программ это происходит в любом случае, но и для пакетных программ это происходит часто.
Во время выполнения задания менеджер заданий (JobManager) отслеживает распределенные задачи, решает, когда запланировать следующую задачу или набор задач, и реагирует на завершенные задачи или сбои выполнения. JobManager получает JobGraph, который представляет собой поток данных, состоящий из операторов (JobVertex) и промежуточных результатов (IntermediateDataSet ).
У каждого оператора есть свойства, такие как параллелизм и код, который он выполняет. Кроме того, к JobGraph имеется набор подключенных библиотек, необходимых для выполнения кода операторов. JobManager преобразует JobGraph в ExecutionGraph, который представляет собой параллельную версию JobGraph. Для каждой JobVertex он содержит ExecutionVertex для каждой параллельной подзадачи.
Оператор с параллелизмом 100 будет иметь один JobVertex и 100 ExecutionVertices. ExecutionVertex отслеживает состояние выполнения конкретной подзадачи. Все ExecutionVertices из одного JobVertex хранятся в ExecutionJobVertex, который отслеживает состояние оператора в целом. Помимо вершин, ExecutionGraph также содержит общий промежуточный результат (IntermediateResult) и разделенный по разделам (IntermediateResultPartition).
Первый отслеживает состояние IntermediateDataSet, второй — состояние каждого из его разделов. С каждым ExecutionGraph связан статус задания, который указывает текущее состояние его выполнения.
Задание Flink сначала находится в состоянии создано, затем переходит в состояние выполнения и по завершении всей работы переходит в состояние завершения. В случае сбоя задание сначала переключается на сбой , при котором все запущенные задачи отменяются. Во время выполнения ExecutionGraph каждая параллельная задача проходит несколько этапов: от создания до завершения или сбоя. Задача может выполняться несколько раз, например, при устранении сбоя.
По этой причине выполнение ExecutionVertex отслеживается в Execution. Каждая ExecutionVertex имеет текущее выполнение и предыдущие исполнения.
Разобравшись с основными принципами преобразования задания, далее рассмотрим их более подробно.
@BigDataSchool_ru
https://bigdataschool.ru/blog/flink-job-lifecycle.html
3 этапа преобразования задания Apache Flink
Задание Apache Flink проходит несколько этапов перед своим физическим выполнением:
✔️сперва пользовательский код преобразуется в потоковый граф (Stream Graph);
✔️затем узлы потокового графа преобразуются в узлы графа задания (JobGraph), который представляет собой низкоуровневое представление потока данных, принимаемое JobManager, где выполняется объединение нескольких операторов в один.
✔️наконец, в кластере с помощью JobManager запускается граф выполнения ExecutionGraph, где физической единицей является Задача.
Ресурсы выполнения во Flink определяются через слоты задач (Task Slots). Каждый менеджер задач (TaskManager) имеет один или несколько слотов задач, каждый из которых может запускать один конвейер параллельных задач.
Конвейер состоит из нескольких последовательных задач, таких как n-й параллельный экземпляр MapFunction вместе с n-м параллельным экземпляром DownloadFunction.
Flink часто выполняет последовательные задачи одновременно: для потоковых программ это происходит в любом случае, но и для пакетных программ это происходит часто.
Во время выполнения задания менеджер заданий (JobManager) отслеживает распределенные задачи, решает, когда запланировать следующую задачу или набор задач, и реагирует на завершенные задачи или сбои выполнения. JobManager получает JobGraph, который представляет собой поток данных, состоящий из операторов (JobVertex) и промежуточных результатов (IntermediateDataSet ).
У каждого оператора есть свойства, такие как параллелизм и код, который он выполняет. Кроме того, к JobGraph имеется набор подключенных библиотек, необходимых для выполнения кода операторов. JobManager преобразует JobGraph в ExecutionGraph, который представляет собой параллельную версию JobGraph. Для каждой JobVertex он содержит ExecutionVertex для каждой параллельной подзадачи.
Оператор с параллелизмом 100 будет иметь один JobVertex и 100 ExecutionVertices. ExecutionVertex отслеживает состояние выполнения конкретной подзадачи. Все ExecutionVertices из одного JobVertex хранятся в ExecutionJobVertex, который отслеживает состояние оператора в целом. Помимо вершин, ExecutionGraph также содержит общий промежуточный результат (IntermediateResult) и разделенный по разделам (IntermediateResultPartition).
Первый отслеживает состояние IntermediateDataSet, второй — состояние каждого из его разделов. С каждым ExecutionGraph связан статус задания, который указывает текущее состояние его выполнения.
Задание Flink сначала находится в состоянии создано, затем переходит в состояние выполнения и по завершении всей работы переходит в состояние завершения. В случае сбоя задание сначала переключается на сбой , при котором все запущенные задачи отменяются. Во время выполнения ExecutionGraph каждая параллельная задача проходит несколько этапов: от создания до завершения или сбоя. Задача может выполняться несколько раз, например, при устранении сбоя.
По этой причине выполнение ExecutionVertex отслеживается в Execution. Каждая ExecutionVertex имеет текущее выполнение и предыдущие исполнения.
Разобравшись с основными принципами преобразования задания, далее рассмотрим их более подробно.
@BigDataSchool_ru
https://bigdataschool.ru/blog/flink-job-lifecycle.html
Курсы Big Data,Arenadata,Greenplum, Kafka и Spark
Под капотом задания Apache Flink: 3 этапа преобразования
Как планируются и исполняются задания Apache Flink: от пользовательского Java-кода до физическ
#kafka @BigDataSchool_ru
Тест по основам Kafka.
Какие параметры используются для полного удаления потребительской группы?
Тест по основам Kafka.
Какие параметры используются для полного удаления потребительской группы?
Anonymous Quiz
14%
—erase, —group
0%
—destroy
62%
—delete, —group
24%
—destroy, —group
#kafka #статьи
6 стратегий устранения сбоев в Apache Kafka
1️⃣Первым вариантом устранения сбоев является удаление неудавшихся сообщений. Эта стратегия более известна как сброс нагрузки. Когда обработчик обратного вызова получает временную ошибку, связанную с отправленным сообщением, он удаляет связанные данные из вышестоящих структур данных и не выполняет никаких дальнейших действий. Приложение может зарегистрировать событие. Эта ситуация должна быть видна операторам извне посредством мониторинга.
2️⃣Вторым вариантом является создание обратного давления в приложении-потребителе и повторная отправка данных продюсером. Продюсер повторно отправляет сообщения об истечении времени ожидания в клиентскую библиотеку. В случае сбоя Kafka и невозможности публикации сообщений приложение-потребитель прекращает ожидать входящего трафика до тех пор, пока сообщения не начнут поступать снова. Достоинством этого решения является отсутствие внешних зависимостей и потерь сообщений.
Но недостатки тут следующие:
➖реализация повторных попыток поверх клиентских библиотек Kafka не рекомендуется, поскольку клиентская библиотека уже содержат механизм повтора, который поддерживает порядок сообщений и обеспечивает идемпотентную выработку.
➖порядок сообщений теряется;
➖потребитель может получить одну и ту же полезную нагрузку дважды, если продюсер повторно отправляет сообщение, время ожидания которого истекло после отправки брокеру и первоначально не получил ответа;
➖блокировка системы входящего трафика фактически закрывает сервис без предупреждения внешних сторон.
➖изменение конструкции приложения для оказания обратного давления может потребовать явного изменения контракта API на границе системы.
3️⃣Альтернативной стратегией является запись всех сообщений локально и их асинхронная запись в Kafka. В случае сбоя приложение-продюсер отправляет все сообщения в альтернативное локальное хранилище. Этот вариант менее сложен варианта с автоматическим выключателем: приложение может продолжать принимать входящий трафик гораздо дольше. Для возврата в нормальное рабочее состояние не требуется никакого ручного вмешательства. Нет потерь сообщений. Однако, локальное хранилище будет менее надежным, чем кластер, который оно защищает, и станет потенциальной единственной точкой отказа. Также возникает дополнительная сквозная задержка.
4️⃣Вместо этого можно отправлять сообщения об истечении тайм-аута в локальное хранилище и их загрузка в Kafka с помощью дополнительного побочного процесса. Этот вариант использует локальное хранилище в качестве канала недоставленных сообщений. Пакетный процесс импортирует эти сообщения в Kafka, как только система вернется в нормальное состояние.
Плюсами этого варианта является низкая сложность реализации, а также приложение может продолжать принимать входящий трафик намного дольше. Нет потерь сообщений, хотя их порядок теряется.
Недостатком этой стратегии становится сложность записи в локальное хранилище и необходимость ручного вмешательства для каждого экземпляра приложения, чтобы восстановить все сообщения.
5️⃣Вместо этого можно внедрить автоматический выключатель для временной отправки сообщений в локальное хранилище. Но это довольно сложно с точки зрения реализации: система отключает поток данных в Kafka в случае сбоя этой платформы и перенаправляет сообщения в локальное хранилище. Приложение воспроизводит эти сообщения из локального хранилища и повторно отправляет их после восстановления Kafka. Затем возобновляется регулярный поток.
Сложность в том, что несколько сообщений будут находиться в очередях продюсера. Проблемы с синхронизацией могут возникнуть при размыкании и замыкании автоматического выключателя. Чтобы понять, когда следует размыкать автоматический выключатель, надо знать состояние кластера на основе клиента Kafka через JMX в Java или статистику библиотеки librdkafka. Вместо этого следует отслеживать частоту ошибок в данных, например, отчеты об доставке с ошибками.
Последний 6️⃣ разберем на сайте.
@BigDataSchool_ru
https://bigdataschool.ru/blog/how-to-prevent-kafka-failures.html
6 стратегий устранения сбоев в Apache Kafka
1️⃣Первым вариантом устранения сбоев является удаление неудавшихся сообщений. Эта стратегия более известна как сброс нагрузки. Когда обработчик обратного вызова получает временную ошибку, связанную с отправленным сообщением, он удаляет связанные данные из вышестоящих структур данных и не выполняет никаких дальнейших действий. Приложение может зарегистрировать событие. Эта ситуация должна быть видна операторам извне посредством мониторинга.
2️⃣Вторым вариантом является создание обратного давления в приложении-потребителе и повторная отправка данных продюсером. Продюсер повторно отправляет сообщения об истечении времени ожидания в клиентскую библиотеку. В случае сбоя Kafka и невозможности публикации сообщений приложение-потребитель прекращает ожидать входящего трафика до тех пор, пока сообщения не начнут поступать снова. Достоинством этого решения является отсутствие внешних зависимостей и потерь сообщений.
Но недостатки тут следующие:
➖реализация повторных попыток поверх клиентских библиотек Kafka не рекомендуется, поскольку клиентская библиотека уже содержат механизм повтора, который поддерживает порядок сообщений и обеспечивает идемпотентную выработку.
➖порядок сообщений теряется;
➖потребитель может получить одну и ту же полезную нагрузку дважды, если продюсер повторно отправляет сообщение, время ожидания которого истекло после отправки брокеру и первоначально не получил ответа;
➖блокировка системы входящего трафика фактически закрывает сервис без предупреждения внешних сторон.
➖изменение конструкции приложения для оказания обратного давления может потребовать явного изменения контракта API на границе системы.
3️⃣Альтернативной стратегией является запись всех сообщений локально и их асинхронная запись в Kafka. В случае сбоя приложение-продюсер отправляет все сообщения в альтернативное локальное хранилище. Этот вариант менее сложен варианта с автоматическим выключателем: приложение может продолжать принимать входящий трафик гораздо дольше. Для возврата в нормальное рабочее состояние не требуется никакого ручного вмешательства. Нет потерь сообщений. Однако, локальное хранилище будет менее надежным, чем кластер, который оно защищает, и станет потенциальной единственной точкой отказа. Также возникает дополнительная сквозная задержка.
4️⃣Вместо этого можно отправлять сообщения об истечении тайм-аута в локальное хранилище и их загрузка в Kafka с помощью дополнительного побочного процесса. Этот вариант использует локальное хранилище в качестве канала недоставленных сообщений. Пакетный процесс импортирует эти сообщения в Kafka, как только система вернется в нормальное состояние.
Плюсами этого варианта является низкая сложность реализации, а также приложение может продолжать принимать входящий трафик намного дольше. Нет потерь сообщений, хотя их порядок теряется.
Недостатком этой стратегии становится сложность записи в локальное хранилище и необходимость ручного вмешательства для каждого экземпляра приложения, чтобы восстановить все сообщения.
5️⃣Вместо этого можно внедрить автоматический выключатель для временной отправки сообщений в локальное хранилище. Но это довольно сложно с точки зрения реализации: система отключает поток данных в Kafka в случае сбоя этой платформы и перенаправляет сообщения в локальное хранилище. Приложение воспроизводит эти сообщения из локального хранилища и повторно отправляет их после восстановления Kafka. Затем возобновляется регулярный поток.
Сложность в том, что несколько сообщений будут находиться в очередях продюсера. Проблемы с синхронизацией могут возникнуть при размыкании и замыкании автоматического выключателя. Чтобы понять, когда следует размыкать автоматический выключатель, надо знать состояние кластера на основе клиента Kafka через JMX в Java или статистику библиотеки librdkafka. Вместо этого следует отслеживать частоту ошибок в данных, например, отчеты об доставке с ошибками.
Последний 6️⃣ разберем на сайте.
@BigDataSchool_ru
https://bigdataschool.ru/blog/how-to-prevent-kafka-failures.html
#Flink #статьи
Потоковые примитивы и низкоуровневый API
На самом нижнем уровне абстракции обработка потоков с сохранением состояния реализуется с помощью примитивов – строительных блоков, которые можно отнести к одной из следующих категорий:
✔️потоки – это основы потоковой обработки. Они могут быть неограниченными или ограниченными, т.е. наборами данных фиксированного размера. Flink имеет сложные функции для обработки неограниченных потоков, а также выделенных операторов для эффективной обработки ограниченных потоков.
Flink поддерживает два способа обработки данных: потоковый (в реальном времени) по мере создания или сохранения потока в системе хранения, например, файловой системе или хранилище объектов, а также пакетный – последующая обработка ограниченного набора данных.
Приложения Flink могут обрабатывать записанные потоки или потоки в реальном времени.
✔️Обычно потоковое приложение имеет состояние. Только приложения, которые применяют преобразования к отдельным событиям, не требуют состояния. Любому приложению, выполняющему базовую бизнес-логику, необходимо запоминать события или промежуточные результаты, чтобы получить к ним доступ в более поздний момент времени, например, при получении следующего события или по истечении определенного периода времени.
Flink предоставляет примитивы состояния для различных структур данных, таких как атомарные значения, списки или сопоставления. Разработчик выбирает наиболее эффективный примитив состояния в зависимости от шаблона доступа к функции. Состояние приложения управляется и контролируется подключаемым сервером состояния, который хранит состояние в памяти или во встроенном key-value хранилище RocksDB. Также можно подключить бэкэнды с пользовательским состоянием. Механизмы контрольных точек и точек сохранения Flink гарантируют согласованность состояния приложения в случае сбоя.
Flink способен поддерживать состояние приложения размером в несколько терабайт благодаря асинхронному и инкрементному механизму контрольных точек. Flink поддерживает масштабирование stateful-приложений путем перераспределения состояния между большим или меньшим количеством рабочих процессов.
✔️время — еще один важный компонент потоковых приложений. Большинству потоков событий присуща семантика времени, поскольку каждое событие создается в определенный момент времени. Более того, многие общие потоковые вычисления основаны на времени, например, агрегирование окон, разбивка по сеансам, обнаружение шаблонов и соединения на основе времени.
Важным аспектом потоковой обработки является то, как приложение измеряет время, т. е. разницу между временем события и временем обработки.
Flink имеет богатый набор функций, связанных со временем: приложения, которые обрабатывают потоки с семантикой времени события, вычисляют результаты на основе временных меток событий.
Flink использует водяные знаки для определения времени в приложениях, реагирующих на события. Механизм watermark обеспечивает компромисс между задержкой и полнотой результатов. Для обработки запоздалых событий Flink позволяет перенаправить их через побочные выходы и обновление ранее завершенных результатов. Также фреймворк поддерживает режим вычислений, который подходит для приложений со строгими требованиями к малой задержке и допускает приблизительные результаты.
✔️ProcessFunctions — это наиболее выразительные функциональные интерфейсы, которые предлагает Flink для обработки отдельных событий из одного или двух входных потоков или событий, сгруппированных в окне.
ProcessFunctions обеспечивают детальный контроль над временем и состоянием, может произвольно изменять свое состояние и регистрировать таймеры, которые в будущем вызовут функцию обратного вызова.
Следовательно, ProcessFunctions может реализовать сложную бизнес-логику для каждого события, что требуется для многих приложений, управляемых событиями.
Также разработчик может регистрировать время событий и обратные вызовы времени обработки, чтобы выполнять сложные вычисления.
Далее читайте о Высокоуровневых API
@BigDataSchool_ru https://bigdataschool.ru/blog/flink-api-overview.html
Потоковые примитивы и низкоуровневый API
На самом нижнем уровне абстракции обработка потоков с сохранением состояния реализуется с помощью примитивов – строительных блоков, которые можно отнести к одной из следующих категорий:
✔️потоки – это основы потоковой обработки. Они могут быть неограниченными или ограниченными, т.е. наборами данных фиксированного размера. Flink имеет сложные функции для обработки неограниченных потоков, а также выделенных операторов для эффективной обработки ограниченных потоков.
Flink поддерживает два способа обработки данных: потоковый (в реальном времени) по мере создания или сохранения потока в системе хранения, например, файловой системе или хранилище объектов, а также пакетный – последующая обработка ограниченного набора данных.
Приложения Flink могут обрабатывать записанные потоки или потоки в реальном времени.
✔️Обычно потоковое приложение имеет состояние. Только приложения, которые применяют преобразования к отдельным событиям, не требуют состояния. Любому приложению, выполняющему базовую бизнес-логику, необходимо запоминать события или промежуточные результаты, чтобы получить к ним доступ в более поздний момент времени, например, при получении следующего события или по истечении определенного периода времени.
Flink предоставляет примитивы состояния для различных структур данных, таких как атомарные значения, списки или сопоставления. Разработчик выбирает наиболее эффективный примитив состояния в зависимости от шаблона доступа к функции. Состояние приложения управляется и контролируется подключаемым сервером состояния, который хранит состояние в памяти или во встроенном key-value хранилище RocksDB. Также можно подключить бэкэнды с пользовательским состоянием. Механизмы контрольных точек и точек сохранения Flink гарантируют согласованность состояния приложения в случае сбоя.
Flink способен поддерживать состояние приложения размером в несколько терабайт благодаря асинхронному и инкрементному механизму контрольных точек. Flink поддерживает масштабирование stateful-приложений путем перераспределения состояния между большим или меньшим количеством рабочих процессов.
✔️время — еще один важный компонент потоковых приложений. Большинству потоков событий присуща семантика времени, поскольку каждое событие создается в определенный момент времени. Более того, многие общие потоковые вычисления основаны на времени, например, агрегирование окон, разбивка по сеансам, обнаружение шаблонов и соединения на основе времени.
Важным аспектом потоковой обработки является то, как приложение измеряет время, т. е. разницу между временем события и временем обработки.
Flink имеет богатый набор функций, связанных со временем: приложения, которые обрабатывают потоки с семантикой времени события, вычисляют результаты на основе временных меток событий.
Flink использует водяные знаки для определения времени в приложениях, реагирующих на события. Механизм watermark обеспечивает компромисс между задержкой и полнотой результатов. Для обработки запоздалых событий Flink позволяет перенаправить их через побочные выходы и обновление ранее завершенных результатов. Также фреймворк поддерживает режим вычислений, который подходит для приложений со строгими требованиями к малой задержке и допускает приблизительные результаты.
✔️ProcessFunctions — это наиболее выразительные функциональные интерфейсы, которые предлагает Flink для обработки отдельных событий из одного или двух входных потоков или событий, сгруппированных в окне.
ProcessFunctions обеспечивают детальный контроль над временем и состоянием, может произвольно изменять свое состояние и регистрировать таймеры, которые в будущем вызовут функцию обратного вызова.
Следовательно, ProcessFunctions может реализовать сложную бизнес-логику для каждого события, что требуется для многих приложений, управляемых событиями.
Также разработчик может регистрировать время событий и обратные вызовы времени обработки, чтобы выполнять сложные вычисления.
Далее читайте о Высокоуровневых API
@BigDataSchool_ru https://bigdataschool.ru/blog/flink-api-overview.html
Курсы Big Data,Arenadata,Greenplum, Kafka и Spark
Возможности Apache Flink для разработчика: 3 API фреймворка
Какие возможности Apache Flink предоставляет разработчику и как их использовать: краткий обзор существующих API и потоковых примитивов