Школа Больших Данных
566 subscribers
117 photos
711 links
Канал Школы Больших Данных https://www.bigdataschool.ru/ - обучение технологиям Big Data: разработка приложений и администрирование кластеров Hadoop, Kafka, Spark, NoSQL, Python, ML и DS.
Тел: +7 (495) 41-41-121
Контакты: @Bigdataschool_msk @olga_burykh
Download Telegram
#bigdata #статьи
Борьба за качество данных с entity resolution

Результаты аналитической обработки данных напрямую зависят от их качества. Качественные данные не имеют дублей, пропусков, а также нарушений целостности, когда описание одних и тех же сущностей или их характеристик противоречат друг другу. Для реализации этого используется подход разрешения сущностей (entity resolution).

По сути, разрешение сущностей — это задача поиска каждого экземпляра сущности, например клиента, во всех корпоративных системах, приложениях и базах знаний как локально, так и в облаке.

Например, эта концепция реализуется в платформе Банка России «Знай своего клиента» (ЗСК) — сервис, с помощью которого кредитные организации могут узнать уровень риска по подозрительным операциям юридических лиц и индивидуальных предпринимателей. На основании сведений о 7 миллионах банковских клиентов, ЗСК маркирует добросовестность их финансовых операций в соответствии с ФЗ «О противодействии легализации (отмыванию) доходов, полученных преступным путем, и финансированию терроризма».

Разрешение сущностей нужно для объединения данных, относящихся к одному и тому же реальному объекту, например субъекту, объекту или другой бизнес-единицы. Конечным результатом этого процесса является единая запись для каждой сущности, содержащая всю информацию о ней, консолидированную в одном месте, без дублирующихся или противоречивых данных.

В отличие от традиционного сопоставления данных (data matching), когда записи из разных источников попарно сравниваются друг с другом, полагаясь на сопоставление атрибутов, entity resolution работает итеративно. Этот подход постоянно пополняет записи дополнительными данными для обеспечения наиболее точного представления, устанавливая связи между ними, даже при исходном низком качестве или выполненными изменениями.

Наиболее известной разновидностью подхода entity resolution сегодня стало разрешение личности (identity resolution), когда это целевым объектом объединения связанных записей является отдельный субъект – пользователь или клиент.

Маркетологи, рекламодатели и другие бизнес-пользователи уже давно хотят иметь единое представление о клиенте. Именно это стремятся предоставить соответствующие платформы данных – CDP (Customer Data Platfrom), объединяя действия и атрибуты пользователя в нескольких точках взаимодействия и системах.
Цель разрешения личности — связать все данные, как оффлайн, так и онлайн, вместе, чтобы ассоциировать каждое поведенческое действие с конкретным клиентом или профилем пользователя.

Основные техники и инструменты подходов entity resolution и identity resolution похожи.
Их можно разделить на 2 категории:
1️⃣детерминированное разрешение или сопоставление на основе правил, когда определяются точные атрибуты для унификации и дедупликации существующих записей. Детерминированное разрешение реализуется относительно просто и быстро. Оно отлично работает в простых сценариях, где данные имеют аналогичную структуру, например, почтовые индексы, адреса, номера документов и пр.
2️⃣вероятностное разрешение или нечеткое сопоставление, основанное на машинном обучении, искусственном интеллекте и прогнозирующих моделях для идентификации и унификации объектов посредством дедупликации записей. Это сегодня востребовано больше всего, т.к. данные обычно хранятся в разных форматах и местах, а точные правила их сопоставления невозможно определить заранее.

С точки зрения дата-инженера, разрешение сущностей состоит из четырех этапов:
✔️прием данных
✔️дедупликация
✔️связывание записей
✔️канонизация

Этот общий подход разрешения сущностей немного модифицируется в identity resolution, что мы рассмотрим далее.

@BigDataSchool_ru https://bigdataschool.ru/blog/news/machine-learning/entity-and-identity-resolution-tools-for-data-quality.html
#bigdata #статьи
Еще раз о разнице потоковой и пакетной парадигмы обработки данных

Пакетная обработка и потоковая обработка — это две разные парадигмы обработки данных. Они отличаются принципами работы и подходят для различных сценариев.

Пакетная обработка автоматизируют повторяющиеся задачи, периодически обрабатывая большое количество записей в одном пакете за относительно длительный период, обычно измеряемый часами, днями или неделями.

Типичное пакетное задание обрабатывает сохраненные данные и сохраняет результаты в базе данных, или в файле. Обновления или изменения применяются немедленно ко всем выбранным записям, файлам или байтам в пакете. Поэтому для пакетной парадигмы более важно эффективно управлять хранилищем данных, чем временем обработки, к которому предъявляются не очень жесткие требования. Чаще всего пакетное задание просто должно быть выполнено в течение установленного периода, например, ежедневные задания должны выполняться в течение 24 часов, а еженедельные не могут занимать больше недели. Помимо типовых бизнес-задач, например, формирование отчетов или вызов вычислений по конкретному датасету, пакетная обработка используется для сбора, компоновки и перемещения данных в ETL-процессах, в т.ч. внутри и между хранилищами облачных хранилищ.

Также задачи пакетной обработки могут использоваться вместе с окнами потоковых транзакций в гибридных конвейерах, которые выполняют внутреннюю сверку и при этом доставляют конечным пользователям обновления статуса в реальном времени.
Например, онлайн-покупки, уведомления о статусах заказа или обновления складских запасов.

В отличие от пакетных систем, которые ждут сбора и компоновки данных перед их обработкой, системы потоковой обработки обрабатывают данные сразу после их создания, генерируя выходные данные в реальном времени.

Потоковая обработка обрабатывает данные на лету по мере их поступления из различных внешних систем в режиме.
В отличие от предсказуемых источников пакетных данных, потоковые данные поступают из меняющихся источников. Потоковые данные генерируются непрерывно, обычно в больших объемах  и с высокой скоростью. Источник потоковых данных обычно состоит из непрерывных журналов с отметками времени, в которых фиксируются события по мере их возникновения. Например, события пользовательского поведения на сайте, изменения температуры в IoT-датчике, логи сервера, рекламных платформ в реальном времени, а также данных о потоках кликов из приложений и веб-сайтов.

Пакетная обработка проще потоковой и более эффективна с точки зрения масштабирования, сохранности и точного анализа данных. Также пакетная парадигма позволяет лучше утилизировать вычислительные ресурсы, выполняя сложные операции в запланированное время, например, при невысокой нагрузке без дополнительного административного вмешательства. Однако, пакетная обработка большого объема данных выполняется достаточно медленно и может стать причиной отказа при интенсивных вычислениях.

