Python Заметки
2.22K subscribers
62 photos
2 videos
2 files
229 links
Интересные заметки и обучающие материалы по Python

Контакт: @paulwinex

⚠️ Рекламу на канале не делаю!⚠️

Хештеги для поиска:
#tricks
#libs
#pep
#basic
#regex
#qt
#django
#2to3
#source
#offtop
Download Telegram
Стандартная библиотека asyncio это стандарт (начиная с Py3.4) для работы с асинхронным кодом. Но эта библиотека достаточно низкоуровневая, со своими проблемами, устаревшими подходами.
Чтобы исправить это, были созданы разные обертки и альтернативы с реализацией популярных инструментов и паттернов асинхронного программирования. Это такие библиотеки как:

- trio: улучшает корректность выполнения, не оставляя потерянных корутин при ошибках, то есть предлагает Structured Concurrency из коробки.

- curio: упрощение синтаксиса и читаемости кода, больше похоже на работу с потоками.

- anyio: универсальная обертка над asyncio или trio плюс множество вспомогательных инструментов.

anyio используется в FastAPI как основная библиотека для работы с асинхронным кодом и вызовом синхронного кода из асинхронного.


В общем, рекомендую почитать про возможности anyio, возможно вы более не будете использовать чистый asyncio в своих проектах)

Это совсем не значит что дефолтный asyncio плох, он тоже даёт достаточный для работы функционал и продолжает развиваться. Например, в версии 3.11 появились TaskGroup, с похожим на trio функционалом. Так что он тоже актуален, просто придется больше написать кода самостоятельно.

#libs #async
5👍2
Недавно делал быстрый прототип асинхронного приложения в котором требовалось вызывать много синхронного кода. Да, я знаю, что это не лучший дизайн, но нужно было быстрое решение на один процесс и без очередей. Поэтому я выполнял код в потоках.
Выглядело это примерно так:

from fastapi.concurrency import run_in_threadpool

async def execute(data: DataRequest) -> DataResponse:
    try:
        result = await run_in_threadpool(sync_function, data)
        return DataResponse(data=result)
    except Exception as e:
        return DataResponse(
            error=str(e),
            success=False,
        )


В общем работает нормально. Для всех вызовов под капотом используется общий тредпул, всё работает предсказуемо.
Но потребовалось изменить количество запускаемых в пуле потоков (по умолчанию создается 40 воркеров).
Так как дело происходит с FastAPI, делается это через lifespan используя настройки anyio:

import anyio

@asynccontextmanager
async def lifespan(app: FastAPI):
    limiter = anyio.to_thread.current_default_thread_limiter()
    limiter.total_tokens = 100
    yield
    # если вдруг нужно вернуть обратно
    limiter.total_tokens = 40


Зачем менять количество воркеров?
- уменьшить, если оперативки мало (один тред занимает ~8мб)
- увеличить чтобы выдержать нагрузку

Если есть предложения получше при тех же вводных - предлагайте😉

#async
5👍3🔥1
Nuitka 4.0

Библиотека для компиляции python-кода в исполняемые файлы получила мажорный апдейт.

Ключевые изменения:
- ускорение сборки бинарника в 15 раз
- экспериментальная поддержка компилятора Zig
- возможность выбранные функции оставлять как есть, в виде байт кода через декоратор @nuitka_ignore
- бинарники работают до 30% быстрей (CPU-bound задачи)
- теперь можно вместо отдельного скрипта сборки использовать pyproject.toml. Весь конфиг в одном файле!
- улучшен контроль подключения DLL библиотек для Windows
- улучшена совместимость с рядом популярных и не очень пакетами

И в целом выбран путь на повышение производительности бинарной сборки.

Полная информация ➡️ здесь.

#libs
9👍2👏1😱1
В асинхронных приложениях есть один не всегда очевидный момент, который приводит к неявным багам - это общие глобальные объекты. Да, это в целом антипаттерн, но иногда такие объекты действительно нужны и вполне уместны. Например, когда вы завязаны на уже существующем коде и не можете его поменять, но переменную подставлять надо, при этом явно пробросить её не получится. Для этого используется ContexctVar.
Случаи бывают разные но я приведу самый понятный пример - логирование.
Допустим, мы не можем передать id юзера куда-то внутрь фреймворка, но можем использовать его в своем хендлере подставляя как переменную. В примере будет без хендлера но суть та же.
import asyncio
from contextvars import ContextVar
import random

