DataSkewer
210 subscribers
52 photos
1 video
3 files
10 links
Канал с рассуждениями и заметками о работа DE.
Download Telegram
🤯5
С наступающими новогодними праздниками, дорогие подписчики, пусть Новый Год пройдет у вас лучше чем прошлый, и в новом году вы станете умнее и лучше (желаю себе того же ха-ха).

В новогоднем посте хотел бы затронуть такую фундаментальную тему как построение и моделирование Хранилищ Данных (Data WareHouse), это ключевой элемент в работе подавляющего большинства Дата-инженеров. Вся работа инженеров данных так или иначе связана с DWH.

Существует множество методологий и подходов к созданию DWH
- Corporate Information Factory (CIF) (Корпоративная Аналитическая Платформа)
- Entity-Relationship Modeling (Отношение сущностей)
- Dimensional Modeling (Многомерное моделирование)
- Data Vault Modeling (Гибридный подход полученный соединением схемы «звезды» и 3-ей нормальной формы)
- другие экзотические подходы

Это ключевое в работе Дата инженера - понимание концепций моделирование данных что будут храниться у вас. Без этого, вас строго говоря нельзя называть дата-инженером, даже если вы будете хорошо писать код.

Но не переживайте)

Для вас я приложу несколько БАЗОВЫХ книг что помогут вам лучше понять эти концепции, и с легкостью проходить собесы и курсы в университете. Преподавателей и собеседующих будете валить ВЫ, а не они вас)

Правда эти книги на английском, но вы только выиграете от того что будете читать это на английском.

The Data Warehouse Toolkit: The Definitive Guide to Dimensional Modeling - Ralph Kimball (очень большой Папа в мире данных) and Margy Ross.

Building the Data Warehouse - W. H. Inmon

Data Warehouse Design: Modern Principles and Methodologies - Matteo Golfarelli and Stefano Rizzi.
🎄7👍41
Первый пост в новом году.
Прошлый год оставил мне и команде в которой я работаю странный и непонятный баг.

Одно из интеграционных приложений некорректно вело себя только на промышленном стенде.

Если говорить коротко, то генерируемые sequence-ом ID почему то стали дублироваться. То есть представьте ситуацию что в вашей таблице users ID 1, 2, 3, 4 - присваивались несколько раз в результате чего происходили падения.

На остальных стендах баг не фиксировался. Ушло довольно много времени и нервов у множества людей.
Уже готовилась встреча с командой отвечающей за сопровождение промышленного стенда (по сути сервер который используется в приложении доступном для конечного пользователя).

Было перепробовано множество версий, даже самых экзотических, вплоть до некорректной работы nextval() в PostgreSQL. (Идея конечно безумная, но иногда до этого доходит в поиске причины проблем)

В результате выяснилось что в одном из пулл реквесте (мердже реквесте в гите) была допущена ошибка суть которой заключалась в том что происходило обращение к неправильному сиквенсу, то есть примерно это

nextval(table_2_generator)

вместо

nextval(table_1_generator)

- этого никто не заметил и чтобы предупредить эту ошибку на стороне разработчиков должна была присутствовать банальная внимательность.
Забавен тот факт что эту ошибку не увидело целых 3 разработчика, которые так или иначе не видели этот код. (Включая меня ха-ха)

На других стендах эта ошибка не была видна потому что тестовые файлы были слишком малы в размере и сиквенсы не успевали каннибализировать друг у друга значения айдишников.

Эта история заставила меня задуматься о том что не стоит придумывать слишком сложные сценарии чтобы найти ошибку - иногда стоит искать проще.
А так же, что при работе с данными нужно разрабатывать более сложные тесты и возможно как то пересмотреть подходы к тестированию, вплоть до использования AI. Об этом уже много сказано и написано статей, однако еще предстоит увидеть применение этим подходам, особенно в больших компаниях.
8🔥4🎄3
Как работает кэширование в Apache Spark и что происходит с кэшированными данными внутри Spark приложения ?

Сегодня я задался именно этим вопросом и хочу с вами поделиться ответом.