Эти проблемы решает потоковая обработка с вычислениями в реальном времени. Однако, эта парадигма сложнее в проектировании и реализации, а также чревата потерями данных, которые не обработаны в заданных границах. Кроме того, сложность и точность потоковых вычислений намного меньше, чем при пакетной обработке. Потоковая передача отлично реализует параллелизм. Пакетная обработка может обрабатывать несколько источников данных, но не одновременно. Также пакетная парадигма предполагает однородные и конечные данные.

На практике пакетная и потоковая обработка часто дополняют друг друга и могут сочетаться в одном решении. Например, потоковая обработка для сбора данных о покупках в реальном времени вместе с пакетными операциями управления запасами в ночное время и проведения ежемесячных счетов.

Вспомнив разницу пакетной и потоковой парадигмы, далее рассмотрим, какие модели обработки данных в реальном времени сегодня существуют и как они реализуются.

@BigDataSchool_ru
https://bigdataschool.ru/blog/news/spark/data-streaming-models.html
#spark #статьи
Логирование системных метрик в приложении Apache Spark

Поскольку фреймворк Apache Spark изначально предназначен для создания высоконагруженных распределенных приложений пакетной и потоковой обработки больших объемов данных, он позволяет отслеживать системные события, предоставляя инструменты мониторинга.
Есть несколько способов мониторинга Spark-приложений: веб-интерфейсы, метрики и внешние инструменты.

Однако, использование этого графического интерфейса ограничено жизненным циклом самого Spark-приложения. По умолчанию информация о списке этапов и задачах планировщика, размерах RDD и использовании памяти, а также действующих исполнителях, доступна только во время работы приложения. Чтобы просмотреть веб-интерфейс постфактум, надо установить конфигурации spark.eventLog.enabled значение true перед запуском приложения. Это настроит фреймворк на регистрацию событий приложения в постоянное хранилище.

Также для просмотра ранее накопленных метрик можно обратиться к серверу истории, который позволяет просмотреть логи приложения после его завершения, отображая информацию о конфигурации фреймворка, заданиях, этапах, исполнителях и задачах, потоках выполнения DAG, потреблении ресурсов драйвера и исполнителя.

Долго работающие приложения Structured Streaming создают много событий в системных логах. Это приводит к увеличению размера лог-файла журнала событий, что требует большого количества ресурсов для воспроизведения каждого обновления на сервере истории. Также крупные задания, такие как сложные аналитические SQL-запросы, могут создавать большие журналы событий, которые занимают много дискового пространства и даже могут привести к ошибке нехватки памяти (OOM, OutOfMemory) при загрузке пользовательских интерфейсов.

Избежать этого поможет скользящее сжатие логов, о котором мы поговорим далее.

@BigDataSchool_ru
https://bigdataschool.ru/blog/news/spark/how-to-compact-spark-event-log-files.html
#spark #статьи
Как настроить сжатие логов

Скользящее сжатие логов настраивается в файле spark-defaults.conf для следующих конфигураций:
✔️eventLog.rolling.enabled – чередование журнала событий в зависимости от размера лог-файла. Это означает регистрацию событий в новый лог-файл, когда текущий достиг максимального значения. По умолчанию этот параметр отключен);
✔️eventLog.rolling.maxFileSize – максимальный размер файла журнала событий до ее смены. По умолчанию этот параметр равен 128 МБ, а минимально возможный предел равен 10 МБ.

Также в файле конфигурации сервера истории spark-history-server.conf можно настроить параметр конфигурации spark.history.fs.eventLog.rolling.maxFilesToRetain, который указывает максимальное количество сохраняемых несжатых файлов журнала событий. По умолчанию все файлы журнала событий сохраняются. По умолчанию  значение spark.history.fs.eventLog.rolling.maxFilesToRetain будет равно бесконечности, что означает, что все файлы журнала событий сохраняются. Установка меньшего значения приводит к сжатию старых лог-файлов. Минимально возможное значение этой конфигурации равно 1, т.е. только в несжатом состоянии будет только текущий лог-файл, куда ведется регистрация событий.

Настройка этих конфигураций позволит иметь несколько лог-файлов меньшего размера вместо одного огромного. Однако, общий размер всех журналов останется прежним. Кроме того, следует помнить, что обратной стороной преимущества сжатия является потеря качества данных.

В частности, при сжатии логов некоторые события не будут отображаться в пользовательском интерфейсе. Когда происходит сжатие, сервер истории перечисляет все доступные файлы журнала событий для приложения и рассматривает лог-файлы с индексом меньше, чем у того, который сохранен в качестве объекта сжатия.
Например, если Spark-приложение имеет 5 лог-файлов и конфигурации spark.history.fs.eventLog.rolling.maxFilesToRetain задано значение 2, то для сжатия будут выбраны первые 3 лог-файла.

После выбора целей для сжатия фреймворк анализирует их, чтобы выяснить, какие события можно исключить, и переписывает эти урезанные логи в один компактный файл, отбрасывая некоторые события. Обычно при сжатии исключаются события, указывающие на устаревшие данные.
Например, события для завершенного задания и связанные с ним события этапа или задачи, а также события для исполнителя, который завершается, события для завершенного выполнения SQL-запросов и связанные с ними события задания/этапа/задачи.

После завершения перезаписи исходные лог-файлы обычно удаляются. Сервер истории может не сжимать старые файлы журнала событий, если выясняется, сжатие не экономит достаточно много места. Как правило, сжатие эффективно для потоковой обработки, поскольку каждый микропакет запускает одно или несколько заданий, которые вскоре завершаются. Для пакетных запросов сжатие не дает большого эффекта, а потому применять его не рекомендуется.
Поскольку сжатие лог-файлов добавлено во фреймворк начиная с версии 3.0, эта функция пока еще не очень стабильна. Иногда сжатие может исключить больше событий, что приведет к излишне лаконичному отображению информации в пользовательском интерфейсе на сервере истории Spark.
Поэтому включать сжатие лог-файлов следует с осторожностью.

@BigDataSchool_ru
https://bigdataschool.ru/blog/news/spark/how-to-compact-spark-event-log-files.html
#Python #статьи
Еще раз о том, что такое реестр схем Kafka и чем он полезен

Реестр схем (Schema Registry) – это модуль Confluent для Apache Kafka, который позволяет централизовано управлять схемами данных полезной нагрузки сообщений в топиках.

