Oh My Py
2.63K subscribers
21 photos
55 links
Все о чистом коде на Python // antonz.ru
Download Telegram
Алгоритмические собеседования

Яндекс тут запустил бесплатный курс про алгоритмические собеседования. Это такая разновидность собеседования, на котором соискателя заставляют решать алгоритмические задачки на доске и на время.

Невозможно удержаться и не процитировать авторов курса:

В ряде крупных IT-компаний кандидаты на собеседованиях пишут код на доске или в онлайн-редакторе без возможности запустить код. В их число входит и Яндекс, и компании, которых собирательно обозначают FAANG.

Но в обычной жизни код на бумаге пишут редко, поэтому даже опытные разработчики бывают к этому не готовы и не показывают достаточно высокий результат.

К алгоритмическим собеседованиям нужно готовиться, поскольку это редко встречается в повседневной работе разработчика.

Другими словами, алгоритмическое собеседование — это отдельный специфический навык, которым необходимо овладеть только для того, чтобы пройти собеседование. На работе он вам не пригодится. Ге-ни-аль-но.

Алгоритмическое собеседование — это лучше, чем вообще не проверять навык человека работать с кодом. Но хуже любого другого способа:

— Заранее дать тестовое задание и на собеседовании обсудить решение («списанные» решения отлавливаются на раз).

— Попросить разобраться в готовом фрагменте кода (вот этот навык на работе пригождается каждый день).

— Наконец, дать неалгоритмическую задачку и разрешить запускать код.

Но нет. Давайте вместо нормальных собеседований организуем курс, где будем учить людей готовиться к бесполезным. А то еще чего доброго примем человека, который по памяти не сумеет алгоритм Дейкстры воспроизвести. Как с таким работать-то!
Закешировать результат «дорогих» вычислений

Предположим, написали вы функцию, которая возвращает емейл пользователя:

def get_user_email(user_id):
user = find_by_id(user_id)
return user["email"]


Одна беда: функция find_by_id() лезет в уж-ж-жасно медленную легаси-систему:

def find_by_id(user_id):
# представьте здесь медленный запрос по сети,
# который возвращает пользователя
time.sleep(1)
return { "email": "..." }


Если 100 раз вызвать get_user_email(42) — будет 100 медленных запросов. Хотя по уму хватило бы и одного. Что ж, давайте приделаем простенький кеш:

cache = {}

def get_user_email(user_id):
if user_id not in cache:
user = find_by_id(user_id)
cache[user_id] = user["email"]
return cache[user_id]


Вроде ничего сложного (не считая вопроса устаревания кеша, но об этом в другой раз). Но представьте, что медленных функций много, и в каждую придется приделывать такую штуку. Не слишком вдохновляет.

К счастью, в модуле functools есть декоратор @lru_cache. Он-то нам и пригодится. Добавляем одну строчку к исходной функции, и готово:

@functools.lru_cache(maxsize=256)
def get_user_email(user_id):
user = find_by_id(user_id)
return user["email"]


Теперь повторные вызовы get_user_email() с одним и тем же user_id вернут результат из кеша, не запрашивая find_by_id().

lru_cache прекрасен еще тем, что автоматически вытесняет старые записи из кеша, когда их становится больше maxsize. Так что всю память не съест.

Работает кеш внутри процесса, и погибнет вместе с ним. Так что если нужно что-то более масштабируемое — посмотрите на Redis или аналоги.

документацияпесочница

#stdlib
Кортеж здорового человека

Я нежно люблю именованный кортеж (namedtuple), даже цикл заметок о нем написал. Он компактный, неизменяемый и полностью совместим с обычными tuple и list. Идеально подходит для представления данных из файла или базы:

from collections import namedtuple

Pet = namedtuple("Pet", ("type", "name", "age"))

frank = Pet("pigeon", "Френк", 3)


print(frank)
# Pet(type='pigeon', name='Френк', age=3)

print(list(frank))
# ['pigeon', 'Френк', 3]

values = ('pigeon', 'Френк', 3)
frank = Pet(*values)
print(frank)
# Pet(type='pigeon', name='Френк', age=3)


Неплохо. Только вот объявление класса через namedtuple(...) и выглядит не очень, и совсем не сочетается с современной системой типов в питоне.

К счастью, в модуле typing есть класс NamedTuple, который позволяет нормально работать с именованными кортежами, словно с какими-нибудь дата-классами:

from typing import NamedTuple, Optional

class Pet(NamedTuple):
type: str
name: Optional[str]
age: int = 1

frank = Pet("pigeon", "Френк", 3)


print(frank)
# Pet(type='pigeon', name='Френк', age=3)


Здорово, а?

документацияпесочница

#stdlib
Считаем котов на ютубе