# это переменная, которая будет разной для каждой корутины
user_id_ctx = ContextVar("user_id", default=-1)

async def handle_request(user_name, user_id):
# устанавливаем значение для текущей корутины и её дочерних корутин
user_id_ctx.set(user_id)
print(f"Create {user_name} == {user_id}")
await asyncio.sleep(random.random())
# id не передаётся в вызов
await process_order(user_name)

async def process_order(user_name):
# получаем значение из локального контекста
current_user_id = user_id_ctx.get()
print(f"Log {user_name}: {current_user_id}")

async def main():
await asyncio.gather(
*[handle_request(f"user {i}", i) for i in range(10)],
)

if __name__ == "__main__":
asyncio.run(main())

Во всех выводах id должен совпадать.

- Точно так же можно подставлять атрибуты у инстансов синглтонов.
- Контекст наследуется дочерними корутинами.
- Не стоит увлекаться этим способом, он прилично усложняет логику. Явное лучше неявного.

@asyncio @tricks
👍52
Теперь аналогичная история с тредами. Для тредов используется объект threading.local.
Он позволяет создать локальный динамический атрибут (да, вот так костыльно) для треда.

Вот базовый пример:

import threading
import time
import random

# глобальная переменная
thread_data = threading.local()

def execute():
# поулчаем локальное значение для текущего треда
current_user_id = getattr(thread_data, "user_id", -1)
print(f"Log {threading.current_thread().name}: {current_user_id}")

def thread_task(user_id):
# устанавливаем значение для текущего треда
time.sleep(random.random())
thread_data.user_id = user_id
print(f"Create {threading.current_thread().name} == {user_id}")
execute()

threads = [
threading.Thread(
target=thread_task,
args=(i,),
name=f"Thread-{i}")
for i in range(10)
]
for t in threads:
t.start()
for t in threads:
t.join()

Вывод должен быть аналогичным, с соотетстивем номера треда и id юзера.

Есть еще один пример здесь


#tricks
2👍1
Мы рассмотрели два способа управления конеткстом переменных. Если вам показалось, что это выглядит излишне и можно было бы оставить один, то вам не показалось.
Способ с threading.local придуман для разделения переменных между потоками. CоntextVar был добавлен как новый метод для асинхронного кода, но оказался настолько универсальным, что его можно использовать и с потоками.
После появления ContextVar в PEP567 его рекомендовано использовать вместо threading.local.
И даже был сделан бекпорт для версия ниже 3.7.1.

Теперь, если совместить ContextVar и Proxy-класс из прошлого примера то получим такой класс↗️.

Но у этого класса есть две проблемы:

1️⃣ Нигде не вызывается reset для сброса переменной, что может приводить проблемам

- утечка памяти
- "грязный" конеткст при переиспользовании потоков
- невозможность вернуться к дефолту

Решим это с помощью конектстного менеджера:
@contextlib.contextmanager
def configure_context(self, *args, **kwargs):
"""Синхронный контекстный менеджер (для `with`)"""
tok_cfg = self._cv_config.set((args, kwargs))
tok_obj = self._cv_object.set(None)
try:
yield self
finally:
self._cv_object.reset(tok_obj)
self._cv_config.reset(tok_cfg)

@contextlib.asynccontextmanager
async def aconfigure_context(self, *args, **kwargs):
"""Асинхронный контекстный менеджер (для `async with`)"""
tok_cfg = self._cv_config.set((args, kwargs))
tok_obj = self._cv_object.set(None)
try:
yield self
finally:
self._cv_object.reset(tok_obj)
self._cv_config.reset(tok_cfg)


Пример использования:
with proxy.configure_context(val1, val2):
proxy.do_something()


Теперь прокси готов, но...

2️⃣ В асинхронном коде, для которого и придуманы ContextVar, созданием корутин занимается Event Loop, именно он отвечает за наследование контекста дочерними корутинами. В случае с потоками ничего такого нет, мы сами себе "эвентлуп", поэтому приходится прописывать копирование конеткста самстоятельно.

Пример проблемы с отсутствием наследованием конеткста в потоках↗️

Для решения есть функция копирования текущего контекста и метод запуска функции с новым конектстом:
сontextvars.copy_context().run(func, *args, **kwargs)


Здесь сложно придумать универсальное автоматическое копирование контекста, самая простая функция будет выглядеть так:
def run_in_thread_with_context(
func: Callable, *args, **kwargs
) -> threading.Thread:
ctx = contextvars.copy_context()
t = threading.Thread(
target=lambda: ctx.run(func, *args, **kwargs)
)
t.start()
return t