Приложения-продюсеры и потребители Kafka могут использовать эти схемы для обеспечения согласованности и совместимости данных по мере их эволюционного развития.
Можно сказать, что реестр схем — это ключевой компонент управления данными в потоковой системе, помогающий обеспечить их качество, соблюдение стандартов и прозрачность происхождения с поддержкой аудита и совместной работы между разными командами.
Определив схемы данных для полезной нагрузки и зарегистрировав ее в реестре, ее можно переиспользовать, частично освобождая приложение-потребитель от валидации структуры данных. Когда  продюсер отправляет события в Kafka, схема данных включается в заголовок сообщения, а Schema Registry гарантирует валидность структуры данных для конкретного топика.

Таким образом, реестр схем позволяет продюсерам и потребителям взаимодействовать в рамках четко определенного контракта данных, контролируя эволюционное развитие схемы с помощью четких и явных правил совместимости.
Также это оптимизирует полезную нагрузку по сети, передавая идентификатор схемы вместо всего определения схемы. Фактически реестр схемы состоит из REST-сервиса для проверки, хранения и получения схем в форматах AVRO, JSON Schema и Protobuf. Сериализаторы и десериализаторы этих 3-х форматов данных подключаются к клиентам Apache Kafka, т.е. приложениям-продюсерам и потребителям для хранения и извлечения схем полезной нагрузки.

Реестр схем на платформе Upstash, где развернут мой экземпляр Apache Kafka, полностью совместим с реестром схем Confluent. Поэтому его можно использовать с сериализаторами и десериализаторами io.confluent.kafka.serializers.KafkaAvroSerializer/Deserializer и io.confluent.connect.avro.AvroConverter, а также другими UI-инструментами, поддерживающими реестр схем Confluent.

Чтобы показать, как это работает, написали небольшое Python-приложение, которое рассмотрим далее.

@BigDataSchool_ru
https://bigdataschool.ru/blog/news/kafka/kafka-python-producer-example-with-schema-registry-on-upstash.html
#Airflow #статьи
Краткий ликбез по WSL и Docker для любителей Windows

Обычно я всегда запускала веб-сервер Apache AirFlow в интерактивной среде Google Colab, которая по своей сути представляет собой виртуальную машину с Unix-подобной ОС. А, чтобы получить доступ к localhost, использовала утилиту тунелирования ngrok.

С одной стороны, это было удобно, поскольку каждая Colab-среда работает изолировано: можно было не связываться с виртуальными Python-средами, чтобы избежать «ада зависимостей», когда одна библиотека конфликтует с другой, достаточно просто закрыть ненужную Colab-среду.

Однако, запускать приложения AirFlow в режиме standalone удобнее всего на собственной локальной машине, чтобы избежать задержек и других недостатков Colab. Поэтому решила запускать веб-серверы в своей собственной среде, независимо от внешних сервисов.

Избежать конфликта зависимостей, свойственных AirFlow, при запуске его на локальной машине с операционной системой Windows, можно 2-мя способами:
1️⃣использовать виртуальные среды Python;
2️⃣сделать контейнер, упаковав в него приложение со всеми зависимостями.

Далее рассмотрим именно 2-ой способ.

Чтобы запустить контейнер на операционной системе Windows, надо установить WSL - подсистему Windows для Linux, которая позволяет запускать среду Linux на Windows без отдельной виртуальной машины. С WSL можно запускать Linux в оболочке Bash с выбранным дистрибутивом, чтобы работать с CLI-интерфейсом и приложениями Linux.

В отличие от полноценной виртуальной машины, WSL требует меньше ресурсов (ЦП, памяти и хранилища), а также может обращаться к файлам Windows в Linux. Обеспечить изоляцию приложений, чтобы избежать конфликтов с зависимостями, свойственных Python-разработке, можно с помощью контейнеризации, что также хорошо поддерживается в WSL.

Сегодня наиболее популярным инструментом контейнеризации является Docker – он позволяет упаковать приложение со всем его окружением и зависимостями в контейнер, который можно запустить на любой Linux-подобной системе.

Эта платформа дает возможность отделить приложения от инфраструктуры, перезапуская их по мере необходимости. Изоляцию обеспечивает запуск каждого контейнера в отдельном процессе. Docker-образ можно развернуть несколько раз, получив независимые контейнеры с работающими приложениями.

Docker-контейнер — это запущенный и изолированный образ с возможностью временного сохранения данных, которые записываются в его верхний слой и при удалении контейнера удаляются.

Далее читайте: Создание и запуск docker-контейнера Apache AirFlow.

@BigDataSchool_ru
https://bigdataschool.ru/blog/news/airflow/airflow-docker-container-in-wsl-on-windows.html
#AirFlow #статьи
Еще раз про хуки и соединения Apache AirFlow

Доступность системы является ключевым свойством информационной безопасности. Проверить, что веб-сервис доступен, можно по статусу HTTP-ответа на GET-запрос. Чтобы делать такую проверку периодически, т.е. по расписанию, можно использовать Apache AirFlow. Этот пакетный ETL-оркестратор отлично подходит для подобных сценариев. Для синхронного взаимодействия с HTTP-серверами в AirFlow есть класс HttpHook, а для асинхронного – HttpAsyncHook.

Вообще хук (hook) в AirFlow — это высокоуровневый интерфейс к внешней платформе, который позволяет быстро общаться со сторонними системами без необходимости писать низкоуровневый код, который обращается к их API или использует специальные библиотеки. Хуки также являются строительными блоками, из которых строятся Операторы.

Хуки в AirFlow интегрируются с подключениями (Connection) для сбора учетных данных – набор параметров: имя пользователя, пароль и хост, а также тип внешней системы.

Все эти параметры можно настроить программно или в пользовательском интерфейсе фреймворка.
Например, я хочу проверять доступность нашего сайта, т.е. именно он будет выступать в роли внешней системы. Для этого надо задать его адрес в настройках подключения, чтобы использовать методы http-хуков.

Поскольку теперь я запускаю docker-контейнер с Apache AirFlow в режиме standalone на своей локальной машине, чтобы выйти в веб-интерфейс, мне достаточно обратиться к localhost и порту, на котором развернуто это приложение.
Добавить новое подключение к внешней системе надо в разделе Admin/Connections.

Внешней системой является открытый веб-сайт, а не база данных и не частное приложение, поэтому указывать порт, логин и пароль нет необходимости. Достаточно только задать имя подключения, которое будет использоваться в коде Python-оператора, тип соединения и хост.

Далее рассмотрим пример реализации: код задачи и DAG

@BigDataSchool_ru
https://bigdataschool.ru/blog/news/airflow/airflow-hook-http-service-example.html
#архитектура #статьи
Основные сложности проектирования современной архитектуры данных

