DataSkewer
210 subscribers
52 photos
1 video
3 files
10 links
Канал с рассуждениями и заметками о работа DE.
Download Telegram
Сегодня у меня прошел Deep Dive по Apache Airflow.
Рассказал об основных преимуществах этого инструмента, его основных юз-кейсах и показал несколько интересных конвееров данных.

Видео
https://drive.google.com/file/d/1KT1fXfdMKl8SD3WqbqEC6JyD61k-ulnq/view?usp=sharing

Презентация
https://docs.google.com/presentation/d/1oMTjc_Gjq6IIgNFzRzy2eoAF-jrWIRTf/edit?usp=sharing&ouid=114931412290786365808&rtpof=true&sd=true
🔥9👨‍💻2🎄1
Задачка пока что откладывается.
Расскажу о типах данных в контексте БД.

Иногда у вас на собесе могут спросить, а зачем вообще нужны типы данных? Ну или может вы сами задумаетесь об этом в процессе решения задачи.
Не легче было бы хранить любую информацию в одном универсальном типе? На такую роль вполне могли бы претендовать символьные строки (на жаргоне стринги или String).
Существует множество причин, почему развитие технологий пошло совсем другим путем и у нас появились long, double, bigint, text и другие.

Отчасти это исторические причины. В глубокой древности. когда еще не родился даже я, (в восьмедесятых годах двадцатого столетия) появились на свет первые реляционные базы данных, пространство оперативной памяти и жесткого диска ценилось очень и очень дорого, и приоритет отдавался технологиям, которые позволяли использовать его с максимальной эффективностью.
Существовавшие на то время языки программирования уже имели ряд встроенных правил хранения информации различного типа. К примеру, все буквы (английского или другого языка), а также специальные символы и цифры представлялись с помошью кодов таблицы ASCII, что в свою очередь позволяло отводить на один символ один байт.

Для хранения чисел от - 32768 до +32767 необходимо уже два байта (так как эти пределы представляют собой число 2 в степени 16).
Если бы для представления чисел использовались символы ASCII (т.е. хранить число в виде строки), для хранения чисел, превышавших 9999, потребовалось бы 6 байт (5 байт для цифр и один - для знака).
А теперь представьте себе, что в одном столбце могут находиться миллионы числовых значений, и экономия 4 байтов на каждом из них может вылиться в мегабайты освобожденного пространства. Сегодня эти цифры вряд ли вас впечатлят, но в 1970-х годах даже один мегабайт представлялся невероятно большим пространством. Принцип эффективности еще не утратил своего значения, но сегодня масштабы понятия пространства выросли на несколько порядков.
И тем не менее вопрос эффективного использования дискового пространства и вычислительных ресурсов никуда не делся. Просто сейчас это мультимедия и Большие Данные. Вполне нормальным юз кейсом считается прихранивание видео в базе данных. (Раньше это могли бы себя позвонить лишь очень богатые компании и правительство)

Вторая причина уже более сложная - это логическая целостность информации. Каждый тип данных имеет собственные правила, порядок сортировки, отношения с другими типами данных и др. Гораздо легче работать с множеством однотипных значений (таких как даты), чем с их смесью (когда у вас все в виде строк). В книгах это сравнивают с библиотекой, где литература разного жанра хранится в отдельных комнатах (фантастика - в одной, детективы - в другой, детские книги - в третьей и т.д.). Считается что, условное хранилище, где в одном помещении была бы беспорядочная свалка всей литературы, компакт-дисков и видеокассет было бы крайне неудобным и сложным для понимания.
🔥7👍1
🫡7👍6
Итак обещанная задача из собеседования на позицию Middle Data Engineer.

Чтобы ее решить необходимо понимать структуры данных, что не реализованы стандартно в ваших любимых языках программирования. Т.е. связанный список необходимо будет создать самому.

Так же я решил двумя способами но мне предъявили за расход памяти в первом решении через рекурсию, что в целом очень обоснованно для того кто собирается работать с большими данными.

Итак сама задача:

Задача на Python3 (можно на Scala/Java). (В Мире Дата инженерии очень и очень активно используется язык Scala придерживающийся философии функционального программирования и созданный на основе языка Java (буквуально использует JVM))

На вход подаётся массив l со связанными списками. Каждый связанный список отсортирован в порядке увеличения.
Задача соединить все связанные списки в один отсортированный связанный список и вернуть его.

Пример:

