Aspiring Data Science
372 subscribers
425 photos
11 videos
10 files
1.88K links
Заметки экономиста о программировании, прогнозировании и принятии решений, научном методе познания.
Контакт: @fingoldo

I call myself a data scientist because I know just enough math, economics & programming to be dangerous.
Download Telegram
#polars #deltalake #deltars

По сути, deltalake - это субд на паркетных файлах и поларсе, с версионированием изменений и time travel.
Есть компактификация/речанкинг маленьких файлов, "бесшовная" работа с облаком, даже ADIC транзакции.

https://youtu.be/ZIrq9GsN2HM?si=SPDEsBoqvQVxZnBO
#polars #books

Вот есть явно хорошая книжка, "Effective Polars: Optimized Data Manipulation".

Уже даже пройтись по примерам кода очень полезно, покрывает, наверное, 85% информации из книги.

Можно узнать про такие способности поларс:

>>> def standardize(col):
... return (col - col.mean()) / col.std()

>>> print(autos
... .filter(standardize(pl.col('city08')) > 3)
... .select(['year', 'make', 'model', 'VClass', 'city08'])
... )


More Filtering with Window Expressions
>>> print(autos
... .with_columns(
... model_age=(pl.col('year').max() - pl.col('year').min())
... .over('model'))
... )

>>> print(autos
... .select(pl.all().is_null().mean() * 100)
... )


>>> print(autos
... .with_columns(pl.col('make').cast(pl.String))
... .sort(by=pl.col('make').str.len_chars())
... )


... .filter(~pl.all_horizontal(pl.col('devil', 'snake').is_null()))
... .plot(x='datetime', y=['devil', 'snake'], rot=45, title='Gage Height',
... width=1800, height=600)


Using XGBoost to Predict Mileage
>>> import polars.selectors as cs
>>> X = (autos
... .select(cs.numeric() - cs.matches('(city08|highway08)'))
... )
>>> y = (autos.select(pl.col('city08')))


https://www.amazon.com/Effective-Polars-Optimized-Manipulation-Treading
#polars

Книжка сама не очень, кстати, я читал. Собственно, и доклад не блещет инсайтами, зато приводится профит от перехода на поларс в конкретно взятом проекте. Расходы на расчёты снизились с 120 до 4 тыс баксов.

https://www.youtube.com/watch?v=B2Ljp2Fb-l0
#dask #polars #duckdb

Что-то не верю я этим тестам. поларс медленнее даска? Да когда такое было? И как вдруг чудесным образом у даска заработал оптимизатор, если синтаксис вызовов не поменялся? У поларс то он возможен, потому что синтаксис операций на фрейме совсем другой.

https://www.youtube.com/watch?v=qyvLJ2LvKLc
#polars #pandas

Сравнительно недавно начал по-настоящему изучать поларс, потому что пандас уже задолбал своей неповоротливостью. Хочу поделиться некоторыми замечаниями о фреймворке.

Прежде всего, в глаза бросается жёсткая логика в наименовании методов. Наверное, если бы компьютеру поручили самому разработать фреймворк, к таким названиям как раз и пришли бы кремниевые мозги ИИ. Каждый раз, когда видишь полар-овские group_by, write_csv, кажется, как разрабы поларс со всего размаха тыкают носом страдающих слабой логикой, маразмом, и вообще очень ветреных разрабов пандас.

Какие фишки реально понравились:

Выбор столбцов.

Можно выбирать столбцы регуляркой, выбирать все столбцы одним ключевым словом, элегантно отбрасывать столбцы (в т.ч. опять же регуляркой). Этого реально не хватает в пандас, да и sql.

df.select(pl.col("ticker", "^.*_high$", "^.*_low$"))
df.select(pl.all().exclude("^day_.*$"))


Ещё больше упрощают дело селекторы:

import polars.selectors as cs
df.select(cs.string() | cs.ends_with("_high"))


Комбинации селекторов (в терминах логики множеств) уже менее интуитивны: df.select(cs.contains("_") - cs.string()),но всё равно крайне полезны.