Итак, одним из простейших способов оптимизации Спарк приложения является кэширование данных.
К нему стоит прибегать если вы обращаетесь к какой то одинаковой выборке данных более 1 раза.
Строго говоря это вообще никак не влияет на логику вашего запроса и плохому коду не поможет. Однако на некоторых участках расчета может существенно помочь ускориться.

Кэшировать можно так
‘’’
a=sparkDF.cache()
b=sparkDF2.persist()

a.count()
b.count()
‘’’
В первом случае это кэширование в оперативной памяти. Во втором можно писать на диск если передать соответствующий аргумент.
Так же в конце мы материализуем кэш вызывая функцию count().
Это относится к lazy evaluation. Об этом в других постах.

Самое интересное происходит далее.

Представьте себе гипотетическую ситуацию, что вы переназначили переменную с уже материализованным кэшем.
Например так

a=sparkDF3.cache()

Что произойдет с кэшем ?
Логически кажется что какой нибудь garbage collector должен его сам подчистить, тк больше к этой сущности из кода обратиться невозможно, однако нет.

Этого не произойдет.

Фактически у вас теперь засорена память вашего экзекутора, и вы не можете ее использовать в своих расчетах.

По хорошему вы были должны сделать
a.unpersist()
Перед тем как что то заново кэшировать с этим названием переменной.
Однако не все потеряно.

Существует метод clearCache(), что очистит вообще весь ваш кэш даже если вы больше не можете обратиться к старому кэшу. Используется так.

spark.catalog.clearCache()

Однако вы конечно потеряете весь свой кэш, но увы, за все нужно платить.

Кэшируйте с умом и не забывайте делать unpersist().

В следующем посте поговорим о фундаментальной концепции Apache Spark - lazy evaluation.
🔥11
Apache Spark обладает такой функциональной особенностью как lazy evaluation.
Это условное разделение всех возможных операций на действия и трансформации. Перед тем как выполнить какое-либо действие Спарк накапливает ряд трансформаций, оптимизирует и запускает их при вызове действия.

Для преобразований Spark добавляет их в группу DAG вычислений, и только когда драйвер запрашивает некоторые данные (действие), эта группа DAG действительно выполняется.

Одним из преимуществ этого является то, что Spark может принимать множество решений по оптимизации после того, как у него будет возможность просмотреть DAG целиком. Это было бы невозможно, если бы он выполнял все сразу после получения.

Опять же можно провоцировать Спарк на выполнение действий постоянно. Однако это уберет весь смысл Спарка и вы получите нечто вроде стандартного Tez или hadoop map reduce. (Ранее мы спровоцировали Спарк на действие вызвав функцию count())

Или представим ситуацию, вы с энтузиазмом выполняете каждое преобразование, что это значит? Это означает, что вам придется материализовать в памяти огромное количество промежуточных наборов данных. Очевидно, что это неэффективно - во-первых, это увеличит затраты на сбор мусора. (Потому что на самом деле вас не интересуют эти промежуточные результаты как таковые. Это просто удобные абстракции для вас при написании программы.) Итак, вместо этого вы говорите Spark, какой конечный ответ вас интересует, и он находит лучший способ добраться туда.

Ниже приложил основные действия и трансформации в спарке.
6
Список действий и трансформаций
7
Решил временно сместить тематику и рассказать о каких то простых вещах:

Например иногда в шутку на собесах или в неформальной обстановке Python разработчики рассказывают/хвастаются возможность написать калькулятор в одну строчку. (На Java без библиотек так естественно не получится)

Итак, таким образом вы можете создать калькулятор в одну строку (правда для постоянного использования его нужно будет перезапускать, но можно написать еще строчку и все будет работать вечно)

print(eval(input()))
12
На собеседовании могут спросить о способах удаления элементов из Python листа, приведу примеры основных и экзотических способов. Когда у меня спросили я назвал только четыре.

используя ключевое слово del. (Нужно так же знать индекс элемента)

list1 = ["a", "b", "c", "d"]
del list1[1]


Используя метод remove (удалит первый встретившийся в листе элемент)

list2 = ["a", "b", "c", "d"]
list2.remove("b")



Используя метод pop (нужно указать индекс, иначе удалит последний элемент)

