Стандартная библиотека asyncio это стандарт (начиная с Py3.4) для работы с асинхронным кодом. Но эта библиотека достаточно низкоуровневая, со своими проблемами, устаревшими подходами.
Чтобы исправить это, были созданы разные обертки и альтернативы с реализацией популярных инструментов и паттернов асинхронного программирования. Это такие библиотеки как:
- trio: улучшает корректность выполнения, не оставляя потерянных корутин при ошибках, то есть предлагает Structured Concurrency из коробки.
- curio: упрощение синтаксиса и читаемости кода, больше похоже на работу с потоками.
- anyio: универсальная обертка над asyncio или trio плюс множество вспомогательных инструментов.
В общем, рекомендую почитать про возможности anyio, возможно вы более не будете использовать чистый asyncio в своих проектах)
Это совсем не значит что дефолтный asyncio плох, он тоже даёт достаточный для работы функционал и продолжает развиваться. Например, в версии 3.11 появились TaskGroup, с похожим на trio функционалом. Так что он тоже актуален, просто придется больше написать кода самостоятельно.
#libs #async
Чтобы исправить это, были созданы разные обертки и альтернативы с реализацией популярных инструментов и паттернов асинхронного программирования. Это такие библиотеки как:
- trio: улучшает корректность выполнения, не оставляя потерянных корутин при ошибках, то есть предлагает Structured Concurrency из коробки.
- curio: упрощение синтаксиса и читаемости кода, больше похоже на работу с потоками.
- anyio: универсальная обертка над asyncio или trio плюс множество вспомогательных инструментов.
anyio используется в FastAPI как основная библиотека для работы с асинхронным кодом и вызовом синхронного кода из асинхронного.
В общем, рекомендую почитать про возможности anyio, возможно вы более не будете использовать чистый asyncio в своих проектах)
Это совсем не значит что дефолтный asyncio плох, он тоже даёт достаточный для работы функционал и продолжает развиваться. Например, в версии 3.11 появились TaskGroup, с похожим на trio функционалом. Так что он тоже актуален, просто придется больше написать кода самостоятельно.
#libs #async
❤5👍2
Недавно делал быстрый прототип асинхронного приложения в котором требовалось вызывать много синхронного кода. Да, я знаю, что это не лучший дизайн, но нужно было быстрое решение на один процесс и без очередей. Поэтому я выполнял код в потоках.
Выглядело это примерно так:
В общем работает нормально. Для всех вызовов под капотом используется общий тредпул, всё работает предсказуемо.
Но потребовалось изменить количество запускаемых в пуле потоков (по умолчанию создается 40 воркеров).
Так как дело происходит с FastAPI, делается это через lifespan используя настройки anyio:
Зачем менять количество воркеров?
- уменьшить, если оперативки мало (один тред занимает ~8мб)
- увеличить чтобы выдержать нагрузку
Если есть предложения получше при тех же вводных - предлагайте😉
#async
Выглядело это примерно так:
from fastapi.concurrency import run_in_threadpool
async def execute(data: DataRequest) -> DataResponse:
try:
result = await run_in_threadpool(sync_function, data)
return DataResponse(data=result)
except Exception as e:
return DataResponse(
error=str(e),
success=False,
)
В общем работает нормально. Для всех вызовов под капотом используется общий тредпул, всё работает предсказуемо.
Но потребовалось изменить количество запускаемых в пуле потоков (по умолчанию создается 40 воркеров).
Так как дело происходит с FastAPI, делается это через lifespan используя настройки anyio:
import anyio
@asynccontextmanager
async def lifespan(app: FastAPI):
limiter = anyio.to_thread.current_default_thread_limiter()
limiter.total_tokens = 100
yield
# если вдруг нужно вернуть обратно
limiter.total_tokens = 40
Зачем менять количество воркеров?
- уменьшить, если оперативки мало (один тред занимает ~8мб)
- увеличить чтобы выдержать нагрузку
Если есть предложения получше при тех же вводных - предлагайте😉
#async
❤5👍3🔥1
Nuitka 4.0
Библиотека для компиляции python-кода в исполняемые файлы получила мажорный апдейт.
Ключевые изменения:
- ускорение сборки бинарника в 15 раз
- экспериментальная поддержка компилятора Zig
- возможность выбранные функции оставлять как есть, в виде байт кода через декоратор
- бинарники работают до 30% быстрей (CPU-bound задачи)
- теперь можно вместо отдельного скрипта сборки использовать pyproject.toml. Весь конфиг в одном файле!
- улучшен контроль подключения DLL библиотек для Windows
- улучшена совместимость с рядом популярных и не очень пакетами
И в целом выбран путь на повышение производительности бинарной сборки.
Полная информация ➡️ здесь.
#libs
Библиотека для компиляции python-кода в исполняемые файлы получила мажорный апдейт.
Ключевые изменения:
- ускорение сборки бинарника в 15 раз
- экспериментальная поддержка компилятора Zig
- возможность выбранные функции оставлять как есть, в виде байт кода через декоратор
@nuitka_ignore- бинарники работают до 30% быстрей (CPU-bound задачи)
- теперь можно вместо отдельного скрипта сборки использовать pyproject.toml. Весь конфиг в одном файле!
- улучшен контроль подключения DLL библиотек для Windows
- улучшена совместимость с рядом популярных и не очень пакетами
И в целом выбран путь на повышение производительности бинарной сборки.
Полная информация ➡️ здесь.
#libs
❤9👍2👏1😱1
В асинхронных приложениях есть один не всегда очевидный момент, который приводит к неявным багам - это общие глобальные объекты. Да, это в целом антипаттерн, но иногда такие объекты действительно нужны и вполне уместны. Например, когда вы завязаны на уже существующем коде и не можете его поменять, но переменную подставлять надо, при этом явно пробросить её не получится. Для этого используется
Случаи бывают разные но я приведу самый понятный пример - логирование.
Допустим, мы не можем передать id юзера куда-то внутрь фреймворка, но можем использовать его в своем хендлере подставляя как переменную. В примере будет без хендлера но суть та же.
Во всех выводах id должен совпадать.
- Точно так же можно подставлять атрибуты у инстансов синглтонов.
- Контекст наследуется дочерними корутинами.
- Не стоит увлекаться этим способом, он прилично усложняет логику. Явное лучше неявного.
@asyncio @tricks
ContexctVar.Случаи бывают разные но я приведу самый понятный пример - логирование.
Допустим, мы не можем передать id юзера куда-то внутрь фреймворка, но можем использовать его в своем хендлере подставляя как переменную. В примере будет без хендлера но суть та же.
import asyncio
from contextvars import ContextVar
import random
# это переменная, которая будет разной для каждой корутины
user_id_ctx = ContextVar("user_id", default=-1)
async def handle_request(user_name, user_id):
# устанавливаем значение для текущей корутины и её дочерних корутин
user_id_ctx.set(user_id)
print(f"Create {user_name} == {user_id}")
await asyncio.sleep(random.random())
# id не передаётся в вызов
await process_order(user_name)
async def process_order(user_name):
# получаем значение из локального контекста
current_user_id = user_id_ctx.get()
print(f"Log {user_name}: {current_user_id}")
async def main():
await asyncio.gather(
*[handle_request(f"user {i}", i) for i in range(10)],
)
if __name__ == "__main__":
asyncio.run(main())
Во всех выводах id должен совпадать.
- Точно так же можно подставлять атрибуты у инстансов синглтонов.
- Контекст наследуется дочерними корутинами.
- Не стоит увлекаться этим способом, он прилично усложняет логику. Явное лучше неявного.
@asyncio @tricks
👍5❤2
Теперь аналогичная история с тредами. Для тредов используется объект
Он позволяет создать локальный динамический атрибут (да, вот так костыльно) для треда.
Вот базовый пример:
Вывод должен быть аналогичным, с соотетстивем номера треда и id юзера.
#tricks
threading.local.Он позволяет создать локальный динамический атрибут (да, вот так костыльно) для треда.
Вот базовый пример:
import threading
import time
import random
# глобальная переменная
thread_data = threading.local()
def execute():
# поулчаем локальное значение для текущего треда
current_user_id = getattr(thread_data, "user_id", -1)
print(f"Log {threading.current_thread().name}: {current_user_id}")
def thread_task(user_id):
# устанавливаем значение для текущего треда
time.sleep(random.random())
thread_data.user_id = user_id
print(f"Create {threading.current_thread().name} == {user_id}")
execute()
threads = [
threading.Thread(
target=thread_task,
args=(i,),
name=f"Thread-{i}")
for i in range(10)
]
for t in threads:
t.start()
for t in threads:
t.join()
Вывод должен быть аналогичным, с соотетстивем номера треда и id юзера.
Есть еще один пример здесь
#tricks
Gist
thread-local-obj.ipynb
GitHub Gist: instantly share code, notes, and snippets.
❤2👍1
Мы рассмотрели два способа управления конеткстом переменных. Если вам показалось, что это выглядит излишне и можно было бы оставить один, то вам не показалось.
Способ с
После появления ContextVar в PEP567 его рекомендовано использовать вместо
И даже был сделан бекпорт для версия ниже 3.7.1.
Теперь, если совместить ContextVar и Proxy-класс из прошлого примера то получим такой класс↗️.
Но у этого класса есть две проблемы:
1️⃣ Нигде не вызывается reset для сброса переменной, что может приводить проблемам
- утечка памяти
- "грязный" конеткст при переиспользовании потоков
- невозможность вернуться к дефолту
Решим это с помощью конектстного менеджера:
Пример использования:
Теперь прокси готов, но...
2️⃣ В асинхронном коде, для которого и придуманы ContextVar, созданием корутин занимается Event Loop, именно он отвечает за наследование контекста дочерними корутинами. В случае с потоками ничего такого нет, мы сами себе "эвентлуп", поэтому приходится прописывать копирование конеткста самстоятельно.
Пример проблемы с отсутствием наследованием конеткста в потоках↗️
Для решения есть функция копирования текущего контекста и метод запуска функции с новым конектстом:
Здесь сложно придумать универсальное автоматическое копирование контекста, самая простая функция будет выглядеть так:
И если вернуться к нашему синхронному ApiClient, то придётся следить за конектстом самостоятельно. И если где-то в коде библиотеки уже есть вызов тредов, то это работать не будет, придется переписывать.
Полный пример Proxy с CоntextVar↗️
Пример использования:
Еще вариант, это кастомные
И нет, это не пример как надо делать в проде) Это просто эксперемент для понимания процесса.
#tricks
Способ с
threading.local придуман для разделения переменных между потоками. CоntextVar был добавлен как новый метод для асинхронного кода, но оказался настолько универсальным, что его можно использовать и с потоками. После появления ContextVar в PEP567 его рекомендовано использовать вместо
threading.local. И даже был сделан бекпорт для версия ниже 3.7.1.
Теперь, если совместить ContextVar и Proxy-класс из прошлого примера то получим такой класс↗️.
Но у этого класса есть две проблемы:
1️⃣ Нигде не вызывается reset для сброса переменной, что может приводить проблемам
- утечка памяти
- "грязный" конеткст при переиспользовании потоков
- невозможность вернуться к дефолту
Решим это с помощью конектстного менеджера:
@contextlib.contextmanager
def configure_context(self, *args, **kwargs):
"""Синхронный контекстный менеджер (для `with`)"""
tok_cfg = self._cv_config.set((args, kwargs))
tok_obj = self._cv_object.set(None)
try:
yield self
finally:
self._cv_object.reset(tok_obj)
self._cv_config.reset(tok_cfg)
@contextlib.asynccontextmanager
async def aconfigure_context(self, *args, **kwargs):
"""Асинхронный контекстный менеджер (для `async with`)"""
tok_cfg = self._cv_config.set((args, kwargs))
tok_obj = self._cv_object.set(None)
try:
yield self
finally:
self._cv_object.reset(tok_obj)
self._cv_config.reset(tok_cfg)
Пример использования:
with proxy.configure_context(val1, val2):
proxy.do_something()
Теперь прокси готов, но...
2️⃣ В асинхронном коде, для которого и придуманы ContextVar, созданием корутин занимается Event Loop, именно он отвечает за наследование контекста дочерними корутинами. В случае с потоками ничего такого нет, мы сами себе "эвентлуп", поэтому приходится прописывать копирование конеткста самстоятельно.
Пример проблемы с отсутствием наследованием конеткста в потоках↗️
Для решения есть функция копирования текущего контекста и метод запуска функции с новым конектстом:
сontextvars.copy_context().run(func, *args, **kwargs)
Здесь сложно придумать универсальное автоматическое копирование контекста, самая простая функция будет выглядеть так:
def run_in_thread_with_context(
func: Callable, *args, **kwargs
) -> threading.Thread:
ctx = contextvars.copy_context()
t = threading.Thread(
target=lambda: ctx.run(func, *args, **kwargs)
)
t.start()
return t
И если вернуться к нашему синхронному ApiClient, то придётся следить за конектстом самостоятельно. И если где-то в коде библиотеки уже есть вызов тредов, то это работать не будет, придется переписывать.
threading.local тоже не наследует конеткст.
Полный пример Proxy с CоntextVar↗️
Пример использования:
client = ContextVarProxy(ApiClient)
def worker_in_thread(token):
with client.configure_context(token=token):
use_client(...)
Еще вариант, это кастомные
ThreadExecutor и Thread с поддержкой автокопирования контекста. Забираем здесь↗️И нет, это не пример как надо делать в проде) Это просто эксперемент для понимания процесса.
#tricks
Python Enhancement Proposals (PEPs)
PEP 567 – Context Variables | peps.python.org
This PEP proposes a new contextvars module and a set of new CPython C APIs to support context variables. This concept is similar to thread-local storage (TLS), but, unlike TLS, it also allows correctly keeping track of values per asynchronous task, e.g...
❤3👍1
Как-то давно писал трансфер файлов по сети.
В этом проекте требовалось создавать файл, который сразу существует на диске, имеет нужный размер но еще не содержит данных.
Вот примеры как создать такой файл:
Файл создается моментально и получается полностью состоящий из нулей. Более того, он не занимает место над диске!
Это называется sparse files - разреженные файлы. На таких файловых системах как
Если запустить тоже самое на Windows, то результат будет другой. Файл будет создаваться долго и реально займет место на диске.
Таким образом мы делаем преалокацию файла с возможностью писать в любое место, например так работают торренты.
В моем случае было многопоточное скачивание разных кусков файлов с возможностью докачки.
При копировании таких файлов чаще всего копия занимает всё положенное ей место.
Чтобы учитывать такое свойство файла нужно использовать специальные опции
Для тестирования трансфера требовалось создавать реальные файлы с рандомными данными. Сделать это просто:
Тут, конечно, никаких разреженных файлов быть не может.
#tricks
В этом проекте требовалось создавать файл, который сразу существует на диске, имеет нужный размер но еще не содержит данных.
Вот примеры как создать такой файл:
length = 1024 * 1024 * 1024 * 100
with open(file_path, "wb") as out:
out.seek(length-1)
out.write(b"\0")
with open(file_path, "wb") as out:
out.truncate(1024 * 1024 * 1024 * 120)
truncate -s 100M test
Файл создается моментально и получается полностью состоящий из нулей. Более того, он не занимает место над диске!
Это называется sparse files - разреженные файлы. На таких файловых системах как
ext4, XFS, Btrfs, ZFS файл автоматически становится разреженным если процесс пишет за пределы конца файла. В структуре файла создаются "дырки" которые автоматически при чтении вернут нули.Если запустить тоже самое на Windows, то результат будет другой. Файл будет создаваться долго и реально займет место на диске.
NTFS умеет создавать разреженные файлы, но это надо активировать явно:import os
import msvcrt
import ctypes
file_path = r"C:\file"
length = 1024 * 1024 * 1024 * 100 # 100 GB
with open(file_path, "wb") as f:
handle = msvcrt.get_osfhandle(f.fileno())
FSCTL_SET_SPARSE = 0x900C4
bytes_returned = ctypes.c_ulong()
ctypes.windll.kernel32.DeviceIoControl(
handle, FSCTL_SET_SPARSE, None, 0, None, 0,
ctypes.byref(bytes_returned), None
)
f.seek(length-1)
f.write(b"\0")
Таким образом мы делаем преалокацию файла с возможностью писать в любое место, например так работают торренты.
В моем случае было многопоточное скачивание разных кусков файлов с возможностью докачки.
При копировании таких файлов чаще всего копия занимает всё положенное ей место.
Чтобы учитывать такое свойство файла нужно использовать специальные опции
shutil.copyfile(src, dst, follow_symlinks=False)
rsync -S ...
robocopy /SPARSE ...
Для тестирования трансфера требовалось создавать реальные файлы с рандомными данными. Сделать это просто:
import os
with open(file_path, "wb") as out:
for _ in range(1024):
out.write(os.urandom(1024*1024*10))
dd if=/dev/urandom of=file.bin bs=1M count=10
Тут, конечно, никаких разреженных файлов быть не может.
#tricks
Wikipedia
Sparse file
computer file stored with logically zero, physically unallocated holes
🔥8❤3👍2
В Python 3.6 был полностью переработан стандартный dict. Вместо разреженной таблицы данные стали храниться в плотных массивах. Это дало буст к скорости и экономию памяти. И, как сайд эффект, ключи стали упорядочены. В каком порядке добавляешь ключи, в таком можно и забрать.
Но при этом OrderedDict никуда не делся. Это сделано для совместимости?
Нет, между
▫️ При сравнении в обычном
▫️
А метод
▫️
В версии 3.7 он был переписан на С и стал быстрей, но всё еще уступает обычному dict.
Немного тестов:
Память: в 2.5-3 раза больше
Создание: в ~2 раза дольше
Удаление: в ~3 раза быстрей (
Поиск по ключу: примерно одинаково (хеш таблицы)
Код тестов↗️
#libs
Но при этом OrderedDict никуда не делся. Это сделано для совместимости?
Нет, между
dict и OrderedDict всё ещё большая разница.▫️ При сравнении в обычном
dict проверяется только наличие ключа, а в OrderedDict проверяется их порядок▫️
OrderedDict основан на связном списке и имеет метод move_to_end() для изменения порядка элементов.А метод
popitem() позволяет удалять элемент как с конца так и из начала.▫️
OrderedDict это кастомный класс. Он не так оптимизирован как обычный dict. Работает дольше, места занимает больше. В версии 3.7 он был переписан на С и стал быстрей, но всё еще уступает обычному dict.
Немного тестов:
Память: в 2.5-3 раза больше
Создание: в ~2 раза дольше
Удаление: в ~3 раза быстрей (
popitem против del)Поиск по ключу: примерно одинаково (хеш таблицы)
Код тестов↗️
Если вы используете OrderedDict, то это предполагает, что порядок ключей важен для логики программы.
#libs
Python documentation
collections — Container datatypes
Source code: Lib/collections/__init__.py This module implements specialized container datatypes providing alternatives to Python’s general purpose built-in containers, dict, list, set, and tuple.,,...
❤5👍5