Апдейты Поларс.

Их фигачат каждые чуть ли не 2 недели, причём там реальная работа, помногу. Что ж это будет за зверь через год?
И даже я сам уже запостил несколько issues, которые были приняты во внимание и исправлены (самим Риччи). Внёс свой скромный вклад в дело улучшения DS библ для всех 😅.

Многопоточное исполнение. Полная утилизация CPU. Оптимизация операций над lazy фреймами.

Это просто небо и земля, по сравнению с пандасом. Никогда для работы со сколько-нибудь серъёзным набором данных (от десятка гигов) я больше не буду использовать пандас. Получается, ты платишь за машину с сотней ядер, а с pandas используешь большую часть времени одно. Более того, уже есть поддержка engine="gpu", спасибо кодерам Nvidia (под капотом использует cudf. есть ограничения на виды операций и типы данных, которые можно исполнять на gpu, но потихоньку они снимаются).

Синтаксис операций.

Можно обращаться сразу к множеству столбцов, и элегантно применять ко всем одно преобразование. Можно писать генерики для сложной работы со столбцами, которые станут известны лишь в контексте. По сравнению с пандас, паттерн применения преобразований меняется - надо стараться вынести как можно больше операций в каждый вызов контекста (select, with_columns, group_by, group_by_dynamic), потому что так они параллелятся на все ядра. В операциях не обязательно использовать существующие столбцы, можно и просто выражения над ними! Вроде df.sort(pl.col("name").str.len_bytes(),descending=True).

Rust plugins.

Если какие-то операции недоступны в нативном polars, можно вызвать питоновские функции, черезе map_groups, например, но они не будут параллелиться. Для этих случаев можно написать и скомпилировать свой плагин на Расте. Я пробовал, это сложно ) В синтаксисе этого раста чёрт ногу сломит. И не одну. Но возможность есть!

Фишка которую я пока не пробовал - streaming . Работа с данными больше RAM.
#polars #pandas

Отличия от пандас в концепциях:

Нет индексов (кроме целочисленных), осей, модификаций столбцов датафреймов inplace. Хотя на самом деле при модификации столбца фрейм в памяти не пересобирается, компилируется (быстро) лишь новый мета-объект фрейма со ссылками на те же массивы arrow.

Поларс не поддерживает разные типы данных в одном и том же столбце (кроме null), пандас поддерживает.
Пандас по умолчанию удаляет строки с отсутствующими значениями при агрегации. Поларс сохраняет все группы.

Поларс различает отсутствующие значения (null) и некорректные числа (NaN, что означает "не число"), и это различие сохраняется независимо от типа данных.
В результате этого пандас преобразует столбцы с целыми числами в столбцы с плавающей точкой при появлении отсутствующих значений (хотя этого не происходит с новыми типами данных arrow, в посл версиях). Это связано с тем, что тип Integer не поддерживает отсутствующие значения. Поларс такого преобразования не делает, так как поддерживает отсутствующие значения для всех типов данных (хранит битовую маску nulls для каждого поля).

В пандас можно иметь несколько столбцов с одинаковым именем. В Поларс имена столбцов должны быть уникальными.

Странности поларса:

nan vs null. Не уверен, что это хорошая идея. Но вполне в духе строгостей поларса.

animals_pl.unique(subset="class") вместо animals_pd.drop_duplicates(subset="class")

sort(...,descending=). Зачем было отклоняться от привычного в пандас acscending? too much.

sum_horizontal() и подобные функции (вследствие отсутствия axis).

Из интересного вам глянуть:

df.to_dict vs df.to_dicts
approx_n_unique()
glimpse()
.set_sorted()
stacking vs pl.concat vs extending
1
#polars #pandas #codegems

Что в поларс НЕ понравилось.

Активное переименование методов от версии к версии. Это сводит с ума все AI, которые пробуешь использовать для генерации кода.

Мало примеров в доке к сложным методам типа .over и .group_by, .group_by_dynamic. Реально непонятно, как и на каких принципах оно отработает. В документации расписано слабовато.