Вход: l = [[2,3,7],[2,32,44],[21,66]]
Выход: [2,2,3,7,21,32,44,66]

Ограничения:

l.length не выше 104 (может быть 0)
l[i].length не выше 500 (может быть 0)
значение чисел в связанном списке не больше (<=) 104 и не меньше (>=) -104
значения чисел отсортированы в порядке увеличения

Хочу заранее заметить, что задача не решается в лоб и вас не просят отсортировать обычный массив. Загуглите что такое связанный список если не знаете.
🤔6🎄2
По поводу связанных списков в прошлом посте.
Хотел бы на них более подробно остановиться и объяснить суть этой структуры данных, на примере новогодних игрушек.
Представьте, что у вас по комнате разбросана куча игрушек. Вы хотите отслеживать их в определенном порядке, но у вас нет коробки для игрушек, чтобы хранить их все вместе. Вместо этого вы решаете создать воображаемую линию игрушек, в которой каждая игрушка связана со следующей.

Связанный список это по сути линия из игрушек. Каждая игрушка называется «узлом» и состоит из двух частей: одна часть содержит саму игрушку (назовем ее данными), а другая часть указывает на следующую игрушку в строке (назовем ее следующей). указатель).

Итак, вы начинаете с того, что выбираете игрушку и кладете ее в начало линии. Эта игрушка - первый узел в связанном списке. Затем вы выбираете другую игрушку и соединяете ее с первой игрушкой, используя указатель «следующий». Продолжайте делать это до тех пор, пока не соедините все игрушки в линию.

Чтобы найти конкретную игрушку в связанном списке, вы начинаете с первой игрушки и следуете по указателям следующий, пока не дойдете до нужной. Каждая игрушка знает только об игрушке, которая идет после нее, поэтому вам придется проходить по очереди по одной игрушке за раз.

Если вы хотите добавить новую игрушку, вы просто соедините ее с последней игрушкой в связанном списке, используя указатель следующий. Если вы хотите удалить игрушку, вы обновляете указатель «следующий» предыдущей игрушки, чтобы пропустить игрушку, которую хотите удалить.

Связанные списки как структура данных полезна в высоконагруженных системах, тк очень эффективно используют память, позволяют легко добавлять или удалять элементы, так же их используют во всяких garbage collector-ax, чтобы понимать когда от какого либо объекта можно избавляться. Так же в более экзотических случаях их используют для реализации еще более сложных структур данных вроде деревьев или графов.
Но есть и минусы:
Поиск конкретной 'игрушки' в связанном списке занимает больше времени, чем если бы все игрушки находились в условном "ящике" для игрушек, где вы можете быстро найти их (имелся в виду обычный словарь/dictionary или массив).

так связаный список можно реализовать на языке Scala

//  Обьявляем класс с узлом
class Node(value: Int) {
var data: Int = value
var next: Option[Node] = None
}

// Создаем класс самого связанного списка
class LinkedList {
var head: Option[Node] = None

// Метод добавления нового узла в список
def addNode(value: Int): Unit = {
val newNode = new Node(value)
if (head.isEmpty) {
head = Some(newNode)
} else {
var current = head
while (current.get.next.isDefined) {
current = current.get.next
}
current.get.next = Some(newNode)
}
}

// метод для вывода в консоль нового элемента связанного списка
def printList(): Unit = {
var current = head
while (current.isDefined) {
print(current.get.data + " ")
current = current.get.next
}
println()
}
}

// как использовать
val myList = new LinkedList()
myList.addNode(1)
myList.addNode(2)
myList.addNode(3)
myList.printList()
👍9
🤯5
С наступающими новогодними праздниками, дорогие подписчики, пусть Новый Год пройдет у вас лучше чем прошлый, и в новом году вы станете умнее и лучше (желаю себе того же ха-ха).

В новогоднем посте хотел бы затронуть такую фундаментальную тему как построение и моделирование Хранилищ Данных (Data WareHouse), это ключевой элемент в работе подавляющего большинства Дата-инженеров. Вся работа инженеров данных так или иначе связана с DWH.

Существует множество методологий и подходов к созданию DWH
- Corporate Information Factory (CIF) (Корпоративная Аналитическая Платформа)
- Entity-Relationship Modeling (Отношение сущностей)
- Dimensional Modeling (Многомерное моделирование)
- Data Vault Modeling (Гибридный подход полученный соединением схемы «звезды» и 3-ей нормальной формы)
- другие экзотические подходы