list3 = ["a", "b", "c", "d"]
list3.pop(1)



Используя синтаксический сахар - list comprehension:

list4 = ["a", "b", "c", "d"]
list4 = [i for i in list4 if i != "b"]


Используя лямбда-функцию (так же известную как анонимную) в функции filter

list5 = ["a", "b", "c", "d"]
list5 = list(filter(lambda i: i != "b", list5))


Встроенными инструментами работы с листами в Python

list6 = ["a", "b", "c", "d"]
list6[1:2] = []


Важно помнить что все эти способы кроме лямбда функции и list comprehension - модифицируют исходный лист, лямбда функция и list comprehension создают новый лист.

А теперь к совсем экзотическим, но в зависимости от постановки задачи их тоже можно упомянуть.

метод clear - полностью очистит ваш лист, что удалит как нужный элемент так и все остальные.🤡

list7 = ["a", "b", "c", "d"]
list7.clear()


И наверное самый экзотический способ - через преобразование листа в сет и обратно. На типе данных сет можно использовать метод discard что удалит элемент, логики в этом абсолютно никакой нет и вы потеряете все плюсы листа (так сет удалит дубликаты и собьет порядок), так же это более ресурснозатратно, но тем не менее так тоже можно удалить элемент.

first_list = ["a", "b", "c", "d", "c"]
set_list = set(first_list)
set_list.discard("b")
main_list = list(set_list)
print(main_list)
🔥11😁4💯3
Как производить расчет прошлых дат при создании DAG в Apache Airflow ? 🤔

В Apache Airflow существует такое понятие как backfilling - планирование расчета дат в прошлом. Очень распространенный use case для тех кто работает с данными.

Есть несколько способов триггернуть его
1) Через консоль (не всегда доступен тк вам могут и не дать прямого доступа на хост чтобы вы ничего не положили, тем не менее использовать все равно можно, расскажу о двух хитростях в следующих постах)

airflow dags backfill \
—start-date START_DATE \
—end-date END_DATE \
dag_id


2) через настройки дага
☝️ важно помнить - зачастую так же необходимо указать параметр отвечающий за необходимость запуска/не запуска дополнительных потоков

dag = DAG(
your_job_id,
default_args=your_dag_args,
schedule_interval=your_scheduler,
max_active_runs=1,
catchup=True
)


так с помощью параметра catchup (активного по умолчанию, вы можете контролировать этот самый бэкфиллинг)

параметр max_active_runs, как очевидно из названия контролирует число потоков отсчитывающих прошлые дни. Если поставить 1, то будете постепенно отсчитывать по 1 дню, что хорошо, понятно и предсказуемо для вашего кода.
В противном случае, в зависимости и от логики вашей таблицы вы рискуете все испортить)
Например если у вас есть staging таблички какие то - то вы не можете сразу несколько дней вместе рассчитывать, без дополнительной логики.

В общем этот backfilling считается одной из сильнейших сторон Apache airflow. Это спрашивают на собесах и это мощный инструмент, что вам наверняка пригодится если вы используете Airflow.
👍10🔥4💯2
🧠 топ консольных команд что я использую в работе:

Этой командой вы можете добавить к своей спарк сессии джарник с какой либо библиотекой и использовать эту библиотеку в своем спарк контексте

%AddJar file:/home/my_jar.jar


Это команды для очистки технических папочек что не нужны вам и заполняют память кластера. Они очищаются и сами по себе спустя время, однако при особо тяжелом расчете и падении - вы можете в моменте очень сильно засорить память кластера и себе и коллегам. Не беспокойтесь за вами наверняка придут люди сопровождающие кластер и спросят почему в вашей папке с мусором лежит 1 Тб данных

hdfs dfs -rm -r -skipTrash .sparkStaging

hdfs dfs -rm -r -skipTrash .Trash


Просто положить файлик на HDFS чтобы потом считать

hdfs dfs -put file.txt



убить подвисшее приложение, или приложение что ест особо много ресурсов

yarn app -kill application_1532419910561_0001



узнать кто убил ваше приложение

yarn app -status application_1532419910561_0001 | grep killed


