Три способа выполнить множество задач с asyncio
Функция для примера:
1. Последовательный вызов
Такой вызов имеет смысл только тогда, когда результат одной задачи требуется для вызова следующей.
Если они независимы, то это антипаттерн, так как аналогичен простому синхронному вызову по очереди.
2. Упорядоченный результат
Выполняет корутины конкурентно и возвращает результат в виде списка.
Полезен когда требуется получить результаты в том же порядке в котором задачи отправлены.
3. Результат по мере готовности
Так же выполняет корутины конкурентно, но не гарантирует порядок. Результат возвращается по мере готовности, каждый отдельно.
Полезен когда нужно обработать любой ответ как можно скорее.
#async
Функция для примера:
async def do_it(n):
await asyncio.sleep(random.uniform(0.5, 1))
return n
1. Последовательный вызов
async def main():
for i in range(100):
result = await do_it(i)
Такой вызов имеет смысл только тогда, когда результат одной задачи требуется для вызова следующей.
Если они независимы, то это антипаттерн, так как аналогичен простому синхронному вызову по очереди.
2. Упорядоченный результат
async def main():
tasks = [do_it(i) for i in range(100)]
results = await asyncio.gather(*tasks)
Выполняет корутины конкурентно и возвращает результат в виде списка.
Полезен когда требуется получить результаты в том же порядке в котором задачи отправлены.
3. Результат по мере готовности
tasks = [asyncio.create_task(do_it(i)) for i in range(100)]
for cor in asyncio.as_completed(tasks):
result = await cor
Так же выполняет корутины конкурентно, но не гарантирует порядок. Результат возвращается по мере готовности, каждый отдельно.
Полезен когда нужно обработать любой ответ как можно скорее.
#async
❤8👍6
Функция
Она работает в нескольких режимах.
1. Самый простой - ждем завершения всех задач
Очень похоже на
▫️возвращает не результаты, а два сета с объектами
▫️не гарантирует порядок результатов так как оба объекта это
▫️не выбрасывает исключение когда оно появляется, а сохраняет его в Task. Исключение появится когда попробуете забрать резултьтат.
2. Ждем завершения первой задачи, даже если там ошибка.
В сете
3. До первой ошибки.
Тоже самое, но с аргументом
Функция завершается как только первая задача упадет с ошибкой.
↗️ Полный листинг примеров здесь
#async
asyncio.wait() это еще один способ вызвать множество асинхронных задач.Она работает в нескольких режимах.
1. Самый простой - ждем завершения всех задач
async def main():
tasks = [asyncio.create_task(do_it(i)) for i in range(10)]
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.ALL_COMPLETED
)
for task in done:
try:
print(task.result())
except Exception as e:
print(e)
Очень похоже на
gather, но работает не так.▫️возвращает не результаты, а два сета с объектами
Task у которых можно забрать результат через task.result() если они в списке done▫️не гарантирует порядок результатов так как оба объекта это
set▫️не выбрасывает исключение когда оно появляется, а сохраняет его в Task. Исключение появится когда попробуете забрать резултьтат.
2. Ждем завершения первой задачи, даже если там ошибка.
async def main():
tasks = [asyncio.create_task(do_it(i)) for i in range(3)]
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.FIRST_COMPLETED
)
# в done может быть несколько задач!
for task in done:
try:
print(task.result())
except Exception as e:
print(f"Fail: {e}")
# Оставшиеся задачи в pending, как правило, нужно отменить, иначе они будут продолжать работать
for task in pending:
task.cancel()
В сете
done будут таски которые успели завершится, причем как успешно так и нет.3. До первой ошибки.
Тоже самое, но с аргументом
FIRST_EXCEPTIONdone, pending = await asyncio.wait(
tasks,
return_when=asyncio.FIRST_EXCEPTION
)
Функция завершается как только первая задача упадет с ошибкой.
Учтите, что в любом случае done вы можете обранужить несколько задач, как с ошибками так и успешные.
↗️ Полный листинг примеров здесь
#async
Gist
asyncio-wait-example.py
GitHub Gist: instantly share code, notes, and snippets.
👍7❤2
Отдельно разберём
Ключевые отличия
▫️
▫️это контекстный менеджер, который гарантирует что все таски будут остановлены по выходу из контекста
▫️ошибка автоматически отменяет незавершенные задачи,
▫️
Рекомендую изучить страницу Coroutines and Tasks из документации, где представлено больше интересных примеров и механизмов
- таймауты
- отмена задач
- создание задач из другого потока
#async
TaskGroup, который пришел на замену gather в Python 3.11.Ключевые отличия
▫️
create_task() возвращает объект asyncio.Task, у которого есть соответствюущие методы управления. То есть у нас больше контроля▫️это контекстный менеджер, который гарантирует что все таски будут остановлены по выходу из контекста
▫️ошибка автоматически отменяет незавершенные задачи,
▫️
except* передает нам ExceptionGroup, в котором каждую ошибку можно обработать отдельно import asyncio
import random
async def do_it() -> str:
if random.random() < 0.1:
raise ValueError('Oops')
delay = random.uniform(0.5, 1.5)
await asyncio.sleep(delay)
return delay
async def main():
try:
async with asyncio.TaskGroup() as tg:
for _ in range(10):
tasks.append(tg.create_task(do_it()))
for t in tasks:
print(t.result())
except *ValueError as e:
for err in e.exceptions:
print(err)
asyncio.run(main())
Рекомендую изучить страницу Coroutines and Tasks из документации, где представлено больше интересных примеров и механизмов
- таймауты
- отмена задач
- создание задач из другого потока
#async
Python documentation
Coroutines and tasks
This section outlines high-level asyncio APIs to work with coroutines and Tasks. Coroutines, Awaitables, Creating tasks, Task cancellation, Task groups, Sleeping, Running tasks concurrently, Eager ...
❤4👍1
Если запустить REPL с модулем asyncio, то вы входите в особый асинхронный REPL.
В этом режиме
- создаётся и настраивается event loop
- уже импортирован asyncio
- работает await на верхнем уровне
То есть такая команда сработает без ошибок!
Удобно для тестирования асинхронных функций без создания ивентлупов и остальной обвязки.
#tricks #async
user@host:~$ python -m asyncio
asyncio REPL 3.12.7 ...
Use "await" directly instead of "asyncio.run()".
>>> import asyncio
>>>
В этом режиме
- создаётся и настраивается event loop
- уже импортирован asyncio
- работает await на верхнем уровне
То есть такая команда сработает без ошибок!
await asyncio.sleep(3)
Удобно для тестирования асинхронных функций без создания ивентлупов и остальной обвязки.
Работает в: 3.8+
#tricks #async
🔥14😁2👏1
Стандартная библиотека asyncio это стандарт (начиная с Py3.4) для работы с асинхронным кодом. Но эта библиотека достаточно низкоуровневая, со своими проблемами, устаревшими подходами.
Чтобы исправить это, были созданы разные обертки и альтернативы с реализацией популярных инструментов и паттернов асинхронного программирования. Это такие библиотеки как:
- trio: улучшает корректность выполнения, не оставляя потерянных корутин при ошибках, то есть предлагает Structured Concurrency из коробки.
- curio: упрощение синтаксиса и читаемости кода, больше похоже на работу с потоками.
- anyio: универсальная обертка над asyncio или trio плюс множество вспомогательных инструментов.
В общем, рекомендую почитать про возможности anyio, возможно вы более не будете использовать чистый asyncio в своих проектах)
Это совсем не значит что дефолтный asyncio плох, он тоже даёт достаточный для работы функционал и продолжает развиваться. Например, в версии 3.11 появились TaskGroup, с похожим на trio функционалом. Так что он тоже актуален, просто придется больше написать кода самостоятельно.
#libs #async
Чтобы исправить это, были созданы разные обертки и альтернативы с реализацией популярных инструментов и паттернов асинхронного программирования. Это такие библиотеки как:
- trio: улучшает корректность выполнения, не оставляя потерянных корутин при ошибках, то есть предлагает Structured Concurrency из коробки.
- curio: упрощение синтаксиса и читаемости кода, больше похоже на работу с потоками.
- anyio: универсальная обертка над asyncio или trio плюс множество вспомогательных инструментов.
anyio используется в FastAPI как основная библиотека для работы с асинхронным кодом и вызовом синхронного кода из асинхронного.
В общем, рекомендую почитать про возможности anyio, возможно вы более не будете использовать чистый asyncio в своих проектах)
Это совсем не значит что дефолтный asyncio плох, он тоже даёт достаточный для работы функционал и продолжает развиваться. Например, в версии 3.11 появились TaskGroup, с похожим на trio функционалом. Так что он тоже актуален, просто придется больше написать кода самостоятельно.
#libs #async
❤5👍2
Недавно делал быстрый прототип асинхронного приложения в котором требовалось вызывать много синхронного кода. Да, я знаю, что это не лучший дизайн, но нужно было быстрое решение на один процесс и без очередей. Поэтому я выполнял код в потоках.
Выглядело это примерно так:
В общем работает нормально. Для всех вызовов под капотом используется общий тредпул, всё работает предсказуемо.
Но потребовалось изменить количество запускаемых в пуле потоков (по умолчанию создается 40 воркеров).
Так как дело происходит с FastAPI, делается это через lifespan используя настройки anyio:
Зачем менять количество воркеров?
- уменьшить, если оперативки мало (один тред занимает ~8мб)
- увеличить чтобы выдержать нагрузку
Если есть предложения получше при тех же вводных - предлагайте😉
#async
Выглядело это примерно так:
from fastapi.concurrency import run_in_threadpool
async def execute(data: DataRequest) -> DataResponse:
try:
result = await run_in_threadpool(sync_function, data)
return DataResponse(data=result)
except Exception as e:
return DataResponse(
error=str(e),
success=False,
)
В общем работает нормально. Для всех вызовов под капотом используется общий тредпул, всё работает предсказуемо.
Но потребовалось изменить количество запускаемых в пуле потоков (по умолчанию создается 40 воркеров).
Так как дело происходит с FastAPI, делается это через lifespan используя настройки anyio:
import anyio
@asynccontextmanager
async def lifespan(app: FastAPI):
limiter = anyio.to_thread.current_default_thread_limiter()
limiter.total_tokens = 100
yield
# если вдруг нужно вернуть обратно
limiter.total_tokens = 40
Зачем менять количество воркеров?
- уменьшить, если оперативки мало (один тред занимает ~8мб)
- увеличить чтобы выдержать нагрузку
Если есть предложения получше при тех же вводных - предлагайте😉
#async
❤5👍3🔥1