Иногда polars начинает жрать всю память на относительно простых операциях, где мог бы быть более экономным. Иногда проскакивают ошибки/панические атаки, зачастую сложновоспроизводимые.

И самое главное. Если у вас большие фреймы и много операций (математических, group_by), поларс отработает быстро, но гарантированно забьёт вам всю оперативку. То же сделает и пандас, но за ним неиспользованную память можно хотя бы (если повезёт) подчистить вызовом ctypes.CDLL("libc.so.6").malloc_trim(0)

Поларс же, похоже, создаёт какие-то маленькие и разбросанные в адресном пространстве арены памяти, которые таким способом не подчищаются. И вообще никаким не подчищаются, только завершением процесса. К примеру, у меня после интенсивных расчётов и джойнов фрейм в 100 гигов, а занято памяти на терабайт. И освободить её на линуксе никак нельзя, я уже и аллокаторы пробовал альтернативные типа jemalloc, tmalloc - без толку. Это я считаю большой проблемой вычислений и big data, и странно, что никого это не колышет особо. По факту сбора мусора в линукс нет, если вы не знали, ребята. По крайней мере после отработки поларса. "Спасибо" странным авторам аллокаторов памяти.

Что забавно, на винде этот вопрос решается!!! )) Используйте

def trim_windows_process_memory(pid: int = None) -> bool:
"""Causes effect similar to malloc_trim on -nix."""

# Define SIZE_T based on the platform (32-bit or 64-bit)
if ctypes.sizeof(ctypes.c_void_p) == 4:
SIZE_T = ctypes.c_uint32
else:
SIZE_T = ctypes.c_uint64

# Get a handle to the current process
if not pid:
pid = ctypes.windll.kernel32.GetCurrentProcess()

# Define argument and return types for SetProcessWorkingSetSizeEx
ctypes.windll.kernel32.SetProcessWorkingSetSizeEx.argtypes = [
ctypes.wintypes.HANDLE, # Process handle
SIZE_T, # Minimum working set size
SIZE_T, # Maximum working set size
ctypes.wintypes.DWORD, # Flags
]
ctypes.windll.kernel32.SetProcessWorkingSetSizeEx.restype = ctypes.wintypes.BOOL

# Define constants for SetProcessWorkingSetSizeEx
QUOTA_LIMITS_HARDWS_MIN_DISABLE = 0x00000002

# Attempt to set the working set size
result = ctypes.windll.kernel32.SetProcessWorkingSetSizeEx(pid, SIZE_T(-1), SIZE_T(-1), QUOTA_LIMITS_HARDWS_MIN_DISABLE)

if result == 0:
# Retrieve the error code
error_code = ctypes.windll.kernel32.GetLastError()
logger.error(f"SetProcessWorkingSetSizeEx failed with error code: {error_code}")
return False
else:
return True


и спите спокойно.

Следующая жёсткая проблема - производительность с dtype=category. Не храните паркетные файлы с этим типом данных, лучше конвертируйте в string (с compression='zstd', конечно). Иначе потом они будут склеиваться ВЕЧНОСТЬ, при любых настройках StringCache.
#polars #pandas

Запостил мини-серию о polars по сравнению с pandas:

Преимущества polars
Отличия от pandas
Недостатки polars

Заключение.

Несмотря на его сыроватость, по умолчанию теперь всегда буду использовать в своих ds-проектах polars. Людям, которые пишут pandas, наплевать на производительность, я это знаю по личному общению с одним таким человеком. Их принцип Кнутовкий, premature optimization is the root of all evil, поэтому, чтобы избежать зла, они не оптимизируют вообще. С таким кредо вам не библиотеки для работы с данными писать, уважаемые. Ну вот история вас и оставляет на обочине.

Всё же, полностью от пандас отказываться неразумно, часто там определённые вещи можно закодить быстрее в силу большей гибкости (axis) или наличия индексов. Но, как правило, я pandas в этом ключе использую лишь для маленьких фреймов. Ещё он понимает больше форматов данных. И у поларса к нему есть быстрый интероп.