Из-за принципиальных отличий потоковой парадигмы обработки данных от пакетной, задача проектирования дата-конвейеров сильно усложняется, т.к. редко где на практике используется только один подход. При большей простоте пакетного подхода, его не всегда достаточно для бизнес-нужд.

Например, организовать пакетную выгрузку из прикладных систем в корпоративное хранилище на базе того же PostgreSQL или Greenplum с помощью AirFlow достаточно просто при соблюдении некоторых условий.

Однако, подобная инфраструктура не очень хорошо подойдет для потоковой аналитики больших данных в реальном времени, включая мониторинг событий пользовательского поведения и/или генерацию рекомендаций с помощью ML-моделей.
Однако, далеко не каждая компания обладает ресурсами и желанием их вкладывать в такие масштабные аналитические проекты.

Любую инициативу следует рассматривать, прежде всего, с точки зрения экономической эффективности: какой результат она принесет для бизнеса на текущий момент, а также в долгосрочной перспективе. И если точного ответа на эти вопросы у бизнес-заказчика нет, начинать ИТ-проект, даже при наличии технических возможностей его выполнить, не имеет смысла.

Впрочем, подобные сомнения свойственны мелким и средним компаниям, а крупному бизнесу даже не требуется доказывать ценность аналитики больших данных.
В случае корпоративного масштаба архитектору важно предложить такое решение, которое впишется в системный ландшафт и сможет расти вместе с бизнесом, гибко меняя свою производительность и набор функциональных возможностей.

Поэтому при проектировании архитектуры данных надо рассматривать предлагаемые решения со следующих точек зрения:
✔️оркестрация конвейеров обработки данных;
✔️управление хранилищами данных, включая хранилища состояний для stateful-приложений;
✔️синхронизация конвейеров и согласованность времени в потоковой обработке.

Далее подробно рассмотрим каждый из этих пунктов.

@BigDataSchool_ru
https://bigdataschool.ru/blog/news/nosql/how-to-design-modern-data-architecture.html
#NIFI #статьи
Новые процессоры Apache NiFi 2.0.0-M2

С точки зрения управления версиями, веха рассматривается как некоторое значимое обновление, контрольная точка, меняющая дальнейшее развитие проекта. Как правило, изменения из нескольких вех объединятся в одной версии.

Первая веха 2-го релиза Apache NiFi вышла в ноябре 2023 года. Спустя 2 месяца вышла 2-я веха с 4-мя новыми службами контроллера и 7-ю процессорами, а также 12 обновлениями зависимостей и критическими изменениями.

В части зависимостей обновлены Spring Framework до 6 версии, Jetty до 12, Jakarta Servlet API до 6, Jakarta XML Binding до 4, аннотации Swagger до 2 и спецификации OpenAPI до 3.0. Среди критических обновлений удалены модули сервера MiNiFi C2 и конфигурация образа Docker, процессоры InfluxDB и службы уведомлений Bootstrap, процессоры JSON-преобразований JoltTransformJSON и JoltTransformRecord перемещены из пакета nifi—standard—nar в nifi—jolt—nar. Вместо класса SimpleDateFormat для анализа и форматирования даты и времени теперь используется DateTimeFormatter. Из-за перехода с Servlet API 3 на версию 6 потребуются обновления и перекомпиляция пользовательских расширений GUI.

С точки зрения дата-инженера наиболее интересными решениями стали внедрения новых компонентов. В частности, добавлен процессор PutMongoBulk, который обеспечивает массовую запись данных в документо-оринетированную СУБД MongoDB. Ранее существовавший в NiFi процессор PutMongo мог обновлять (или добавлять) только одну запись за раз, что очень неэффективно при массовых операциях.
Другое процессор, PutMongoRecord использует настроенное средство чтения записей и схему для чтения входящего набора записей из тела FlowFile, а затем вставляет пакеты этих записей в настроенную коллекцию MongoDB. Это предполагает подробное знание структуры вставляемых JSON-документов. Поэтому новый процессор PutMongoBulk использует API самой MongoDB — BulkWrite, объединяя несколько операций в одну без каких-либо предположений о структуре самих вставляемых/изменяемых документов.

Таким образом, MongoDB обрабатывает все за одну операцию, что снижает нагрузку и повышает эффективность ETL-процессов.

Для повышения общей эффективности обработки очень больших Parquet-файлов добавлено 2 новых процессора: CalculateParquetOffsets и CalculateParquetRowGroupOffsets. Первый генерирует N файлов потока из входных данных и добавляет атрибуты со смещениями, необходимыми для чтения группы строк в содержимом FlowFile. Процессор CalculateParquetRowGroupOffsets генерирует один FlowFile из каждой группы строк входных данных и добавляет атрибуты со смещениями, необходимыми для чтения группы строк в содержимом FlowFile. Также для работы с Parquet-файлами добавлен процессор SplitParquet, который принимает в качестве параметра количество записей для разделения FlowFile. На выходе процессор генерирует несколько FlowFile с неизмененным содержимым и добавляет атрибуты со смещениями, необходимыми для чтения группы строк в содержимом.

С разделением связан еще один новый процессор Apache NiFi – FilterAttribute, который определяет, какие атрибуты FlowFile должны существовать в выбранных точках потока и уменьшает их набор. Это похоже на работу процессора процессора UpdateAttribute, который определяет набор удаляемых атрибутов только по заданному регулярному выражению. А FilterAttribute предполагает явное указание набора атрибутов, которые необходимо сохранить, что проще задания регулярного выражения.
Также в Apache NiFi 2.0.0-M2 добавлены процессоры для взаимодействия с SaaS-решением обработки обращений Zendesk и публикации сообщений в канал Slack.

@BigDataSchool_ru
https://bigdataschool.ru/blog/news/nifi/nifi-2-0-0-m2-release-overview.html
#ETL #статьи
Что такое гонка данных в дата-инженерии

Одна из главных особенностей распределенных систем – это задержка между обновлением одних и тех же данных на разных компонентах. Это очень важно при реализации конвейеров обработки данных, которые зависят друг от друга.

Проектирование таких ETL-конвейеров усложняется, если есть строгие требования к последовательности обработки данных и ссылочной целостности. А когда данные поступают с разной скоростью из разных источников с различными соглашениями об уровне обслуживания, возникает так называемая гонка данных.

Вообще состояние гонки или неопределенность параллелизма довольно характерно для распределенных систем стека Big Data из-за большого количества сервисов и огромного количества событий.

Гонка данных является одним из случаев состояния гонки, возникающим при параллельном выполнении кода в многопоточном режиме, когда поведение системы зависит от того, в каком порядке выполняются части кода.

