Python Заметки
2.22K subscribers
62 photos
2 videos
2 files
229 links
Интересные заметки и обучающие материалы по Python

Контакт: @paulwinex

⚠️ Рекламу на канале не делаю!⚠️

Хештеги для поиска:
#tricks
#libs
#pep
#basic
#regex
#qt
#django
#2to3
#source
#offtop
Download Telegram
В работе с медиа файлами часто требуется определить не просто расширение, а его, скажем так, "категорию". Тоесть определить это видео, аудио или картинка. Примерно в 10 случаях из 10 в ревью я вижу обычный хардкодинг с большим мапингом и соответствующим поиском по нему.

file_type_by_ext = {
'video': ['.mp4', '.mov', '.mkv', ...],
'audio': ['.mp3', '.wav', '.ogg', ...],
'image': ['.jpg', '.png', '.exr', ...]
}


Для таких случаев есть простой способ - стандартная библиотека mimetypes.

import mimetypes
mimetypes.guess_type("example.txt")
# ('text/plain', None)

Причём ей не нужен файл, достаточно просто имени строкой.

Первый элемент кортежа это MIME-тип (Multipurpose Internet Mail Extensions Type) - стандартный способ идентификации формата файла.

Формат: type/subtype

type - общая категория данных (text, video, image)
subtype - конкретный формат внутри категории

mimetypes.guess_type("photo.jpg")
# ('image/jpeg', None)
mimetypes.guess_type("render.mp4")
# ('video/mp4', None)


Второй элемент это тип кодировки содержимого, обычно для контейнеров типа gz и аналогичных.
mimetypes.guess_type("file.tar.gz")
# ('application/x-tar', 'gzip')
mimetypes.guess_type("backup.tar.bz2")
# ('application/x-tar', 'bzip2')


Итого, узнать категорию файла одной строкой:
mimetypes.guess_type('myfile.mov')[0].split('/')[0]
# video

Конечно при условии, что тип будет распознан, иначе будет None а не строка. Но об этом в следующий раз.

#libs #tricks
👍20
import mimetypes
mimetypes.guess_type("example.fbx")
# (None, None)

Формат не распознан, так как не зарегистрирован в системе.
Регистрация происходит с помощью функции mimetypes.init(). Эта функция автоматически вызывается при первом обращении.
Для каждой OS работает по-разному. В Windows читает реестр, в Linux достает всё из файла /etc/mime.types, в MacOS читает из системной БД.

На linux можно попробовать распознать тип через вызов
file --mime-type -b <filename>

эта команда попробует прочитать метадату самого файла, то есть должен быть доступ к файлу. Но это не гарантия успеха.

Можно попробовать использовать нестрогое соответствие IANA с помощью флага strict=False. Тогда будут учтены старые и нестандартные типы. Обычно они с префиксом x-

Новые типы можно добавлять самостоятельно.
mimetypes.add_type('application/x-fbx', '.fbx') # с точкой
mimetypes.guess_type("example.fbx")
# ('application/x-fbx', None)


Либо вызвать init() еще раз передав список текстовых файлов с нужными вам типами (без точки)
# my-mime-types.txt
application/x-fbx fbx
application/x-ogo ogo
application/x-aga aga

mimetypes.init(['my-mime-types.txt'])
mimetypes.guess_type("example.ogo")
# ('application/x-ogo', None)


Есть и обратная операция - получить расширение файла из mime-типа
mimetypes.guess_extension('image/jpeg')
# .jpg

Или все подходящие расширения
mimetypes.guess_all_extensions('image/jpeg')
# ['.jpg', '.jpe', '.jpeg', '.jfif']


Советую почитать полную документацию
Также обратите внимание на библиотеку content-types для работы с mime-типами, где больше возможностей.

#libs #tricks
🔥4
Все знают синтаксический сахар с операторами +=, -= и тд
x += 1

Где под капотом он превращается в
x = x + 1

Останется ли переменная х той же переменной после +=?
Конечно нет, это же неизменяемый тип
x = 1
print(id(x))
# 135373664533280
x += 1
print(id(x))
# 135373664533312


Теперь провернём тоже самое со списком
ls = [1, 2]
print(id(ls))
# 135373622585344
ls = ls + [3]
print(id(ls))
# 135373619036608