Это ключевое в работе Дата инженера - понимание концепций моделирование данных что будут храниться у вас. Без этого, вас строго говоря нельзя называть дата-инженером, даже если вы будете хорошо писать код.

Но не переживайте)

Для вас я приложу несколько БАЗОВЫХ книг что помогут вам лучше понять эти концепции, и с легкостью проходить собесы и курсы в университете. Преподавателей и собеседующих будете валить ВЫ, а не они вас)

Правда эти книги на английском, но вы только выиграете от того что будете читать это на английском.

The Data Warehouse Toolkit: The Definitive Guide to Dimensional Modeling - Ralph Kimball (очень большой Папа в мире данных) and Margy Ross.

Building the Data Warehouse - W. H. Inmon

Data Warehouse Design: Modern Principles and Methodologies - Matteo Golfarelli and Stefano Rizzi.
🎄7👍41
Первый пост в новом году.
Прошлый год оставил мне и команде в которой я работаю странный и непонятный баг.

Одно из интеграционных приложений некорректно вело себя только на промышленном стенде.

Если говорить коротко, то генерируемые sequence-ом ID почему то стали дублироваться. То есть представьте ситуацию что в вашей таблице users ID 1, 2, 3, 4 - присваивались несколько раз в результате чего происходили падения.

На остальных стендах баг не фиксировался. Ушло довольно много времени и нервов у множества людей.
Уже готовилась встреча с командой отвечающей за сопровождение промышленного стенда (по сути сервер который используется в приложении доступном для конечного пользователя).

Было перепробовано множество версий, даже самых экзотических, вплоть до некорректной работы nextval() в PostgreSQL. (Идея конечно безумная, но иногда до этого доходит в поиске причины проблем)

В результате выяснилось что в одном из пулл реквесте (мердже реквесте в гите) была допущена ошибка суть которой заключалась в том что происходило обращение к неправильному сиквенсу, то есть примерно это

nextval(table_2_generator)

вместо

nextval(table_1_generator)

- этого никто не заметил и чтобы предупредить эту ошибку на стороне разработчиков должна была присутствовать банальная внимательность.
Забавен тот факт что эту ошибку не увидело целых 3 разработчика, которые так или иначе не видели этот код. (Включая меня ха-ха)

На других стендах эта ошибка не была видна потому что тестовые файлы были слишком малы в размере и сиквенсы не успевали каннибализировать друг у друга значения айдишников.

Эта история заставила меня задуматься о том что не стоит придумывать слишком сложные сценарии чтобы найти ошибку - иногда стоит искать проще.
А так же, что при работе с данными нужно разрабатывать более сложные тесты и возможно как то пересмотреть подходы к тестированию, вплоть до использования AI. Об этом уже много сказано и написано статей, однако еще предстоит увидеть применение этим подходам, особенно в больших компаниях.
8🔥4🎄3
Как работает кэширование в Apache Spark и что происходит с кэшированными данными внутри Spark приложения ?

Сегодня я задался именно этим вопросом и хочу с вами поделиться ответом.

Итак, одним из простейших способов оптимизации Спарк приложения является кэширование данных.
К нему стоит прибегать если вы обращаетесь к какой то одинаковой выборке данных более 1 раза.
Строго говоря это вообще никак не влияет на логику вашего запроса и плохому коду не поможет. Однако на некоторых участках расчета может существенно помочь ускориться.

Кэшировать можно так
‘’’
a=sparkDF.cache()
b=sparkDF2.persist()

a.count()
b.count()
‘’’
В первом случае это кэширование в оперативной памяти. Во втором можно писать на диск если передать соответствующий аргумент.
Так же в конце мы материализуем кэш вызывая функцию count().
Это относится к lazy evaluation. Об этом в других постах.

Самое интересное происходит далее.

Представьте себе гипотетическую ситуацию, что вы переназначили переменную с уже материализованным кэшем.
Например так

a=sparkDF3.cache()

Что произойдет с кэшем ?
Логически кажется что какой нибудь garbage collector должен его сам подчистить, тк больше к этой сущности из кода обратиться невозможно, однако нет.

Этого не произойдет.

Фактически у вас теперь засорена память вашего экзекутора, и вы не можете ее использовать в своих расчетах.

По хорошему вы были должны сделать
a.unpersist()
Перед тем как что то заново кэшировать с этим названием переменной.
Однако не все потеряно.