Проблемы параллелизма обычно незаметны при разработке и тестировании, а проявляются только в производственных средах на высоких рабочих нагрузках.

Если поведение системы зависит от последовательности событий, которая не является детерминированной и явно не контролируется, состояние гонки рано или поздно возникнет. Это характерно для многопоточных и многопроцессорных систем, а также проявляется в конвейерах обработки данных в виде гонки данных, когда один процесс обращается к фрагменту данных, который в это время изменяется другим процессом.

Чтобы понять, как это может случиться, чем опасно и какие способы позволят избежать этого, далее по ссылке рассмотрим практический пример.

@BigDataSchool_ru
https://bigdataschool.ru/blog/news/airflow/how-to-avoid-race-conditions-in-etl-pipelines.html
#DWH #статьи
Слоистая структура DWH и подход Data Vault

Корпоративное хранилище данных (DWH, Data Warehouse) часто бывает гетерогенным, т.к. организованным с помощью нескольких баз данных, связанных ETL-процессами. Согласно концепции слоистой архитектуры (LSA, Layered Scalable Architecture), например, Raw-слой с исходными данными, загруженными из разных источников, можно реализовать в Greenplum, как и Transformed-слой с преобразованными, т.е. консолидированными записями.

А для аналитического слоя с витринами данных (Data Mart) со структурированными и денормализованными данными для их тематического анализа, например, сводная информация о клиентах, продажах и пр., отлично подходят колоночные СУБД типа Clickhouse или вообще конечные BI-инструменты.

С точки зрения проектирования архитектуры данных наибольший интерес представляет Transformed-слой, структуру которого можно спроектировать по-разному, используя популярные подходы к созданию корпоративного хранилища.

Из всех подходов проектирования DWH именно методология Data Vault мне нравится больше всего. Она проще якорной модели (Anchor Modeling), но также обладает высокой гибкостью и лучше подается дополнению и расширению по сравнению с классическими звездными схемами по Кимбалу и Инмону.

Согласно Data Vault, модель хранилища данных описывается следующими концепциями:
✔️Хаб (Hub)– основной бизнес-объект, сущность домена, например, клиент, продукт, заказ и пр. Уникальным идентификатором хаб-таблицы является первичный ключ, созданный на основе MD5- или SHA-1-хэша от бизнес-ключа.
✔️Ссылка или связь (Link)— отношение между хаб-таблицами. У link-таблицы нет собственных атрибутов, есть только системные метаданные, что рассмотрим позже, первичный ключ и внешние ключи связываемых хабов.
✔️Спутник (Satellite)— таблица с атрибутами хаба. Помимо описания сущности хаба, каждая запись в спутнике маркируется составным первичным ключом, состоящим из внешнего ключа на хаб или ссылку и время загрузки записи. Именно за счет спутников достигается гибкость хранилища данных по методологии Data Vault: всегда можно расширить атрибутивный состав, не меняя исходную бизнес-сущность. Также в таблицах-спутниках можно хранить историю изменения контекста, добавляя новую запись при обновлении данных в системе-источнике.

Вспомнив, что такое Data Vault, далее рассмотрим последовательность применения этого подхода к проектированию Transformed-слоя DWH на примере интернет-магазина.

@BigDataSchool_ru
https://bigdataschool.ru/blog/news/greenplum/dwh-design-with-data-vault-example.html
#DWH #статьи
Постановка задачи: анализ систем-источников

Сегодня корпоративные хранилища данных (DWH, Data Warehouse) обычно реализуются в виде нескольких баз данных, связанных ETL-процессами.

Причем каждая из этих гомогенных или гетерогенных, т.е. на одной или разных платформах, баз данных представляет собой слой LSA-архитектуры (Layered Scalable Architecture).
К примеру, Raw-слой с исходными данными, загруженными из разных источников, можно реализовать в Greenplum, поскольку эта реляционная СУБД на базе PostgreSQL поддерживает механизм массовой параллельной загрузки данных. Аналогично Transformed-слой с преобразованными, т.е. консолидированными записями тоже отлично реализуется в Greenplum.
А для аналитики большого объема уже структурированных и нормализованных данных в реальном времени лучше подойдет витрина данных (Data Mart) в виде Clickhouse или вообще BI-системы: PowerBI, Qlick или облачные сервисы типа Yandex.Datalens.

В прошлый раз мы рассматривали, как спроектировать схему данных для Transformed-слоя DWH с помощью подхода Data Vault. Однако, чтобы наполнить этот слой, консолидировав и преобразовав данные, их сперва надо залить в Raw-слой из исходных систем.

Как обычно, в качестве примера возьмем компанию электронной коммерции, которая использует несколько прикладных систем для своих бизнес-задач:
✔️MMS (Marketing Management System) — система управления скидками и маркетингом, которая содержит оперативные данные о программах лояльности: название, датах старта и окончания, товары, на которые распространяется скидка по программе лояльности и клиенты, к которым применима программа лояльности;
✔️OMS (Order Management System) — система управления заказами с данными о заказах (номер, дата и время, товары, сумма, покупатель и адрес доставки Покупатель), а также операции по изменению статуса заказа;
✔️WMS (Warehouse Management System) — система управления складом с данными о складских операциях с товаром и его локацией на конкретном складе;
✔️CMS (Customer Management System) — система управления клиентами с данными о клиентах (Email, Телефон, ФИО, дата рождения, статус) и клиентских обращениях (дата, тип, сотрудник, основание, результат);
✔️SCM (Supply Chain Management) — система управления цепочками поставок с данными по поставщикам (название, Email, Телефон, ИНН, адрес, счет) и товарам (название, единица измерения, категория, стоимость, поставщик, количество, дата поставки, дата годности);
✔️PMS (Payment Management System) — система управления платежами с данными по платежам: платежная система, тип платежа, счет отправителя, счет получателя, сумма, валюта, статус, назначение платежа.

Информация из всех этих транзакционных систем должна попасть в аналитическое хранилище.

Разумеется, схема данных в Raw-слое DWH будет отличаться от схемы в Transformed-слое, а также от исходной схемы в транзакционной системе.

Например, в Raw-слое DWH данные из одного источника могут загружаться в одну или в пару таблиц, хотя в исходной OLTP-системе они сильно нормализованы и разнесены на гораздо большее число таблиц. Поскольку названия таблиц в Raw-слое будут использоваться в качестве значений в Transformed-слое хранилища данных, рекомендуется называть их тематически.

Далее: Практическая реализация Raw-слоя DWH