Ожидаемо работает так же, ведь мы создали новую переменную.
А теперь попробуем иначе:
ls = [1, 2]
print(id(ls))
# 135373622585344
ls += [3]
print(id(ls))
# 135373622585344
print(ls)
# [1, 2, 3]

И, внезапно, это работает не так как с int, со списками оператор += работает как extend()!
То же самое будет с *=, объект останется тем же.
ls = [1, 2]
print(id(ls))
# 135373622585344
ls *= 2
print(id(ls))
# 135373622585344
print(ls)
# [1, 2, 1, 2]

Следует помнить о такой важной разнице!
(Особенно на собесах 😉)

#tricks
👍141
Не запуская код определите, что покажет терминал если выполнить следующее:

_A__b = 'c'
class A:
def get(self):
return __b
print(A().get())


Ответ: Несмотря на то, что ваш IDE покажет ошибку, ошибки не будет. Распечатается "c"

Объяснение:

1. Mangling
За это отвечает механизм mangling - искажение имени. Так работают приватные атрибуты классов.
При создании атрибута по правилу: минимум 2 "_" в начале и максимум 1 "_" в конце" имя автоматически становится вида _{classname}{attr}
В нашем случае атрибутов класса не создается, но это не отменяет Mangling при обращении к другим объектам внутри класса.

2. Обращение к атрибуту
Когда внутри класса происходит обращение к любому объекту с именем по указанному выше правилу, его имя на уровне байт кода также преобразуется.

3. Поиск
Далее происходит поиск такой переменной по неймспейсам в порядке LEGB - Local, Enclosing, Global, Built-in.
И не трудно догадаться что мы находим нужный атрибут в Global, В итоге получаем результат!

Проверить можно так:
import dis
dis.dis(A.get)
# 4 RESUME 0
#
# 5 LOAD_GLOBAL 0 (_A__b)
# RETURN_VALUE

Либо удалите переменную _A__b и запустите еще раз, поулчите ошибку:
NameError: name '_A__b' is not defined


Как думаете, это норма или баг?

#tricks
5😢3
Потоковая обработка часто встречается при работе с большими файлами или когда данные приходят частями. В Python есть множество инструментов для работы с такими данными. Самый известный - итератор файла по строкам. В веб-приложениях это стандарт для передачи файлов. Далее приведу несколько примеров.

Чтение файлов
with open('huge-file.txt') as file:
for line in file:
process_line(line)

Это позволяет нам читать текстовый файл по строкам не загружая всё в память.
Конечно, если позволяет формат данных. С JSON такое не сработает (ijson может в этом помочь).

Запись файла чанками
with open('file-to-save.txt',
'w') as file:
for line in iter_data():
file.write(line)


Частные случаи есть в разных библиотеках. Например DictWriter и DictReader из модуля csv позволяет работать с конкретным форматом данных а не просто текст.
import csv

with open('data.csv', 'r') as file:
reader = csv.DictReader(file)
for row in reader:
print(row)

with open('data.csv', 'a',
newline='') as f:
writer = csv.DictWriter(f,
fieldnames=['col1', 'col2']
)
for row in iter_objects():
writer.writerow(row)


Отдельно интересен ZipFile, позволяющий "открыть" файл сразу внутри архива и записывать его частями
import zipfile as zf

with zf.ZipFile(
'archive.zip',
'w',
compression=zf.ZIP_DEFLATED) as zf:
with zf.open(
'large_data.bin',
mode='w') as in_file:
with open(
'large_data.bin',
'rb') as source:
for chunk in iter(
lambda: source.read(1024),
b''):
in_file.write(chunk)


Создание хеша для большого файла
import hashlib

sha256 = hashlib.sha256()
with open(
'large-file.bin',
'rb') as f:
for block in iter(
lambda: f.read(1024), b''
):
sha256.update(block)
hash_sum = sha256.hexdigest()


Сжатие данных в файл отдельными чанками
import gzip

with gzip.open('data.gz', 'wb') as f:
for bin_chunk in iter_bin_data():
f.write(bin_chunk)


Чтение с записью в файл
with gzip.open('data.gz', 'rb') as f_in:
with open(
'extracted_data.txt',
'wb') as f_out:
for chunk in iter(
lambda: f_in.read(1024),
b''):
f_out.write(chunk)


Подсчет объектов из стрима. Добавление обновляет счетчики.
from collections import Counter

c = Counter()
for data in iter_objects():
c.update(data)