Существует метод clearCache(), что очистит вообще весь ваш кэш даже если вы больше не можете обратиться к старому кэшу. Используется так.

spark.catalog.clearCache()

Однако вы конечно потеряете весь свой кэш, но увы, за все нужно платить.

Кэшируйте с умом и не забывайте делать unpersist().

В следующем посте поговорим о фундаментальной концепции Apache Spark - lazy evaluation.
🔥11
Apache Spark обладает такой функциональной особенностью как lazy evaluation.
Это условное разделение всех возможных операций на действия и трансформации. Перед тем как выполнить какое-либо действие Спарк накапливает ряд трансформаций, оптимизирует и запускает их при вызове действия.

Для преобразований Spark добавляет их в группу DAG вычислений, и только когда драйвер запрашивает некоторые данные (действие), эта группа DAG действительно выполняется.

Одним из преимуществ этого является то, что Spark может принимать множество решений по оптимизации после того, как у него будет возможность просмотреть DAG целиком. Это было бы невозможно, если бы он выполнял все сразу после получения.

Опять же можно провоцировать Спарк на выполнение действий постоянно. Однако это уберет весь смысл Спарка и вы получите нечто вроде стандартного Tez или hadoop map reduce. (Ранее мы спровоцировали Спарк на действие вызвав функцию count())

Или представим ситуацию, вы с энтузиазмом выполняете каждое преобразование, что это значит? Это означает, что вам придется материализовать в памяти огромное количество промежуточных наборов данных. Очевидно, что это неэффективно - во-первых, это увеличит затраты на сбор мусора. (Потому что на самом деле вас не интересуют эти промежуточные результаты как таковые. Это просто удобные абстракции для вас при написании программы.) Итак, вместо этого вы говорите Spark, какой конечный ответ вас интересует, и он находит лучший способ добраться туда.

Ниже приложил основные действия и трансформации в спарке.
6
Список действий и трансформаций
7
Решил временно сместить тематику и рассказать о каких то простых вещах:

Например иногда в шутку на собесах или в неформальной обстановке Python разработчики рассказывают/хвастаются возможность написать калькулятор в одну строчку. (На Java без библиотек так естественно не получится)

Итак, таким образом вы можете создать калькулятор в одну строку (правда для постоянного использования его нужно будет перезапускать, но можно написать еще строчку и все будет работать вечно)

print(eval(input()))
12
На собеседовании могут спросить о способах удаления элементов из Python листа, приведу примеры основных и экзотических способов. Когда у меня спросили я назвал только четыре.

используя ключевое слово del. (Нужно так же знать индекс элемента)

list1 = ["a", "b", "c", "d"]
del list1[1]


Используя метод remove (удалит первый встретившийся в листе элемент)

list2 = ["a", "b", "c", "d"]
list2.remove("b")



Используя метод pop (нужно указать индекс, иначе удалит последний элемент)

list3 = ["a", "b", "c", "d"]
list3.pop(1)



Используя синтаксический сахар - list comprehension:

list4 = ["a", "b", "c", "d"]
list4 = [i for i in list4 if i != "b"]


Используя лямбда-функцию (так же известную как анонимную) в функции filter

list5 = ["a", "b", "c", "d"]
list5 = list(filter(lambda i: i != "b", list5))


Встроенными инструментами работы с листами в Python

list6 = ["a", "b", "c", "d"]
list6[1:2] = []


Важно помнить что все эти способы кроме лямбда функции и list comprehension - модифицируют исходный лист, лямбда функция и list comprehension создают новый лист.

А теперь к совсем экзотическим, но в зависимости от постановки задачи их тоже можно упомянуть.

метод clear - полностью очистит ваш лист, что удалит как нужный элемент так и все остальные.🤡

list7 = ["a", "b", "c", "d"]
list7.clear()


И наверное самый экзотический способ - через преобразование листа в сет и обратно. На типе данных сет можно использовать метод discard что удалит элемент, логики в этом абсолютно никакой нет и вы потеряете все плюсы листа (так сет удалит дубликаты и собьет порядок), так же это более ресурснозатратно, но тем не менее так тоже можно удалить элемент.

first_list = ["a", "b", "c", "d", "c"]
set_list = set(first_list)
set_list.discard("b")
main_list = list(set_list)
print(main_list)
🔥11😁4💯3
Как производить расчет прошлых дат при создании DAG в Apache Airflow ? 🤔

В Apache Airflow существует такое понятие как backfilling - планирование расчета дат в прошлом. Очень распространенный use case для тех кто работает с данными.