в одну команду обновить ваш тикет в керберосе (если просто кинит написать то потом нужно будет отдельно пароль вводить)

echo MyPassword | kinit -p myUser


положить какую либо папочку с вашего локального компьютера на какой либо хост (на том хосте должен лежать ваш ключик)

scp -r local-directory username@remotehost.ru:remote_directory
🔥132💯2👍1
Интересный пост моего коллеги об использовании ИИ в разработке.
Стоит отметить что это очень удобно для разработчиков из России, поскольку от OpenAI на Россию наложена анафема (надеемся что не навсегда) - и таким образом можно получить неплохой инструмент без шарад с использование VPN, левых сим-карт, продавцов из не подсанкционных стран итп.
По моим ощущениям это все же немного слабее заграничных инструментов вроде ChatGPT и GitHub CoPilot, но кто знает что будет дальше.
В нынешней итерации это уже итак неплохой инструмент.
6👍3🦄2😍1
🤭 Не реклама

Если кто-то начал использовать GigaChat (про него я писал здесь), то тут вышло обновление, про которое можно почитать тут. Код, кстати, стал генерить лучше 🫥
Please open Telegram to view this post
VIEW IN TELEGRAM
👍83😁3
⚡️⚡️⚡️Хочу обратить ваше внимание на канал моего знакомого - Евгения, дата инженера из одной крупной российской компании.
Он очень харизматичный и активный парень, записывает интервью с представителями разных специальностей и рангов в мире IT.

Вот его интервью с Middle Java Developer.

https://youtu.be/Kv8r4Y9dpWM?si=NXrvhcfbwD8neArR

Так же он активно посещает различные IT мероприятия в Москве и рассказывает о них.
При всем при этом успевая продуктивно работать и очень доходчиво объяснять сложные темы в своих постах. Я часто узнаю в этих постах что-то новое и ликвидирую пробелы в собственных знаниях.

https://t.me/halltape_data
🔥11
DataSkewer
Сравнение основных оркестраторов задач востребованных на рынке. Если вы скажете что работали с каждым из них - я назову вас лжецом.
🐘 Сейчас в рамках решения рабочих задач пользуюсь одним относительно старым планировщиком рабочих процессов.
Здесь он представлен как Apache Oozie.
Это приложение позволяет создавать достаточно сложные и разветвленные направленные ациклические графы с задачами.
В отличии от airflow вам не нужно знать Python.
Хотя нужно знать очень много чего другого.
По моему мнению у этого инструмента порог вхождения значительно выше чем у Airflow.
Вам нужно самим конфигурировать кучу файлов, вы должны четко понимать что вы делаете, знать компоненты экосистемы, ваш TaskTracker, NameNode, самому собрать Джарник со Спарк приложением, положить его на кластер, все настроить, установить на кластер, поднять сам хадуп кластер если его нет. И в общем это все тянется в кучу очень нетривиальных задач.

В то время как в airflow достаточно сделать pip install и можно уже в целом ехать.

Сам инструмент использует xml разметку. Может запускать из коробки spark приложения, так же есть свой аналог BashOperator в airflow.

Этот мастодонт никуда еще не денется и уметь им пользоваться важно, особенно в больших компаниях что уже вложили кучу денег в разработку и в эти инструменты.

Из минусов - в домашних условиях на слабеньком пк поработать с ним будет очень непросто.

Ну и забавный момент. Изначально это слово Уузи исходит из Мьянмы. Там это тренеры и погонщики слонов. Что забавно и создателями предполагалось как игра слов - ведь ваш хадуп кластер имеет логотип слона. А уузи это погонщик этого слона что заставляет его бегать и решать задачи.
👍9🔥3💯1
😁5👍2🔥1
Здравствуйте подписчики, выдался очень плотный спринт (это как правило двухнедельный цикл за который команда разработки должна выполнить ряд задач и предоставить бизнесу(заказчику) инкремент). Нужно было быстро усваивать кучу знаний в товарных количествах, что в целом хорошо.

В частности в прошлом и в текущем спринте работаю с планировщиком задач о котором говорил в прошлом посте.
Давайте остановимся на нем поподробнее.
Фактически oozie это Java приложение что запускает другие приложения и помогает выстроить сложные и разветвленные зависимости.