Так что используйте оба, но приоритет отдавайте polars. Жизнь слишком коротка, чтобы грузить лишь 1 ядро CPU из 100 )

Из новостей: компания Риччи работает над polars cloud, это будет что-то типа dask/coiled, похоже.
👍2🔥1
#polars #parquet #arrow

Практическое наблюдение: не надо категорийные столбцы с высокой кардинальностью сохранять в паркетный формат именно в виде категориек. Кто-то из цепочки polars-arrow-parquet жёстко лажает, и без причины может раздуть размер файла в сотни раз.

используйте хотя бы

df=df.with_columns(pl.col(pl.Categorical).cast(pl.Utf8))


перед сохранением на диск.
#polars #deltalake #orjson #codegems

Попробовал deltalake в решении по сбору данных. отстой, лучше бы любую СУБД заюзал типа постгре или даже монго. Некоторые выводы из мини-проекта:

1) orjson is x20 faster than json

2) xxhash.xxh128 is x6 faster than hashlib.sha256

3) deltalake package is (at least so far) the toy solution. does not support concurrent writes, I had to use manual locking. with many small updates, requires frequent tables "re-optimizing". i just needed a "primary key" functionality from it - and it's slow, while spending LOTS of CPU. I should have better used any RDBMS, or mongo, instead.

В каком случае deltalake можно использовать: когда записываете данные редко, и с таблицей работает один поток. Либо хочется хостить данные в облачном хранилище типа gcp напрямую в паркете. Еще можно воспользоваться полуручным локом на время операций с дельта таблицей:

import os
import logging
from urllib.parse import urlparse
from filelock import FileLock, Timeout

logger = logging.getLogger(__name__)


def is_local_path(path: str) -> bool:
parsed = urlparse(path)
# If there's no scheme or it's explicitly "file"
if parsed.scheme in ("", "file"):
return not path.startswith(("s3://", "azure://"))

# Special case: Windows drive letter (e.g., "R:\...")
if os.name == "nt" and len(parsed.scheme) == 1 and parsed.scheme.isalpha():
return True

return False


def safe_delta_write(path: str, delta_op_func, *, lock_timeout: int = 120, lock_suffix=".lock"):
"""
Wraps any Delta Lake operation (write_deltalake, merge+execute) with local file locking.

Parameters:
path (str): Delta table path.
delta_op_func (callable): A function that performs the actual Delta operation.
lock_timeout (int): How many seconds to wait for the lock before skipping.
lock_suffix (str): Suffix for the lock filename.

Usage Examples
🔁 For .merge().when_not_matched_insert_all().execute():

def merge_ads_static():
return DeltaTable(ADS_STATIC_PATH).merge(
static_df,
predicate="t.id = s.id",
source_alias="s",
target_alias="t",
writer_properties=DELTALAKE_OPTIONS.get("writer_properties")
).when_not_matched_insert_all().execute()

safe_delta_write(ADS_STATIC_PATH, merge_ads_static)

📝 For write_deltalake() appends:

def write_market_ads():
return write_deltalake(
MARKET_ADS_PATH,
market_df,
mode="append",
partition_by=["date"],
**DELTALAKE_OPTIONS
)

safe_delta_write(MARKET_ADS_PATH, write_market_ads)
"""
if is_local_path(path):
lock_file = os.path.join("/tmp", f"{os.path.basename(path).replace('/', '_')}{lock_suffix}")
lock = FileLock(lock_file)

try:
with lock.acquire(timeout=lock_timeout):
logger.debug(f"Acquired lock for local Delta path: {path}")
return delta_op_func()
except Timeout:
logger.warning(f"Timeout while waiting for lock on {path}. Skipping operation.")
except Exception as e:
logger.exception(f"Delta operation failed on {path}: {e}")
raise (e)
else:
logger.warning(f"Delta operation on non-local path: {path}. Proceeding without lock.")
try:
return delta_op_func()
except Exception as e:
logger.exception(f"Delta operation failed on {path}: {e}")