Есть несколько способов триггернуть его
1) Через консоль (не всегда доступен тк вам могут и не дать прямого доступа на хост чтобы вы ничего не положили, тем не менее использовать все равно можно, расскажу о двух хитростях в следующих постах)

airflow dags backfill \
—start-date START_DATE \
—end-date END_DATE \
dag_id


2) через настройки дага
☝️ важно помнить - зачастую так же необходимо указать параметр отвечающий за необходимость запуска/не запуска дополнительных потоков

dag = DAG(
your_job_id,
default_args=your_dag_args,
schedule_interval=your_scheduler,
max_active_runs=1,
catchup=True
)


так с помощью параметра catchup (активного по умолчанию, вы можете контролировать этот самый бэкфиллинг)

параметр max_active_runs, как очевидно из названия контролирует число потоков отсчитывающих прошлые дни. Если поставить 1, то будете постепенно отсчитывать по 1 дню, что хорошо, понятно и предсказуемо для вашего кода.
В противном случае, в зависимости и от логики вашей таблицы вы рискуете все испортить)
Например если у вас есть staging таблички какие то - то вы не можете сразу несколько дней вместе рассчитывать, без дополнительной логики.

В общем этот backfilling считается одной из сильнейших сторон Apache airflow. Это спрашивают на собесах и это мощный инструмент, что вам наверняка пригодится если вы используете Airflow.
👍10🔥4💯2
🧠 топ консольных команд что я использую в работе:

Этой командой вы можете добавить к своей спарк сессии джарник с какой либо библиотекой и использовать эту библиотеку в своем спарк контексте

%AddJar file:/home/my_jar.jar


Это команды для очистки технических папочек что не нужны вам и заполняют память кластера. Они очищаются и сами по себе спустя время, однако при особо тяжелом расчете и падении - вы можете в моменте очень сильно засорить память кластера и себе и коллегам. Не беспокойтесь за вами наверняка придут люди сопровождающие кластер и спросят почему в вашей папке с мусором лежит 1 Тб данных

hdfs dfs -rm -r -skipTrash .sparkStaging

hdfs dfs -rm -r -skipTrash .Trash


Просто положить файлик на HDFS чтобы потом считать

hdfs dfs -put file.txt



убить подвисшее приложение, или приложение что ест особо много ресурсов

yarn app -kill application_1532419910561_0001



узнать кто убил ваше приложение

yarn app -status application_1532419910561_0001 | grep killed


в одну команду обновить ваш тикет в керберосе (если просто кинит написать то потом нужно будет отдельно пароль вводить)

echo MyPassword | kinit -p myUser


положить какую либо папочку с вашего локального компьютера на какой либо хост (на том хосте должен лежать ваш ключик)

scp -r local-directory username@remotehost.ru:remote_directory
🔥132💯2👍1
Интересный пост моего коллеги об использовании ИИ в разработке.
Стоит отметить что это очень удобно для разработчиков из России, поскольку от OpenAI на Россию наложена анафема (надеемся что не навсегда) - и таким образом можно получить неплохой инструмент без шарад с использование VPN, левых сим-карт, продавцов из не подсанкционных стран итп.
По моим ощущениям это все же немного слабее заграничных инструментов вроде ChatGPT и GitHub CoPilot, но кто знает что будет дальше.
В нынешней итерации это уже итак неплохой инструмент.
6👍3🦄2😍1
🤭 Не реклама

Если кто-то начал использовать GigaChat (про него я писал здесь), то тут вышло обновление, про которое можно почитать тут. Код, кстати, стал генерить лучше 🫥
Please open Telegram to view this post
VIEW IN TELEGRAM
👍83😁3
⚡️⚡️⚡️Хочу обратить ваше внимание на канал моего знакомого - Евгения, дата инженера из одной крупной российской компании.
Он очень харизматичный и активный парень, записывает интервью с представителями разных специальностей и рангов в мире IT.

Вот его интервью с Middle Java Developer.

https://youtu.be/Kv8r4Y9dpWM?si=NXrvhcfbwD8neArR

Так же он активно посещает различные IT мероприятия в Москве и рассказывает о них.
При всем при этом успевая продуктивно работать и очень доходчиво объяснять сложные темы в своих постах. Я часто узнаю в этих постах что-то новое и ликвидирую пробелы в собственных знаниях.

https://t.me/halltape_data
🔥11