Предположим, у вас возникла острая потребность пересчитать всех котиков на ютубе. Добрые люди уже выгрузили список ссылок на ролики в файл urls.txt.

Вам осталось только проанализировать видео по каждой ссылке, обнаружить там шерстяную голову (или нет), и сохранить количество в выходной поток.

Подготовим функцию для каждого шага:

def reader(filename):
for line in open(filename):
# video url
yield line.strip()

def process(url):
video_id = url.partition("=")[2]
cat_count = count_cats(url)
return Video(video_id, url, cat_count)

def writer(out):
while True:
video = yield
line = f"{video.id},{video.cat_count}\n"
out.write(line)


И объединим в единый алгоритм:

w = writer(sys.stdout)
w.send(None)
for url in reader("data/urls.txt"):
video = process(url)
w.send(video)


Функция count_cats() вызывает огромный мощный кластер, который принимает на входе URL, обрабатывает видео и считает в нем котов. Обработка одного ролика всегда занимает 1 сек, но сам кластер при этом загружен на 0.01%.

К сожалению, наш код засылает урлы по одному, так что весь процесс работает довольно медленно.

Что же делать? Опрос следует.

песочница

#задачка
Считаем котов: async

Наш алгоритм подсчета котов прост и линеен. Одна беда — работает медленно:

Processed 10 videos, took 10.03 seconds
Processed 100 videos, took 100.28 seconds
... дальше не стал замерять, лень ждать


Логично: в функции process() мы отправлям запрос на кластер, секунду ждем ответа, отправляем следующий запрос, ждем еще секунду, и так далее.

Кластер явно способен обрабатывать больше одного видео за раз. Нагрузим его посильнее!

Логично было бы не ждать ответа, а сразу отправить следующий запрос, за ним еще один, и так далее. А ответы обрабатывать по мере того, как кластер их отдает. Для такого подхода идеально подходят инструменты из пакета asyncio.

Делаем обработчик урлов асинхронным:

async def process(url):
video_id = url.partition("=")[2]
cat_count = await count_cats(url)
return Video(video_id, url, cat_count)


Создаем обработчик на каждый урл из файла:

processors = [process(url) for url in reader("data/urls.txt")]


Выполняем асинхронно, получаем результаты по готовности:

for task in asyncio.as_completed(processors):
video = await task
w.send(video)


Остальной код не меняется. Смотрите, как хорошо:

Processed 10 videos, took 1.00 seconds
Processed 100 videos, took 1.01 seconds
Processed 1000 videos, took 1.03 seconds
Processed 10000 videos, took 1.25 seconds
Processed 100000 videos, took 4.63 seconds
Processed 1000000 videos, took 49.30 seconds


Но есть у такого подхода небольшой недостаток, который в реальной жизни начисто разрушит идиллию. О нем в следующий раз.

песочница

#stdlib
Считаем котов: async с лимитами

В прошлый раз мы асинхронно обработали 10 тысяч видео за одну секунду:

processors = [process(url) for url in reader("data/urls.txt")]
for task in asyncio.as_completed(processors):
video = await task


Processed 10000 videos, took 1.25 seconds


Ну разве не успех? В теории — да, на практике — не особо.

Во-первых, в списке processors лежит 10 тысяч корутин еще до того, как мы начали обработку. Конечно, 10 тысяч — не слишком много, но что делать, если их там будет 10 миллионов?

Во-вторых (и это намного хуже), asyncio.as_completed() разом выстрелит 10 тысяч запросов к кластеру обработки видео.

Скорее всего, большую часть запросов кластер отобьет с ошибкой типа «Too Many Requests», на чем все и закончится. Либо попытается переварить эти 10 тысяч видео разом и уйдет в 100% загрузку (а то и вовсе упадет), за что нам немедленно настучат по голове админы.

На практике никто не даст обработать прям все данные за один раз. Вместо этого нам скажут: «можете делать не более 16 одновременных запросов к кластеру». Но как реализовать это ограничение? asyncio.as_completed() так не умеет.

Придется сделать обертку, которая гарантирует, что одновременно запущены не более N задач:

max_workers = 16
processors = [process(url) for url in reader("data/urls.txt")]
for task in as_completed(max_workers, processors):
video = await task


Все, что изменилась — наша реализация as_completed() вместо стандартной. Детали можете посмотреть в песочнице, но основная мысль — использовать семафор. Это объект, который ведет счетчик использованных ресурсов (запущенных задач в нашем случае), и не дает использовать более N ресурсов одновременно.

Конечно, обработка теперь занимает не одну секунду:

Processed 10 videos, took 1.00 seconds
Processed 100 videos, took 6.62 seconds
Processed 1000 videos, took 59.91 seconds


Но все еще в 16 раз быстрее, чем обрабатывать видео последовательно.