@BigDataSchool_ru
https://bigdataschool.ru/blog/news/greenplum/how-to-design-raw-layer-of-dwh-example.html
#kafka #статьи
Что учитывать при разделении топика Apache Kafka

При решении вопроса о том, стоит ли делить топик на разделы, надо учитывать следующие факторы:

✔️количество потребителей и вариативность их поведения, включая надежность и скорость обработки сообщений;
✔️разницу в пропускной способности публикации и потребления сообщений, чтобы увеличить скорость потребления в случае высокопроизводительного продюсера, масштабировав количество потребителей, т.е. увеличив число разделов;
✔️семантику партиционирования на основе какого-либо бизнес-признака, а не только с целью балансировки нагрузки;
✔️толерантность к упорядоченности сообщений, которая гарантируется только в рамках раздела, а не является сквозной по всему топику;
✔️ресурсные возможности узла, на котором развернут экземпляр Kafka, т.е. объем свободного места на диске для сохранения сообщений и памяти для буферизации.

Разумеется, этот список не является исчерпывающим, и может быть расширен другими соображениями при проектировании отдельно взятого потокового конвейера.

Однако, он содержит ключевые моменты, которые следует учесть при определении EDA-архитектуры данных на основе Apache Kafka.

@BigDataSchool_ru
https://bigdataschool.ru/blog/news/kafka/key-factors-to-define-quantity-of-partitions-in-kafka-topic.html
#ClickHouse #статьи
Шардирование в ClickHouse

Именно хранилище данных всегда является узким местом любой системы. Поэтому именно его надо расширить для повышения производительности.

Это можно сделать с помощью шардирования – горизонтального масштабирования за счет физического разделения данных на разные фрагменты (шарды, shards), которые располагаются на разных машинах. При этом создается большая распределенная distributed-таблица, которая маршрутизирует запросы к таблицам по шардам, обращаться к данным в которых можно также и напрямую.

В колоночной СУБД ClickHouse шардирование позволяет распределить фрагменты данных из одной базы по разным узлам кластера, увеличивая пропускную способность и снижая задержку обработки данных. Шард ClickHouse – это группа копий данных (реплик) для обеспечения отказоустойчивости СУБД, он состоит из одного или нескольких хостов-реплик. Поскольку шарды содержат разные части данных, для получения всех данных, нужно обращаться ко всем шардам. Для обеспечения надежности и повышения доступности данные реплицируются, т.е. дублируются по репликам. Запрос на запись или чтение в шард может быть отправлен на любую его реплику.

Поскольку в Clickhouse, в отличие от Greenplum, нецентрализованная архитектура, SQL-запрос выполняется параллельно, т.е. одновременно на всех сегментах. Например, при вставке с помощью INSERT-запроса данные асинхронно копируются с реплики, на которой он выполнен. А вот запрос на выборку с оператором SELECT отправляет подзапросы на все шарды кластера, независимо от распределения данных. Агрегатные же запросы к шардированным таблицам с оператором GROUP BY в ClickHouse выполняются так: сперва происходит агрегация на отдельных узлах и эти результаты  передаются узлу-инициатору запроса для общей сборки.
Для этого используется специальный табличный движок Distributed, который не обеспечивает хранение данных, а маршрутизирует запросы на шардированные таблицы с последующей обработкой результатов.

Записывать данные в шарды можно двумя способами:
✔️через distributed-таблицу по ключу шардирования, основанном на хэш-функции конкретного поля, диапазоне значений или вычисляемом динамически, в зависимости от нагрузки;
✔️непосредственное обращение к шардированным таблицам, чтобы потом считывать данные через distributed-таблицу.

Далее читайте: Особенности работы с Distributed-движком

@BigDataSchool_ru
https://bigdataschool.ru/blog/news/clickhouse/clickhouse-sharding.html
#kafka #статьи
Изменения в брокерах, продюсера, контроллерах и Admin Client

27 февраля выпущена версия Apache Kafka 3.7, которая содержит 6 новых фич, более 45 улучшений и почти 80 исправленных ошибок. Одним из достаточно значимых изменений стала обработка сбоя диска брокера JBOD в KRaft, чтобы полностью отказаться от Zookeeper. В релизе 3.7 добавлена поддержка JBOD (Just a Bunch Of Disks) в кластерах на базе KRaft, позволяющая вести несколько каталогов журналов для каждого брокера. JBOD стал важной функцией Kafka, позволяющей запускать крупные развертывания с несколькими устройствами хранения на каждого брокера. Чтобы обеспечить доступность кластера, в случае сбоя лидера раздела контроллер должен выбрать нового лидера из одной из других синхронизированных реплик.

Ранее контроллер не проверял корректность работы лидера, предполагая, что каждый брокер работает корректно, если он еще является активным членом кластера. Однако, поскольку в KRaft членство в кластере основано на своевременных запросах подтверждения, отправляемых каждым брокером активному контроллеру, а не на эфемерном zNode в /brokers/ids, как в в ZooKeeper, при сбое одного каталога журналов брокер не сможет быть ни ведущим, ни ведомым для каких-либо разделов в этом каталоге журналов. Тогда контроллер не знает, что ему необходимо обновить лидерство и ISR для реплик в этом каталоге журналов, поскольку брокер будет продолжать отправлять контрольные запросы. Поэтому поддержка JBOD в KRaft позволяет избежать необходимости делать RPC-вызовы для перечисления всех разделов в каталоге журналов. Хотя эта функция пока не рекомендуется для производственного использования, в будущем с ее помощью можно ускорить отказ от ZooKeeper.

Еще одной интересной функцией раннего доступа можно назвать новый упрощенный протокол перебалансировки потребителей, который переносит сложность с потребителя на координатора группы внутри брокера и полностью обновляет протокол, делая его инкрементным по своей природе. Он обеспечивает те же гарантии, что и текущий протокол, но лучше и эффективнее, в том числе больше не полагаясь на барьер глобальной синхронизации. Также сокращено время, необходимое клиенту для обнаружения нового лидера раздела, что сокращает сквозную задержку запросов на создание/выборку при смене лидера, включая перезапуск брокера, переназначение раздела и пр. Здесь же можно отметить про экспоненциальную отсрочку для клиентов Kafka, которая изменяет время задержки повторной попытки повтора неудачных запросов. Это позволит уменьшить медленную конвергенцию метаданных после сбоя брокера из-за перегрузки.

В новом релизе AdminClient может напрямую общаться с кворумом контроллера KRaft и добавить регистрацию контроллера для динамического изменения уровней log4j на контроллере KRaft. Это помогает при отладке сценариев, когда другие части системы не работают.