Это не все доступные примеры, их еще много. Каждый из них позволяет обрабатывать данные из потока не ожидая весь набор и не загружая их в оперативку.
Это очень полезная техника, которую я призываю использовать по назначению!

#tricks #libs
👍12
reload_flag=""
if [[ -n "${DEBUG}" ]]; then
reload_flag="--reload"
fi

if [[ -n "${WORKER_COUNT}" ]]; then
workers=${WORKER_COUNT}
else
workers=2
fi

gunicorn --workers ${workers} \
--bind 0.0.0.0:8000 \
${reload_flag} main.wsgi

Писали такие конструкции чтобы проверить наличие флага и сформировать команду правильно?
На самом деле можно сделать тоже самое проще. Для этого используются операторы условной подстановки, доступные в оболочках семейства POSIX.

:- для установки значений по умолчанию
${WORKER_COUNT:-2}

Если переменная не объявлена, то будет дефолтное значение 2.

:+ подставляет указанный текст, если переменная не пуста
${DEBUG:+--reload}

Если что-то есть в переменной то распечатается текст после символа +, в противном случае - ничего. Удобно для опциональных флагов, как в нашем примере.

Итого наш скрипт может выглядеть так:
gunicorn --workers ${WORKER_COUNT:-2} \
--bind 0.0.0.0:8000 \
${DEBUG:+--reload} main.wsgi


Есть еще два оператора.

:= не только подставить дефолтное значение, но и присвоить его переменной, если она пуста
# никаких переменных еще нет
VAL1=${VAL2:=hello}
# теперь доступны обе
echo $VAL1 $VAL2
# hello hello


:? остановить выполнение с ошибкой, если переменной нет.
echo ${MISS:?is required}
bash: MISS: is required

Код выхода будет 1.

#tricks #linux
👍5
Вы до сих пор используете в проекте "магические" строки?😖
@dataclass
class Task:
status: str
...

def create_pending_task(data: dict) -> Task:
task = Task(**data)
task.status = "pending" # < магическая строка
return task

Где тут проблема?

🔸 Если "pending" изменится на "wait", вам придется искать это слово по всему проекту
🔸 Напишете panding вместо pending и баг вылезет только в рантайме в непредсказуемом месте
🔸 Вам очень повезет, если в проекте нет такой же строки но с другим смыслом

Как делать правильно?

Используем модуль enum
from enum import StrEnum