Проблему с размером списка processors тоже можно решить, сделав постраничный итератор вместо обычного. Если интересно — напишу в комментариях, заметка и без того большая.

песочница

#stdlib
Считаем котов: потоки

async — сравнительно новый «родной» для питона способ многозадачности. Вычисления выполняются в одном потоке, но можно насоздавать кучу легковесных задач. В каждый момент времени выполняется одна задача, а прочие ожидают I/O (ответа от кластера в нашем случае). Если I/O много, то работает весьма шустро.

Если по каким-то причинам async вам недоступен, можно воспользоваться старым проверенным способом — родными потоками операционной системы (threads).

Здесь мы приходим к задаче «читатель-обработчик-писатель», которая в разных вариациях постоянно встречается в реальной жизни. Читатель, обработчик и писатель — независимые акторы, каждый из которых делает свое дело. Между собой эти ребята общаются через очереди:

reader → in_queue → processor → out_queue → writer


Получается конвейер из трех шагов. Реализуем их:

def read_step(filename, in_queue):
for url in reader(filename):
in_queue.put(url)

def process_step(in_queue, out_queue):
while True:
url = in_queue.get()
video = process(url)
out_queue.put(video)

def write_step(out_queue):
while True:
video = out_queue.get(timeout=1)
w.send(video)


Запустим в отдельных потоках одного читателя, одного писателя и N обработчиков. И подружим их между собой с помощью очередей:

filename = "data/urls.txt"
in_queue = Queue(1024)
out_queue = Queue(1024)
max_processors = 16

with ThreadPoolExecutor(2 + max_processors) as executor:
executor.submit(read_step, filename, in_queue)
for _ in range(max_processors):
executor.submit(process_step, in_queue, out_queue)
executor.submit(write_step, out_queue)


Реальный код сложнее, потому нужно отслеживать, когда все акторы закончат работу — иначе программа никогда не завершится. Это можно сделать простым «грязным» способом через таймауты или сложным, но точным — через примитивы синхронизации. В песочнице вариант с таймаутами.

Время обработки примерно такое же, как у варианта с async:

Processed 10 videos, took 1.00 seconds
Processed 100 videos, took 6.63 seconds
Processed 1000 videos, took 60.04 seconds


Замечу, что потоки в Python совсем не те, что в Java или C#. Из-за глобальной блокировки интерпретатора (GIL) в каждый момент времени может выполняться только один поток. Так что для I/O задач потоки подходят (как и async), а вот для CPU задач — совсем нет.

песочница

#stdlib
Да, кстати. Если по всей этой теме с многозадачностью что-то непонятно — можно уточнить в комментариях. Я отвечу там же или в отдельной заметке.

Если у вас лапки и совсем ничего не понятно, не переживайте — скоро переключимся на более лайтовые темы ツ
Считаем котов: процессы

Многозадачность на async и потоках сработала, потому что наши акторы в основном ждут ответа от внешнего кластера. Python (в случае с async) или ОС (в случае с потоками) постоянно «жонглируют» обработчиками, а на CPU в каждый момент выполняется только один.

Но что делать, если обработчикам нужно 100% времени CPU? Например, если они что-то вычислают, выполняют алгоритмы или работают со структурами данных? И потоки, и async здесь бессильны — от многозадачности не останется и следа.

Единственный рабочий вариант с CPU-bound задачами — использовать честные процессы операционной системы. Благо на Python с процессами можно работать почти так же, как с потоками.

Наши читатель (read_step), писатель (write_step) и обработчик (process_step) остаются прежними. Меняем пул потоков на пул процессов — и готово:

filename = "data/urls.txt"
manager = Manager()
in_queue = manager.Queue(1024)
out_queue = manager.Queue(1024)
max_processors = 16


with ProcessPoolExecutor(2 + max_processors) as executor:
executor.submit(read_step, filename, in_queue)
for _ in range(max_processors):
executor.submit(process_step, in_queue, out_queue)
executor.submit(write_step, out_queue)


Время обработки:

Processed 10 videos, took 1.36 seconds
Processed 100 videos, took 6.96 seconds
Processed 1000 videos, took 60.40 seconds


Процессы — наиболее «тяжеловесная» конструкция из рассмотренных. Но только они и подходят для CPU-bound задач.

песочница

#stdlib
Считаем котов: итоги

Некоторые выводы по многозадачности в питоне:

— Даже если программа не использует многозадачность, удобно явно выделить акторов: читателя, обработчика и писателя. С ними код проще понять, модифицировать и повторно использовать.

— Многозадачность через async — отличный легковесный и «родной» вариант для i/o-bound задач. Не забывайте только ограничивать количество одновременных задач.

— Многозадачность через потоки подойдет для i/o-bound задач в легаси-коде, где нет поддержки async. Либо если хотите в перспективе переключиться на процессы с минимальными доработками.

