Python Заметки
2.22K subscribers
62 photos
2 videos
2 files
229 links
Интересные заметки и обучающие материалы по Python

Контакт: @paulwinex

⚠️ Рекламу на канале не делаю!⚠️

Хештеги для поиска:
#tricks
#libs
#pep
#basic
#regex
#qt
#django
#2to3
#source
#offtop
Download Telegram
Три способа выполнить множество задач с asyncio

Функция для примера:
async def do_it(n):
await asyncio.sleep(random.uniform(0.5, 1))
return n


1. Последовательный вызов
async def main():
for i in range(100):
result = await do_it(i)

Такой вызов имеет смысл только тогда, когда результат одной задачи требуется для вызова следующей.
Если они независимы, то это антипаттерн, так как аналогичен простому синхронному вызову по очереди.

2. Упорядоченный результат
async def main():
tasks = [do_it(i) for i in range(100)]
results = await asyncio.gather(*tasks)

Выполняет корутины конкурентно и возвращает результат в виде списка.
Полезен когда требуется получить результаты в том же порядке в котором задачи отправлены.

3. Результат по мере готовности
tasks = [asyncio.create_task(do_it(i)) for i in range(100)]
for cor in asyncio.as_completed(tasks):
result = await cor

Так же выполняет корутины конкурентно, но не гарантирует порядок. Результат возвращается по мере готовности, каждый отдельно.
Полезен когда нужно обработать любой ответ как можно скорее.

#async
8👍6
Функция asyncio.wait() это еще один способ вызвать множество асинхронных задач.
Она работает в нескольких режимах.

1. Самый простой - ждем завершения всех задач
async def main():
tasks = [asyncio.create_task(do_it(i)) for i in range(10)]
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.ALL_COMPLETED
)
for task in done:
try:
print(task.result())
except Exception as e:
print(e)

Очень похоже на gather, но работает не так.
▫️возвращает не результаты, а два сета с объектами Task у которых можно забрать результат через task.result() если они в списке done
▫️не гарантирует порядок результатов так как оба объекта это set
▫️не выбрасывает исключение когда оно появляется, а сохраняет его в Task. Исключение появится когда попробуете забрать резултьтат.

2. Ждем завершения первой задачи, даже если там ошибка.
async def main():
tasks = [asyncio.create_task(do_it(i)) for i in range(3)]
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.FIRST_COMPLETED
)
# в done может быть несколько задач!
for task in done:
try:
print(task.result())
except Exception as e:
print(f"Fail: {e}")
# Оставшиеся задачи в pending, как правило, нужно отменить, иначе они будут продолжать работать
for task in pending:
task.cancel()

В сете done будут таски которые успели завершится, причем как успешно так и нет.

3. До первой ошибки.
Тоже самое, но с аргументом FIRST_EXCEPTION
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.FIRST_EXCEPTION
)

Функция завершается как только первая задача упадет с ошибкой.

Учтите, что в любом случае done вы можете обранужить несколько задач, как с ошибками так и успешные.

↗️ Полный листинг примеров здесь

#async
👍72
Отдельно разберём TaskGroup, который пришел на замену gather в Python 3.11.

Ключевые отличия
▫️create_task() возвращает объект asyncio.Task, у которого есть соответствюущие методы управления. То есть у нас больше контроля
▫️это контекстный менеджер, который гарантирует что все таски будут остановлены по выходу из контекста
▫️ошибка автоматически отменяет незавершенные задачи,
▫️except* передает нам ExceptionGroup, в котором каждую ошибку можно обработать отдельно
import asyncio
import random

async def do_it() -> str:
if random.random() < 0.1:
raise ValueError('Oops')
delay = random.uniform(0.5, 1.5)
await asyncio.sleep(delay)
return delay

async def main():
try:
async with asyncio.TaskGroup() as tg:
for _ in range(10):
tasks.append(tg.create_task(do_it()))
for t in tasks:
print(t.result())
except *ValueError as e:
for err in e.exceptions:
print(err)

asyncio.run(main())


Рекомендую изучить страницу Coroutines and Tasks из документации, где представлено больше интересных примеров и механизмов
- таймауты
- отмена задач
- создание задач из другого потока

#async
4👍1
Если запустить REPL с модулем asyncio, то вы входите в особый асинхронный REPL.

user@host:~$ python -m asyncio
asyncio REPL 3.12.7 ...
Use "await" directly instead of "asyncio.run()".
>>> import asyncio
>>>


В этом режиме
- создаётся и настраивается event loop
- уже импортирован asyncio
- работает await на верхнем уровне

То есть такая команда сработает без ошибок!
await asyncio.sleep(3)


Удобно для тестирования асинхронных функций без создания ивентлупов и остальной обвязки.
Работает в: 3.8+


#tricks #async
🔥14😁2👏1
Стандартная библиотека asyncio это стандарт (начиная с Py3.4) для работы с асинхронным кодом. Но эта библиотека достаточно низкоуровневая, со своими проблемами, устаревшими подходами.
Чтобы исправить это, были созданы разные обертки и альтернативы с реализацией популярных инструментов и паттернов асинхронного программирования. Это такие библиотеки как:

- trio: улучшает корректность выполнения, не оставляя потерянных корутин при ошибках, то есть предлагает Structured Concurrency из коробки.

- curio: упрощение синтаксиса и читаемости кода, больше похоже на работу с потоками.

- anyio: универсальная обертка над asyncio или trio плюс множество вспомогательных инструментов.

anyio используется в FastAPI как основная библиотека для работы с асинхронным кодом и вызовом синхронного кода из асинхронного.


В общем, рекомендую почитать про возможности anyio, возможно вы более не будете использовать чистый asyncio в своих проектах)

Это совсем не значит что дефолтный asyncio плох, он тоже даёт достаточный для работы функционал и продолжает развиваться. Например, в версии 3.11 появились TaskGroup, с похожим на trio функционалом. Так что он тоже актуален, просто придется больше написать кода самостоятельно.

#libs #async
5👍2
Недавно делал быстрый прототип асинхронного приложения в котором требовалось вызывать много синхронного кода. Да, я знаю, что это не лучший дизайн, но нужно было быстрое решение на один процесс и без очередей. Поэтому я выполнял код в потоках.
Выглядело это примерно так:

from fastapi.concurrency import run_in_threadpool

async def execute(data: DataRequest) -> DataResponse:
    try:
        result = await run_in_threadpool(sync_function, data)
        return DataResponse(data=result)
    except Exception as e:
        return DataResponse(
            error=str(e),
            success=False,
        )


В общем работает нормально. Для всех вызовов под капотом используется общий тредпул, всё работает предсказуемо.
Но потребовалось изменить количество запускаемых в пуле потоков (по умолчанию создается 40 воркеров).
Так как дело происходит с FastAPI, делается это через lifespan используя настройки anyio:

import anyio

@asynccontextmanager
async def lifespan(app: FastAPI):
    limiter = anyio.to_thread.current_default_thread_limiter()
    limiter.total_tokens = 100
    yield
    # если вдруг нужно вернуть обратно
    limiter.total_tokens = 40


Зачем менять количество воркеров?
- уменьшить, если оперативки мало (один тред занимает ~8мб)
- увеличить чтобы выдержать нагрузку

Если есть предложения получше при тех же вводных - предлагайте😉

#async
5👍3🔥1