В релизе 3.7 добавлена защита транзакций на стороне сервера, предотвращающая их зависание для смещения потребителей в разделах топика. А, чтобы лучше отслеживать производительность, устранять и предотвращать проблемы с многоуровневым хранилищем Kafka, поддерживаются дополнительные метрики этой функции. Все версии клиентских запросов старше Apache Kafka 2.1 теперь помечены как устаревшие и вводит новые метрики для мониторинга наличия таких запросов.

Поддержка этих запросов будет прекращена в выпуске 4.0.

Далее читайте: Другие разрешения и Новинки Kafka Streams и Connect.

@BigDataSchool_ru
https://bigdataschool.ru/blog/news/kafka/kafka-3-7-release-overview.html
#kafka #статьи
Новинки Kafka Streams и Connect

Для повышения надежности в Kafka Streams добавлена вторая стратегия назначения задач с учетом стойки. Теперь информация о том, на какой стойке кластера развернуты клиенты, есть в интерфейсе назначения разделов, чтобы учитывать это при их назначении на разделы топика Kafka. Это позволит сократить трафик между стойками и ускорить обработку данных. При этом протокол перебалансировки клиентов остается прежним, изменена только логика назначения: путь чтения в лидере группы на клиентах.

Для этого добавлен новый интерфейс для обработки случаев, когда резервные задачи имеют зарегистрированные хранилища состояний, загрузку пакета записей и остановку обновлений. Также конфигурации хранилища DSL по умолчанию расширена до пользовательских типов, чтобы разработчик мог подключать любые хранилища состояний, используя новый интерфейс, поддерживающий в т.ч. RocksDB и резидентные хранилища. Еще введено версионирование хранилищ состояний, обращаться к которым можно с помощью запросов VersionedKeyQuery и MultiVersionedKeyQuery. Они позволяют выполнять поиск одного ключа, запрашивать самое последнее значение, историческое значение или диапазон исторических значений для предоставленного ключа. Упрощены требования к ненулевому ключу в Kafka Streams, которые ранее не допускались.

Также добавлены новые интерактивные запросы TimestampedKeyQuery и TimestampedRangeQuery с меткой времени для хранилищ состояний «ключ-значение» с меткой времени. Это изменение повышает безопасность типов API, возвращая значение, только если оно выдано в хранилище значений ключа с меткой времени. Запросы типа RangeQueries позволяют запрашивать диапазон ключей, результаты выполнения которых теперь можно упорядочить для каждого раздела в порядке возрастания или убывания или оставить порядок неуказанным.

Также стоит отметить изменения в Kafka Connect, среди которых динамическая настройка лога для всего кластера и отдельных рабочих процессов. Новые коннекторы теперь можно создавать в состоянии «ОСТАНОВЛЕНО» или «ПАУЗА», что облегчает их миграцию. В частности, при создании коннектора у него можно заполнить новое необязательное поле с начальным состоянием.

Еще в Kafka Connect добавлен конвертер BooleanConverter, поддерживающий сериализацию и десериализацию логического примитива в схеме благодаря изменениям в org.apache.kafka.connect.storage.Converter и org.apache.kafka.connect.storage.HeaderConverter. Наконец, устарела избыточная конечная точка для получения конфигураций задач Connect, которая будет удалена в выпуске 4.0.

@BigDataSchool_ru
https://bigdataschool.ru/blog/news/kafka/kafka-3-7-release-overview.html
#nifi #статьи
Что такое Flow-Based Programming

Каждый дата-инженер, работающий с Apache NiFi, знает, что этот фреймворк поддерживает потоковую обработку информации, понимая под потоком неограниченно поступающие данные.

Однако, фундаментальные концепции NiFi основаны на ключевых идеях потоко-ориентированное программирование (FBP, Flow-Based Programming).
Эта парадигма программирования имеет длительную историю (с конца 60-х годов XX века) и использует метафору фабрики обработки данных для проектирования и создания приложений.

FBP определяет приложения как сети процессов черного ящика — компонентов, взаимодействующих друг с другом через фрагменты данных (информационные пакеты), которые перемещаются по заранее определенным соединениям.

FBP — это особый случай программирования потоков данных, характеризующийся асинхронными, параллельными процессами «под обложкой», информационными пакетами с определенным временем жизни, именованными портами, соединениями с «ограниченным буфером» и их определением, внешних по отношению к компонентам.

С математической точки зрения FBP оперирует понятиями теории графов, представляя программу как ориентированный граф, в вершинах (узлах) которого выполняются единичные вычисления, т.е. входные данные преобразуются в выходные. Узлы посылают и принимают данные через порты — точки соединения дуг (рёбер графа) и узлов.
На практике эта идея используется в нотациях функционально-событийного моделирования бизнес-процессов, например, BPMN.

Далее читайте: Как FBP-парадигма отражена в Apache NiFi.

@BigDataSchool_ru
https://bigdataschool.ru/blog/news/nifi/nifi-and-flow-based-programming.html
#spark #статьи
Где stateful-операторы Spark Structured Streaming хранят состояния?

Хотя Apache Spark Structured Streaming реализует потоковую парадигму обработки информации, он по-прежнему использует микропакеты, т.е. ограниченные наборы данных, с которыми выполняется операция.
Например, рассмотрим типовую задачу анализа данных в некотором временном окне, например, определить скользящее среднее количество запросов, поступивших из разных городов в течение 5 минут. Чтобы получить результат, необходимо знать не только данные в текущем микропакете, но и в предыдущих. Такие операции с данными называются stateful, а сами обрабатываемые при этом данные называются состоянием, которое необходимо сохранить.
В Spark Structured Streaming состояния хранятся в хранилищах состояний исполнителей, потребляя их ресурсы: память и дисковое пространство.

В Apache Spark есть две встроенные реализации провайдера хранилища состояний:
✔️распределенная файловая система Hadoop (HDFS), используемая по умолчанию. Здесь все данные на первом этапе MapReduce-вычислений сохраняются в памяти, а затем в файлах в файловой системе, совместимой с HDFS. В реализации HDFSBackedStateStore данные о состоянии хранятся в памяти JVM исполнителей, а большое количество объектов состояния создает нагрузку на память JVM, вызывая большие паузы в работе сборщика мусора (Garbage Collector).
✔️RocksDB – реализация на основе key-value БД, добавленная в Apache Spark с версии 3.2. Она позволяет избежать проблем с JVM при множестве ключей для stateful-операций, когда сборка мусора приостанавливается, вызывая большие различия во времени микропакетной обработки. Вместо хранения состояния в памяти JVM используется NoSQL-хранилище RocksDB для эффективного управления состоянием в собственной памяти и на локальном диске. При этом любые изменения этого состояния автоматически сохраняются структурированной потоковой передачей в указанное местоположение контрольной точки, обеспечивая полные гарантии отказоустойчивости.