Что нужно для запуска?

Если речь идет о минимальнейших условиях, то вам нужно чтобы у вас было два файла

job.properties - файл с описанием переменных вашего хадуп кластера и путем до workflow.xml
oozie.use.system.libpath=True
oozie.wf.applijobcation.path=hdfs://mycluster/oozie-app/myapp/
nameNode=YOURNAMENODE:port
jobTracker=YOURYARN:8032


workflow.xml
Сам файл с описанием потока. Он не ограничивается одним действием, но лично я бы рекомендовал для соблюдения атомарности и облегчения дебаг не делать больше одного действия в рамках одного workflow.xml. (Наследие airflow хаха) Вы можете запускать все что угодно отсюда (вернее все что установлено на ваш кластер): башники, питон скрипты, джарники, Спарк приложения (основной юз кейс), пиг Джобы, хайв скрипты итд

<workflow-app name="Abobus" xmlns="uri:oozie:workflow:0.5">
<start to="do-sh-run"/>
<action name="do-sh-run">
<shell xmlns="uri:oozie:shell-action:0.1">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<exec>/bin/bash</exec>
<argument>test.sh</argument>
<file>test.sh#test.sh</file>
<capture-output/>
</shell>
<ok to="End"/>
<error to="Kill"/>
</action>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name="End"/>
</workflow-app>


Далее в зависимости от версии oozie вы можете либо пойти в веб интерфейс мониторить свои джобы и читать логи (это можно делать напрямую не заходя в ярн если у вас было Спарк приложение), либо читать через консоль узи.

На бумаге все звучит просто, но фактически, тут множество подводных камней, особенно когда в дело вступают различные ролевые модели. Например у меня на днях сложилась ситуация что мой собственный юзер не мог писать в собственного хомяка (home/username - эта фраза является программистским жаргоном).
Поэтому при работе с такими сложными многокомпонетными приложениями и системами, нужно набраться воздуха, вдохнуть и выдохнуть - а иначе все, или станете невротиком или выгорите.
👏6😭4
💢 недавно столкнулся с неприятной проблемой при работе с S3
Но для начала вкратце расскажу о том что же такое S3.
S3 (Simple Storage Service) три Эс если по-русски - это объектное хранилище (НЕ ФАЙЛОВАЯ СИСТЕМА)
и облачный сервис изначально разработанный Amazon Web Services (AWS).
Но фраза S3 уже стала нарицательной и я заметил что так называют хранилища даже не относящиеся к амазону и не использующие его сервис.
Строго говоря вся проблема этой путаницы в закрытом исходном коде, - умные американцы не хотят раскрывать свой код, что стабильно приносит очень хорошие деньги, но технология классная поэтому появляются клоны вроде Nimbus, Fake S3, HPE Helion Eucalyptus и Scality S3.
С одним из таких клонов я и работал, для простоты и я продолжу называть его S3.
Итак S3 позволяет пользователям загружать и отправлять данные ЛЮБОГО типа по вебу.
S3 очень ценят за его низкую (в рамках большого бизнеса) цену,
его используют для бэкапов, контент платформ, CDNов, сохранений данных high performance приложений, и конечно для Биг Даты.
Так же считается что под капотом Яндекс Диска и Гугл диска некие свои аналоги aws S3.

С какой же именно проблемой я столкнулся и почему она была неприятной:
В ПРОМ таблице перестали отображаться данные что загружались airflow дагом.
быстрое обращение к s3 с нужным префиксом показало что там этих файлов нет
отправитель файлов в s3 показал свои логи согласно которым файлы были точно успешно отправлены

спрашивается где файлы?

в общем я начал дебажить и обнаружил что при прямом обращении к файлам из логов они находятся

и встал вопрос где же произошла проблема.

в эйрфлоу даге все горит зеленым

контроллер отправляющий в s3 так же говорит что все было успешно отправлено

в обшем, после отправления самых разных вариантов запросов к s3 я все же смог обнаружить эти файлы так

import boto3