И если вернуться к нашему синхронному ApiClient, то придётся следить за конектстом самостоятельно. И если где-то в коде библиотеки уже есть вызов тредов, то это работать не будет, придется переписывать.

threading.local тоже не наследует конеткст.


Полный пример Proxy с CоntextVar↗️

Пример использования:
client = ContextVarProxy(ApiClient)

def worker_in_thread(token):
with client.configure_context(token=token):
use_client(...)


Еще вариант, это кастомные ThreadExecutor и Thread с поддержкой автокопирования контекста. Забираем здесь↗️

И нет, это не пример как надо делать в проде) Это просто эксперемент для понимания процесса.

#tricks
3👍1
Как-то давно писал трансфер файлов по сети.
В этом проекте требовалось создавать файл, который сразу существует на диске, имеет нужный размер но еще не содержит данных.
Вот примеры как создать такой файл:

length = 1024 * 1024 * 1024 * 100
with open(file_path, "wb") as out:
out.seek(length-1)
out.write(b"\0")

with open(file_path, "wb") as out:
out.truncate(1024 * 1024 * 1024 * 120)

truncate -s 100M test


Файл создается моментально и получается полностью состоящий из нулей. Более того, он не занимает место над диске!
Это называется sparse files - разреженные файлы. На таких файловых системах как ext4, XFS, Btrfs, ZFS файл автоматически становится разреженным если процесс пишет за пределы конца файла. В структуре файла создаются "дырки" которые автоматически при чтении вернут нули.

Если запустить тоже самое на Windows, то результат будет другой. Файл будет создаваться долго и реально займет место на диске.

NTFS умеет создавать разреженные файлы, но это надо активировать явно:

import os
import msvcrt
import ctypes

file_path = r"C:\file"
length = 1024 * 1024 * 1024 * 100 # 100 GB

with open(file_path, "wb") as f:
handle = msvcrt.get_osfhandle(f.fileno())
FSCTL_SET_SPARSE = 0x900C4
bytes_returned = ctypes.c_ulong()
ctypes.windll.kernel32.DeviceIoControl(
handle, FSCTL_SET_SPARSE, None, 0, None, 0,
ctypes.byref(bytes_returned), None
)
f.seek(length-1)
f.write(b"\0")


Таким образом мы делаем преалокацию файла с возможностью писать в любое место, например так работают торренты.
В моем случае было многопоточное скачивание разных кусков файлов с возможностью докачки.

При копировании таких файлов чаще всего копия занимает всё положенное ей место.
Чтобы учитывать такое свойство файла нужно использовать специальные опции

shutil.copyfile(src, dst, follow_symlinks=False)

rsync -S ...

robocopy /SPARSE ...


Для тестирования трансфера требовалось создавать реальные файлы с рандомными данными. Сделать это просто:

import os
with open(file_path, "wb") as out:
for _ in range(1024):
out.write(os.urandom(1024*1024*10))


dd if=/dev/urandom of=file.bin bs=1M count=10


Тут, конечно, никаких разреженных файлов быть не может.

#tricks
🔥83👍2
В Python 3.6 был полностью переработан стандартный dict. Вместо разреженной таблицы данные стали храниться в плотных массивах. Это дало буст к скорости и экономию памяти. И, как сайд эффект, ключи стали упорядочены. В каком порядке добавляешь ключи, в таком можно и забрать.

Но при этом OrderedDict никуда не делся. Это сделано для совместимости?

Нет, между dict и OrderedDict всё ещё большая разница.

▫️ При сравнении в обычном dict проверяется только наличие ключа, а в OrderedDict проверяется их порядок
▫️OrderedDict основан на связном списке и имеет метод move_to_end() для изменения порядка элементов.
А метод popitem() позволяет удалять элемент как с конца так и из начала.
▫️OrderedDict это кастомный класс. Он не так оптимизирован как обычный dict. Работает дольше, места занимает больше.

В версии 3.7 он был переписан на С и стал быстрей, но всё еще уступает обычному dict.

Немного тестов:

Память: в 2.5-3 раза больше
Создание: в ~2 раза дольше
Удаление: в ~3 раза быстрей (popitem против del)
Поиск по ключу: примерно одинаково (хеш таблицы)

Код тестов↗️

Если вы используете OrderedDict, то это предполагает, что порядок ключей важен для логики программы.


#libs
5👍5