class TaskStatus(StrEnum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"

@dataclass
class Task:
status: TaskStatus
...

def create_pending_task(data: dict) -> Task:
task = Task(**data)
task.status = TaskStatus.PENDING
return task


Почему это лучше:
▫️Теперь это не строка а объект
▫️ IDE сможет подсказать какие статусы существуют, вам не нужно лезть в документацию или базу
▫️ Единый источник истины. Изменяем в одном месте вместо поиска на всему проекту
▫️ Типизация - наше всё, mypy умеет с этим работать
▫️ Читаемость кода повышается. Ведь читаем мы его чаще чем пишем
▫️ Автоматическая валидация допустимых значений в моделях Pydantic

#tricks
👍145👎2
Почему в прошлом посте я использовал StrEnum а не Enum?
Всё просто, дефолтный Enum не поддерживает нативное сравнение с нужным нам типом.
from enum import Enum

class DefaultEnum(Enum):
KEY = "value"

"value" == DefaultEnum.KEY # False
"value" == DefaultEnum.KEY.value # True

Как видите, приходится вызывать .value, что неудобно в некоторых случаях и более многословно. StrEnum это исправляет:
from enum import StrEnum

class StringEnum(StrEnum):
KEY = "value"

"value" == StringEnum.KEY # True

Для примера из прошлого поста это выглядело бы так:
if task.status == TaskStatus.PENDING:
...

Точно так же работает и IntEnum.

StrEnum появился в версии 3.11, для более ранних использовали комбинацию MyEnum(str, Enum), что не тоже самое.
StrEnum правильно создает значения с функцией auto(). Сочетание str+Enum создает числа, но в виде строк. Приходится явно писать строки. Сделал пару примеров для сравнения↗️

Когда не стоит использовать StrEnum:
- когда нужно явное отличие значений энума от строки
- когда в проекте уже используется обычный Enum

#tricks
👍9
Еще одно применение пайпов - в контексте с Enum.
Но для этого нужен специальный Enum основанный на типе Flag.
В связке с auto он генерирует битовые маски, которые впоследствии можно использовать с оператором |

from enum import Flag, auto

class Perm(Flag):
READ = auto() # 1 (0001)
WRITE = auto() # 2 (0010)
EXECUTE = auto() # 4 (0100)
DELETE = auto() # 8 (1000)


Теперь мы можем комбинировать их через пайп
admin_perms = Perm.READ | Perm.WRITE | Perm.EXECUTE
user_perms = Perm.READ | Perm.EXECUTE
print(admin_perms)
# <Perm.READ|WRITE|EXECUTE: 7>


Можно делать проверки через in (возвращает bool)
if Perm.READ in admin_perms:
print("Success!")


Либо через & (возвращает совпадение либо 0)
print(Perm.READ & admin_perms)
# <Perm.READ: 1>
print(Perm.WRITE & user_perms)
# <Perm: 0>


Оператор ~ инвертирует все флаги
print(~admin_perms)
#<Perm.DELETE: 8>


Можно заранее создать комбинацию.
class Perm(Flag):
READ = auto() # 1 (0001)
WRITE = auto() # 2 (0010)
EXECUTE = auto() # 4 (0100)
DELETE = auto() # 8 (1000)
RW = READ | WRITE

mode = Perm.READ
print(mode & Perm.RW)
# <Perm.READ: 1> (True)
print(mode & Perm.EXECUTE)
# <Perm: 0> (False)


Flag более изолирован. Он не равен числу напрямую, что защищает от случайных ошибок в логике.

#tricks
7👍5👎1
Если запустить REPL с модулем asyncio, то вы входите в особый асинхронный REPL.

user@host:~$ python -m asyncio
asyncio REPL 3.12.7 ...
Use "await" directly instead of "asyncio.run()".
>>> import asyncio
>>>


В этом режиме
- создаётся и настраивается event loop
- уже импортирован asyncio
- работает await на верхнем уровне

То есть такая команда сработает без ошибок!
await asyncio.sleep(3)


Удобно для тестирования асинхронных функций без создания ивентлупов и остальной обвязки.
Работает в: 3.8+


#tricks #async
🔥14😁2👏1
Теперь аналогичная история с тредами. Для тредов используется объект threading.local.
Он позволяет создать локальный динамический атрибут (да, вот так костыльно) для треда.

Вот базовый пример:

import threading
import time
import random

# глобальная переменная
thread_data = threading.local()

def execute():
# поулчаем локальное значение для текущего треда
current_user_id = getattr(thread_data, "user_id", -1)
print(f"Log {threading.current_thread().name}: {current_user_id}")

def thread_task(user_id):
# устанавливаем значение для текущего треда
time.sleep(random.random())
thread_data.user_id = user_id
print(f"Create {threading.current_thread().name} == {user_id}")
execute()

threads = [
threading.Thread(
target=thread_task,
args=(i,),
name=f"Thread-{i}")
for i in range(10)
]
for t in threads:
t.start()
for t in threads:
t.join()

Вывод должен быть аналогичным, с соотетстивем номера треда и id юзера.

Есть еще один пример здесь


#tricks
2👍1
Мы рассмотрели два способа управления конеткстом переменных. Если вам показалось, что это выглядит излишне и можно было бы оставить один, то вам не показалось.
Способ с threading.local придуман для разделения переменных между потоками. CоntextVar был добавлен как новый метод для асинхронного кода, но оказался настолько универсальным, что его можно использовать и с потоками.
После появления ContextVar в PEP567 его рекомендовано использовать вместо threading.local.
И даже был сделан бекпорт для версия ниже 3.7.1.

Теперь, если совместить ContextVar и Proxy-класс из прошлого примера то получим такой класс↗️.

Но у этого класса есть две проблемы:

1️⃣ Нигде не вызывается reset для сброса переменной, что может приводить проблемам

- утечка памяти
- "грязный" конеткст при переиспользовании потоков
- невозможность вернуться к дефолту

Решим это с помощью конектстного менеджера:
@contextlib.contextmanager
def configure_context(self, *args, **kwargs):
"""Синхронный контекстный менеджер (для `with`)"""
tok_cfg = self._cv_config.set((args, kwargs))
tok_obj = self._cv_object.set(None)
try:
yield self
finally:
self._cv_object.reset(tok_obj)
self._cv_config.reset(tok_cfg)

@contextlib.asynccontextmanager
async def aconfigure_context(self, *args, **kwargs):
"""Асинхронный контекстный менеджер (для `async with`)"""
tok_cfg = self._cv_config.set((args, kwargs))
tok_obj = self._cv_object.set(None)
try:
yield self
finally:
self._cv_object.reset(tok_obj)
self._cv_config.reset(tok_cfg)


Пример использования:
with proxy.configure_context(val1, val2):
proxy.do_something()


Теперь прокси готов, но...

2️⃣ В асинхронном коде, для которого и придуманы ContextVar, созданием корутин занимается Event Loop, именно он отвечает за наследование контекста дочерними корутинами. В случае с потоками ничего такого нет, мы сами себе "эвентлуп", поэтому приходится прописывать копирование конеткста самстоятельно.

Пример проблемы с отсутствием наследованием конеткста в потоках↗️

Для решения есть функция копирования текущего контекста и метод запуска функции с новым конектстом:
сontextvars.copy_context().run(func, *args, **kwargs)


Здесь сложно придумать универсальное автоматическое копирование контекста, самая простая функция будет выглядеть так:
def run_in_thread_with_context(
func: Callable, *args, **kwargs
) -> threading.Thread:
ctx = contextvars.copy_context()
t = threading.Thread(
target=lambda: ctx.run(func, *args, **kwargs)
)
t.start()
return t


И если вернуться к нашему синхронному ApiClient, то придётся следить за конектстом самостоятельно. И если где-то в коде библиотеки уже есть вызов тредов, то это работать не будет, придется переписывать.

threading.local тоже не наследует конеткст.


Полный пример Proxy с CоntextVar↗️

Пример использования:
client = ContextVarProxy(ApiClient)

def worker_in_thread(token):
with client.configure_context(token=token):
use_client(...)


Еще вариант, это кастомные ThreadExecutor и Thread с поддержкой автокопирования контекста. Забираем здесь↗️

И нет, это не пример как надо делать в проде) Это просто эксперемент для понимания процесса.