S3_ENDPOINT = 'fake-data'
S3_BUCKET = 'fake-data'
S3_REGION = 'fake-data'
S3_ACCESS_KEY_ID = 'fake-data'
S3_SECRET_ACCESS_KEY = 'fake-data'
PREFIX_TO_ACCESS = 'mybucket/fake-data/'

sesssion = boto3.Session(S3_ACCESS_KEY_ID, S3_SECRET_ACCESS_KEY)
s3 = sesssion.resource('s3', endpoint_url=S3_ENDPOINT)
bucketName = S3_BUCKET
bucket = s3.Bucket(bucketName)

list_of_lost_files = []

for obj in bucket.objects.filter(Prefix=PREFIX_TO_ACCESS):
list_of_lost_files.append(obj.key)

print(list_of_lost_files)


почему же не исходный способ не показал эти файлы?

import boto3


headers = {'Content-Type': 'application/sql'}

S3_ENDPOINT = 'fake-data'
S3_BUCKET = 'fake-data'
S3_REGION = 'fake-data'
S3_ACCESS_KEY_ID = 'fake-data'
S3_SECRET_ACCESS_KEY = 'fake-data'
PREFIX_TO_ACCESS = 'mybucket/fake-data/'

session = requests.Session()

s3_client = boto3.client(
service_name='s3',
endpoint_url=S3_ENDPOINT,
region_name=S3_REGION,
aws_access_key_id=S3_ACCESS_KEY_ID,
aws_secret_access_key=S3_SECRET_ACCESS_KEY,
)

response = s3_client.list_objects_v2(
Bucket=S3_BUCKET,
Prefix=PREFIX_TO_ACCESS)


for obj in response.get('Contents', []):
print('\n')
print(obj['Key'])
print(obj['LastModified'])


избавлю вас от мучений - все дело в методе list_objects_v2 и в bucket.objects

всегда помогает чтение доки и помогло мне при решении этой проблемы
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/list_objects_v2.html
чтение доки подсказало что метод list_objects_v2() не может вернуть больше 1000 объектов как не старайся

MaxKeys (integer) – Sets the maximum number of keys returned in the response.
By default, the action returns up to 1,000 key names.
The response might contain fewer keys but will never contain more.

и с моим префиксом накопилось уже больше 1000 объектов
следовательно он возвращал только старые хотя новые уже были готовы и лежали рядом
Ситуацию осложняло то что мы не удаляем данные из s3 а только читаем

то есть удаляй я файлы после чтения - этой ситуации не возникло бы в принципе и я не узнал бы об этой особенности.
👍73😭3🔥1
🦠 Услышал такую интересную фразу от коллеги из другого отдела

«Код вы пишите один раз - а читают его потом много раз, вы в том числе. Поэтому всегда лучше писать его сразу как следует»

Вам может показаться это душными понятиями каких то старых дедушек, но с течением времени я понял насколько это важно.

Вот мой личный топ АНТИ ПАТТЕРНОВ в разработке:

1) В самом начале я имел неосторожность использовать ключевые слова в названии переменных, думаю тут не стоит что то еще говорить - ваша программа будет просто вести себя нестабильно если вы будете использовать ключевые слова как переменные. в моем случае я использовал слово type в Python2

2) Писать огромные функции с большим числом аргументов. Их так же сложно читать и дебажить, в целом есть даже такое правило что в названии вашей функции не должно быть слов вроде and or with итд - то есть нужна строгая модульность: одна функция выполняет одну задачу
так же есть правило не больше пяти аргументов на функцию.

3) Не писать юнит тесты - тут все просто при добавлении нового функционала вам не придется как обезьянке заново все проверять вручную теряя время. Что гипер важно.

4) Не использовать стандарты языка (в Python этот стандарты PEP) они унифицируют то какие отступы нужно делать между импортами, как называть и не называть переменные, как форматировать строки и многое многое другое

5) применять очень большие try/except конструкции - при особых условиях из за них можно просто потерять ошибку в логах, а так же это крайне неудобно дебажить.

6) использовать непонятные переменные вроде i,j,a,b итд. Вам будет неудобно в будущем вникать что же они значат и например по чему же предполагается тут итерироваться.
🔥8👍2💯2👌1🌚1