Python Заметки
2.3K subscribers
59 photos
2 videos
2 files
213 links
Интересные заметки и обучающие материалы по Python

Контакт: @paulwinex

⚠️ Рекламу на канале не делаю!⚠️

Хештеги для поиска:
#tricks
#libs
#pep
#basic
#regex
#qt
#django
#2to3
#source
#offtop
Download Telegram
Быстрый встроенный профайлинг на Linux с помощью time

time python -c 'for i in range(10**7): i**2'


Покажет время выполнения процесса
real    0m2,470s
user 0m2,405s
sys 0m0,074s

real - Общее время, прошедшее с момента запуска до завершения программы. Включая время ожидания I\O или переключения контекста.
user - Количество времени, которое CPU потратил на выполнение кода самой программы в пользовательском режиме.
sys - Количество времени, которое CPU потратил на выполнение системных вызовов (операций ядра, таких как чтение/запись файлов, управление памятью) от имени программы.

Но это встроенная команда из моей оболочки. Есть такая же GNU-утилита и она может показывать больше информации. Но нужно вызывать по абсолютному пути, так как builtin команда имеет бОльший приоритет.

/usr/bin/time -v python -c 'for i in range(10**7): i**2'

Command being timed: "python -c for i in range(10**7): i**2"
User time (seconds): 2.38
System time (seconds): 0.07
Percent of CPU this job got: 100%
...

Кроме времени исполнения будет также показано много другой полезной информации
- эффективность использования CPU (в %)
- максимальный объем занятой памяти
- обращения к файлам
- код выхода

И другие сведения.

#tricks
🔥82👍2
7.09.2025 состоялся релиз Pithon 3.14!

На фоне хайпа про NoGIL всё позабыли про другие фичи. Особенно про Multiple Interpreters, который обещает изоляцию процессов но с эффективностью потоков! На сколько действительно это будет эффективно мы узнаем позже, потому что сейчас это лишь первый релиз с ограничениями и недоработками.

Но что там про NoGIL? Теперь этот режим не экспериментальный, а официально поддерживаемый, но опциональный.
Чтобы запустить без GIL нужна специальная сборка. И перед стартом нужно объявить переменную PYTHON_GIL=0

Для вас я собрал готовый репозиторий где достаточно запустить скрпит, который всё сделает:
▫️ соберет релизный Python 3.14 в новый Docker-образ
▫️ запустит тесты в контейнере (GIL, NoGIL, MultiInterpreter)
▫️ распечатает результаты

Тест очень простой, усложняйте сами)
Вот какие результаты у меня:
=== Running ThreadPoolExecutor GIL ON
TOTAL TIME: 45.48 seconds
=== Running ThreadPoolExecutor GIL OFF
TOTAL TIME: 6.14 seconds
=== Running basic Thread GIL ON
TOTAL TIME: 45.54 seconds
=== Running basic Thread GIL OFF
TOTAL TIME: 4.74 seconds
=== Running with Multi Interpreter
TOTAL TIME: 18.30 seconds


Если сравнивать GIL и NoGIL, то на мои 32 ядра прирост х7-x10 (почему не х32? 🤷). При этом нам обещают что скорости будут расти с новыми релизами.
Режим без GIL похож (визуально) на async, тоже параллельно, тоже не по порядку. Но это не IO! и от того некоторый диссонанс в голове 😵‍💫, нас учили не так!

Интересно, что чистый Thread работает быстрей чем ThreadPoolExecutor без GIL.

Ну и где-то плачет один адепт мульти-интерпретаторов😭 Теперь нужно искать где они могут пригодиться с такой-то скоростью. Скорее всего своя область применения найдется.

Отдельно я затестил память и вот что вышло на 32 потока:
ThreadPoolExecutor GIL ON
305.228 MB
ThreadPoolExecutor GIL OFF
500.176 MB
basic Thread GIL ON
90.668 MB
basic Thread GIL OFF
472.444 MB
with Multi Interpreter
1267.788 MB

Пока не знаю как к этому относиться)

В целом - радует направление развития!

#release
12👍4🔥2👎1
Использование Pydantic сегодня стало нормой, и это правильно. Но иногда на ревью вижу, что используют его не всегда корректно.
Например, метод BaseModel.model_dump() по умолчанию не преобразует стандартные типы, такие как datetime, UUID или Decimal, в простой сериализуемый для JSON вид. Тогда пишут кастмоный сериализатор для этих типов чтобы функция json.dump() не падала с ошибкой.

import uuid
from datetime import datetime
from decimal import Decimal
from uuid import UUID
from pydantic import BaseModel

class MyModel(BaseModel):
id: UUID
date: datetime
value: Decimal


obj = MyModel(
id=uuid.uuid4(),
date=datetime.now(),
value='1.23'
)
print(obj.model_dump())
# не подходит для json.dump
# {
# 'id': UUID('4f8c1bc4-25fd-40cd-9dbe-2c73639b0dc1'),
# 'date': datetime.datetime(2025, 12, 12, 12, 12, 12, 111111),
# 'value': Decimal('1.23')
# }
# добавляем свой кастомный сериализатор
json.dumps(obj.model_dump(), cls=MySerializer)
# {
# 'id': '4f8c1bc4-25fd-40cd-9dbe-2c73639b0dc1',
# 'date': '2025-12-12T12:12:12.111111',
# 'value': '1.23'
# }