— Многозадачность через процессы подойдет для cpu-bound задач.

Как справедливо отметили в комментариях, async вполне можно комбинировать с процессами. Но об этом как-нибудь в другой раз.

#stdlib
Постраничный итератор для быстрой пакетной обработки

Предположим, вы считаете статистику по огромному датасету игрушек, проданных по всей стране за прошлый год:

reader = fetch_toys()
for item in reader:
process_single(item)


process_single() занимает 10 мс, так что 400 млн игрушек обработаются за 46 дней 😱

В результате оживленного диалога вам удается убедить разработчиков, что так не очень быстро. На свет появляется функция process_batch(), которая обрабатывает 10000 игрушек за 1 сек. Это уже 11 часов на все игрушки, что значительно приятнее.

Как бы теперь пройти по датасету пакетами по 10 тысяч записей? Тут и пригодится постраничный итератор!

def paginate(iterable, page_size):
page = []
for item in iterable:
page.append(item)
if len(page) == page_size:
yield page
page = []
yield page


reader = fetch_toys()
page_size = 10_000
for page in paginate(reader, page_size):
process_batch(page)


Реализация рабочая, но есть проблемка. Такой постраничный обход заметно медленнее обычного итерирования по одной записи:

One-by-one (baseline):   161 ms
Fill page with append(): 227 ms


Если интересно, в чем причина и что можно сделать — читайте подробный разбор.

Постраничные итераторы отлично работают везде, где пакетная операция выполняется сильно быстрее набора одиночных. Рекомендую!

#stdlib
Храним состояние в URL

Если вы разрабатываете веб-приложение, то рано или поздно столкнетесь с проблемой сохранения текущего состояния системы для пользователя.

Например, вы продаете через интернет элитный картофель. Покупатель заходит, настраивает фильтры поиска, видит список из 300 позиций.

Переходит на третью страницу, открывает карточку картофелины и случайно нажимает на рефреш страницы. Как поступит ваше приложение?

Я знаю такие варианты:

— Не хранить состояние вообще
— Хранить состояние локально
— Хранить набор URL-параметров
— Хранить сериализованное состояние
— Гибридные подходы

Вот особенности каждого

#код
Как передать состояние в URL-параметрах

На первый взгляд, передавать состояние в виде набора параметров URL легко и приятно.

Строку или число — элементарно:

{
"search": "potatoes",
"page": 5
}


→ ?search=potatoes&page=5


Логическое значение — не сильно сложнее:

{ "popular": true }


→ popular=true
popular=1


Дата и время обычно использует RFC 3339 (2020-01-02T10:11:12Z) или unix time (секунды после полуночи 01.01.1970), реже — миллисекунды:

{ "after": "2020-01-02" }


→ after=2020-01-02
after=1577923200


Неизвестное значение часто передают как пустое или не передают вовсе, реже используют маркерное значение:

{ "country": null }


→ country=null
country=
<empty>


Список сложнее. Классический вариант — повторять название свойства для каждого значения, как диктует RFC 6570.

Иногда добавляют [], чтобы подчеркнуть, что это список, бывает что и с индексами:

{
"country": ["bo", "za"]
}


→ country=bo&country=za
country[]=bo&country[]=za
country[0]=bo&country[1]=za


Фанаты краткости передают название свойства один раз, а значения разделяют запятыми.

→ country=bo,za


Словарь содержит пары ключ-значение. Чаще всего название основного свойства дублируют, а названия вложенных указывают в []. Реже используют точку.

{
"size": {
"min": 3,
"max": 7
}
}


→ size[min]=3&size[max]=7
size.min=3&size.max=7


Вложенность больше 2 уровней обычно не используют.

В целом, URL-параметры неплохо подходят для необходимого контекста. Они наглядные и позволяют передавать достаточно сложные структуры данных.

#код
Как передать состояние в URL-параметрах
Новости стандартной библиотеки

Когда выходит очередная версия Python, все внимание достается новым фичам языка: моржовому оператору, слиянию словарей, паттерн-матчингу. Еще много пишут об изменениях в асинхронной работе (модуль asyncio) и типизации (модуль typing) — эти модули на виду и бурно развиваются.

Остальным модулям стандартной библиотеки достается незаслуженно мало внимания. Хочу это исправить и рассказать, что интересного появилось в версиях 3.8–3.10.

Планировал небольшую заметку, но не преуспел: получилась здоровенная статья. Старался выбрать только самое интересное, но все равно в обзор попали аж 17 модулей. Питон, он такой 😁

Читайте на хабре:
https://habr.com/ru/post/665020/

P.S. Для каждого описанного новшества в статье есть ссылка на интерактивную песочницу (например, graphlib)! Спасибо Степику за нее ❤️