Именно RocksDB рекомендуется использовать в качестве хранилища состояний для производственных рабочих нагрузок, поскольку со временем размер состояния обычно увеличивается и превышает миллионы ключей. Использование RocksDB позволяет избежать проблем с памятью, связанных с кучей JVM, или замедления работы из-за сборки мусора, что характерно для HDFS.

Как показали бенчмаркинговые тесты, проведенные дата-инженерами компании Databricks, stateful-операции, сохраняющие состояние в RocksDB, выполняются в несколько раз быстрее по сравнению с использованием HDFS.
Такая скорость достигается за счет адаптации внутренних механизмов RocksDB к особенностям Spark Structured Streaming, что мы рассмотрим далее.

@BigDataSchool_ru
https://bigdataschool.ru/blog/news/spark/rocksdb-as-state-backend-in-spark-structured-streaming.html
#SQL #статьи
Варианты использования и ограничения JDBC-драйвера

Новый JDBC-драйвер 6 версии рекомендуется применять для всех приложений и инструментов, которые используют его предыдущие версии (4 или 5), а также имеют интеграции с SQL-платформами миграции данных и ETL-инструментами, которые не предлагают интеграцию на основе общего Java-драйвера Neo4j.
Также он хорошо подходит для случаев, когда есть существующие интеграции, которые доступны только для чтения, но нужна и запись.

Наконец, благодаря поддержке Jakarta EE, Spring JDBCTemplate и других универсальных возможностей JDBC API 4.3, он отлично пригодится при реализации распределенных транзакций.
Наконец, JDBC-драйвер Neo4j снижает порог входа в эту NoSQL-технологию для разработчиков, которые знакомы с JDBC и хотят продолжать использовать этот API, но с Cypher и графовой СУБД. При этом нет необходимости перепроектировать приложение, созданное на основе общего Java-драйвера Neo4j. Если экосистема уже обеспечивает интеграцию более высокого уровня на основе общего Java-драйвера Neo4j, например Spring Data Neo4j (SDN) для Spring, переход пройдет безболезненно. Однако, JDBC-драйвер не стоит использовать, когда приложение и так работает с общим Java-драйвером и изменения не требуются.

Также стоит отметить ограничения JDBC-драйвер Neo4j:
✔️метаданные базы данных извлекаются максимально эффективно с использованием существующих методов схемы Neo4j, таких как labels(), db.schema.nodeTypeProperties();
✔️узлы с несколькими метками не сопоставляются с названиями таблиц, в отличие от узлов с одной меткой;
✔️нет надежного способа всегда определять тип данных для свойств на узлах, не читая их все, чего драйвер не делает;
✔️некоторые функции JDBC еще не поддерживаются, например, CallableStatement(), а некоторые функции не будут поддерживаться никогда;
✔️транслятор SQL-запросов в Cypher поддерживает только ограниченное подмножество предложений и конструкций, которые могут быть семантически эквивалентны при переводе в Cypher;
✔️нет единого правильного способа сопоставить JOIN-утверждения с отношениями;
✔️не обеспечивается автоматическое изменение или выравнивание наборов результатов, как это было в предыдущей версии. При запросе узлов, связей, путей или сопоставлений, которые должны использовать метод getObject() в наборах результатов, их надо привести к соответствующему типу, который есть в пакете neo4j.jdbc.values. Однако, транслятор SQL в Cypher при подключении к базе данных выяснит, какие свойства имеют метки, и превратит в отдельные столбцы узлов и связей, как это ожидается при выполнении оператора SELECT.

Впрочем, несмотря на эти ограничения, новый JDBC-драйвер Neo4j предоставляет разработчику довольно много возможностей по работе с этой графовой СУБД. Например, другие драйверы с возможность перевода SQL в Cypher доступны только для чтения и не поддерживают операторы записи.
Поэтому они не могут использоваться для сценариев использования ETL, направленных на передачу данных в Neo4j.

@BigDataSchool_ru
https://bigdataschool.ru/blog/news/neo4j/jdbc-driver-neo4j-6-release-overview.html
#spark #статьи
Что такое assert: конструкция тестирования

При разработке PySpark-приложения дата-инженер чаще всего оперирует такими структурами данных, как датафрейм.

Датафрейм (DataFrame) – это распределенная таблица, коллекция данных, сгруппированная в именованные столбцы аналогично реляционной таблице в Spark SQL. Обычно работа с данными в PySpark включает в себя применение преобразований, агрегаций и манипуляций к датафреймам. Чтобы протестировать полученный код в Apache Spark 3.5 были добавлены утилиты проверки равенства датафреймов. Они позволяют сверить данные с ожидаемыми результатами, помогая выявить неожиданные различия и ошибки на ранних этапах процесса анализа.

На текущий момент в PySpark есть следующие функции, которые можно использовать для тестирования написанного кода распределенного приложения:
✔️assertDataFrameEqual(actual, expected[, …]) – утилита для подтверждения равенства между фактическим и ожидаемым датафреймом или списками строк с дополнительными параметрами checkRowOrder, rtol и atol;
✔️assertSchemaEqual(actual, expected) – утилита для подтверждения равенства между фактической и ожидаемой схемами датафрейма;
✔️assertPandasOnSparkEqual(actual, expected[, …]) – утилитная функция для подтверждения равенства между фактическим объектом pandas-on-Spark и ожидаемым объектом pandas-on-Spark или pandas. О разнице между классическим API pandas и его реализации в PySpark мы ранее писали здесь и здесь.

Все эти функции основаны на понятии assert – конструкции, которая позволяет проверять предположения о значениях произвольных данных в произвольном месте программы, подавая сигнал об обнаружении некорректных данных, что может привести к сбою.

Assert завершает программу сразу же после обнаружения некорректных данных, давая разработчику возможность быстро определить ошибки и исправить их. Assert может отловить ошибки в коде на этапе компиляции или во время исполнения. Проверки на этапе компиляции можно заменить аналогичными синтаксическими конструкциями во время исполнения программы, например, try-except-finally.

В большинстве языков программирования позволяют отключить assert на этапе компиляции или во время выполнения программы, поэтому влияние этих конструкций на производительность итогового продукта незначительно.

@BigDataSchool_ru
https://bigdataschool.ru/blog/news/spark/pyspark-dataframe-testing-with-assert-functions.html