В данном случае класс MySerializer обрабатывает datetime, UUID и Decimal. Например так:
class MySerializer(json.JSONEncoder):
def default(self, o):
if isinstance(o, Decimal):
return str(o)
elif isinstance(o, datetime):
return o.isoformat()
elif isinstance(o, UUID):
return str(o)
return super().default(o)


Специально для тех, кто всё еще так делает - в этом нет необходимости!
Pydantic может это сделать сам, просто нужно добавить параметр mode="json".
json.dumps(obj.model_dump(mode="json"))
# {
# 'id': '4f8c1bc4-25fd-40cd-9dbe-2c73639b0dc1',
# 'date': '2012-12-12T12:12:12.111111',
# 'value': '1.23'
# }


#pydantic #libs
9👍7🔥5
Всех 3Dшников спраздником!!!🔥💫💥
14🎉5👍1
Три способа выполнить множество задач с asyncio

Функция для примера:
async def do_it(n):
await asyncio.sleep(random.uniform(0.5, 1))
return n


1. Последовательный вызов
async def main():
for i in range(100):
result = await do_it(i)

Такой вызов имеет смысл только тогда, когда результат одной задачи требуется для вызова следующей.
Если они независимы, то это антипаттерн, так как аналогичен простому синхронному вызову по очереди.

2. Упорядоченный результат
async def main():
tasks = [do_it(i) for i in range(100)]
results = await asyncio.gather(*tasks)

Выполняет корутины конкурентно и возвращает результат в виде списка.
Полезен когда требуется получить результаты в том же порядке в котором задачи отправлены.

3. Результат по мере готовности
tasks = [asyncio.create_task(do_it(i)) for i in range(100)]
for cor in asyncio.as_completed(tasks):
result = await cor

Так же выполняет корутины конкурентно, но не гарантирует порядок. Результат возвращается по мере готовности, каждый отдельно.
Полезен когда нужно обработать любой ответ как можно скорее.

#async
6👍5
Функция asyncio.wait() это еще один способ вызвать множество асинхронных задач.
Она работает в нескольких режимах.

1. Самый простой - ждем завершения всех задач
async def main():
tasks = [asyncio.create_task(do_it(i)) for i in range(10)]
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.ALL_COMPLETED
)
for task in done:
try:
print(task.result())
except Exception as e:
print(e)

Очень похоже на gather, но работает не так.
▫️возвращает не результаты, а два сета с объектами Task у которых можно забрать результат через task.result() если они в списке done
▫️не гарантирует порядок результатов так как оба объекта это set
▫️не выбрасывает исключение когда оно появляется, а сохраняет его в Task. Исключение появится когда попробуете забрать резултьтат.

2. Ждем завершения первой задачи, даже если там ошибка.
async def main():
tasks = [asyncio.create_task(do_it(i)) for i in range(3)]
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.FIRST_COMPLETED
)
# в done может быть несколько задач!
for task in done:
try:
print(task.result())
except Exception as e:
print(f"Fail: {e}")
# Оставшиеся задачи в pending, как правило, нужно отменить, иначе они будут продолжать работать
for task in pending:
task.cancel()

В сете done будут таски которые успели завершится, причем как успешно так и нет.

3. До первой ошибки.
Тоже самое, но с аргументом FIRST_EXCEPTION
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.FIRST_EXCEPTION
)

Функция завершается как только первая задача упадет с ошибкой.

Учтите, что в любом случае done вы можете обранужить несколько задач, как с ошибками так и успешные.

↗️ Полный листинг примеров здесь

#async
👍52
Отдельно разберём TaskGroup, который пришел на замену gather в Python 3.11.

Ключевые отличия
▫️create_task() возвращает объект asyncio.Task, у которого есть соответствюущие методы управления. То есть у нас больше контроля
▫️это контекстный менеджер, который гарантирует что все таски будут остановлены по выходу из контекста
▫️ошибка автоматически отменяет незавершенные задачи,
▫️except* передает нам ExceptionGroup, в котором каждую ошибку можно обработать отдельно
import asyncio
import random

async def do_it() -> str:
if random.random() < 0.1:
raise ValueError('Oops')
delay = random.uniform(0.5, 1.5)
await asyncio.sleep(delay)
return delay

async def main():
try:
async with asyncio.TaskGroup() as tg:
for _ in range(10):
tasks.append(tg.create_task(do_it()))
for t in tasks:
print(t.result())
except *ValueError as e:
for err in e.exceptions:
print(err)

asyncio.run(main())


Рекомендую изучить страницу Coroutines and Tasks из документации, где представлено больше интересных примеров и механизмов
- таймауты
- отмена задач
- создание задач из другого потока

#async
3