#tricks
3👍1
Как-то давно писал трансфер файлов по сети.
В этом проекте требовалось создавать файл, который сразу существует на диске, имеет нужный размер но еще не содержит данных.
Вот примеры как создать такой файл:

length = 1024 * 1024 * 1024 * 100
with open(file_path, "wb") as out:
out.seek(length-1)
out.write(b"\0")

with open(file_path, "wb") as out:
out.truncate(1024 * 1024 * 1024 * 120)

truncate -s 100M test


Файл создается моментально и получается полностью состоящий из нулей. Более того, он не занимает место над диске!
Это называется sparse files - разреженные файлы. На таких файловых системах как ext4, XFS, Btrfs, ZFS файл автоматически становится разреженным если процесс пишет за пределы конца файла. В структуре файла создаются "дырки" которые автоматически при чтении вернут нули.

Если запустить тоже самое на Windows, то результат будет другой. Файл будет создаваться долго и реально займет место на диске.

NTFS умеет создавать разреженные файлы, но это надо активировать явно:

import os
import msvcrt
import ctypes

file_path = r"C:\file"
length = 1024 * 1024 * 1024 * 100 # 100 GB

with open(file_path, "wb") as f:
handle = msvcrt.get_osfhandle(f.fileno())
FSCTL_SET_SPARSE = 0x900C4
bytes_returned = ctypes.c_ulong()
ctypes.windll.kernel32.DeviceIoControl(
handle, FSCTL_SET_SPARSE, None, 0, None, 0,
ctypes.byref(bytes_returned), None
)
f.seek(length-1)
f.write(b"\0")


Таким образом мы делаем преалокацию файла с возможностью писать в любое место, например так работают торренты.
В моем случае было многопоточное скачивание разных кусков файлов с возможностью докачки.

При копировании таких файлов чаще всего копия занимает всё положенное ей место.
Чтобы учитывать такое свойство файла нужно использовать специальные опции

shutil.copyfile(src, dst, follow_symlinks=False)

rsync -S ...

robocopy /SPARSE ...


Для тестирования трансфера требовалось создавать реальные файлы с рандомными данными. Сделать это просто:

import os
with open(file_path, "wb") as out:
for _ in range(1024):
out.write(os.urandom(1024*1024*10))


dd if=/dev/urandom of=file.bin bs=1M count=10


Тут, конечно, никаких разреженных файлов быть не может.

#tricks
🔥83👍2