«Отнаследовать» функцию от существующей
Некоторые справедливо заметили, что если формат исходной строки заранее известен, то отсортировать список можно через стандартную
Чтобы добавить семантичности и не таскать везде дополнительный параметр
Есть и более лакончиный способ сделать это — через
Таким образом,
* Строго говоря, не функцию, а вызываемый объект, у которого определен дандер
#stdlib
Некоторые справедливо заметили, что если формат исходной строки заранее известен, то отсортировать список можно через стандартную
sorted()
:data = [
"4 - Дуглас",
"2 - Клер",
"11 - Зоя",
"1 - Френк",
"31 - Питер",
]
def _key(src):
parts = src.partition(" - ")
return int(parts[0])
>>> sorted(data, key=_key)
['1 - Френк', '2 - Клер', '4 - Дуглас', '11 - Зоя', '31 - Питер']
Чтобы добавить семантичности и не таскать везде дополнительный параметр
key
, можно создать собственную функцию на основе sorted()
:def natsorted(iterable, reverse=False):
return sorted(iterable, key=_key, reverse=reverse)
>>> natsorted(data)
['1 - Френк', '2 - Клер', '4 - Дуглас', '11 - Зоя', '31 - Питер']
Есть и более лакончиный способ сделать это — через
functools.partial()
:import functools
natsorted = functools.partial(sorted, key=_key)
partial()
создает новую функцию* на основе существующей. При этом можно «зафиксировать» один или несколько параметров (мы зафиксировали key
), разрешив менять остальные (iterable
и reverse
в нашем случае).Таким образом,
partial()
помогает создавать узкоспециализированные функции на базе более универсальных.* Строго говоря, не функцию, а вызываемый объект, у которого определен дандер
__call__
— его можно вызывать, как будто это функция.#stdlib
Поэлементно сравнить коллекции
Однажды мы уже смотрели, как множества помогают быстро проверить, входит ли элемент в коллекцию.
Конечно, это не единственная возможность. Множества в питоне идеально подходят, чтобы поэлементно сравнивать коллекции.
Допустим, мы ведем учет посетителей:
И хотим узнать, кто приходил в январе и феврале. Нет ни малейшего желания писать вложенный цикл с перебором
Были в январе и феврале:
В январе или марте:
В феврале, но не в марте:
В январе или феврале, но не в оба месяца:
Все эти операции выполняются за линейное время
Кроме обычных множеств бывают замороженные (их нельзя менять):
Множество можно слепить из любого iterable-типа. Например, из строки:
Или даже из диапазона:
В общем, полезная штука.
#stdlib
Однажды мы уже смотрели, как множества помогают быстро проверить, входит ли элемент в коллекцию.
Конечно, это не единственная возможность. Множества в питоне идеально подходят, чтобы поэлементно сравнивать коллекции.
Допустим, мы ведем учет посетителей:
jan = ["Питер", "Клер", "Френк"]
feb = ["Френк", "Зоя", "Дуглас"]
mar = ["Клер", "Питер", "Зоя"]
И хотим узнать, кто приходил в январе и феврале. Нет ни малейшего желания писать вложенный цикл с перебором
jan
и feb
. Намного приятнее (и быстрее) использовать множества.jan = {"Питер", "Клер", "Френк"}
feb = {"Френк", "Зоя", "Дуглас"}
mar = {"Клер", "Питер", "Зоя"}
Были в январе и феврале:
>>> jan & feb
{'Френк'}
В январе или марте:
>>> jan | mar
{'Питер', 'Клер', 'Зоя', 'Френк'}
В феврале, но не в марте:
>>> feb - mar
{'Френк', 'Дуглас'}
В январе или феврале, но не в оба месяца:
>>> jan ^ feb
{'Питер', 'Клер', 'Зоя', 'Дуглас'}
Все эти операции выполняются за линейное время
O(n)
вместо квадратичного O(n²)
, как было бы на списках.Кроме обычных множеств бывают замороженные (их нельзя менять):
>>> visitors = frozenset().union(jan, feb, mar)
>>> visitors
frozenset({'Питер', 'Клер', 'Зоя', 'Френк', 'Дуглас'})
Множество можно слепить из любого iterable-типа. Например, из строки:
>>> frozenset('abcde')
frozenset({'b', 'd', 'e', 'c', 'a'})
Или даже из диапазона:
>>> set(range(1, 10))
{1, 2, 3, 4, 5, 6, 7, 8, 9}
В общем, полезная штука.
#stdlib
День списков
Давайте проведем тематический день! (если честно, то несколько)
Посвятим его структуре данных номер один в мире — массивам. Если вы еще не гуру алгоритмов и структур данных — гарантирую, что лучше поймете списки в питоне, их преимущества и ограничения. А если и так все знаете — освежите ключевые моменты ツ
Все знают, как работать со списком в питоне:
Наверняка вы знаете, что выборка элемента по индексу —
А знаете, за счет чего так быстро работает? Как внутри устроено? Опрос следует.
#stdlib
Давайте проведем тематический день! (если честно, то несколько)
Посвятим его структуре данных номер один в мире — массивам. Если вы еще не гуру алгоритмов и структур данных — гарантирую, что лучше поймете списки в питоне, их преимущества и ограничения. А если и так все знаете — освежите ключевые моменты ツ
Все знают, как работать со списком в питоне:
>>> guests = ["Френк", "Клер", "Зоя"]
>>> guests[1]
'Клер'
Наверняка вы знаете, что выборка элемента по индексу —
guests[idx]
— отработает очень быстро даже на списке из миллиона элементов. Более точно, выборка по индексу работает за константное время O(1)
— то есть не зависит от количества элементов в списке.А знаете, за счет чего так быстро работает? Как внутри устроено? Опрос следует.
#stdlib
Список = массив?
В основе питонячего списка лежит массив. Массив — это набор элементов (1) одинакового размера и (2) расположенных в памяти подряд друг за другом, без пропусков.
Раз элементы одинаковые и идут подряд, получить элемент массива по индексу несложно — достаточно знать адрес самого первого элемента («головы» массива).
Допустим, голова находится по адресу
Поскольку операция «получить значение по адресу» выполняется за константное время, то и выборка из массива по индексу выполняется за
Грубо говоря, питонячий список именно так и устроен. Он хранит указатель на голову массива и количество элементов в массиве. Количество хранится отдельно, чтобы функция
Все хорошо, но есть пара проблем:
— все элементы массива одного размера, а список умеет хранить разные (true/false, числа, строки разной длины);
— массив имеет фиксированную длину, а в список можно добавить сколько угодно элементов.
Чуть позже посмотрим, как их решить.
#stdlib
В основе питонячего списка лежит массив. Массив — это набор элементов (1) одинакового размера и (2) расположенных в памяти подряд друг за другом, без пропусков.
Раз элементы одинаковые и идут подряд, получить элемент массива по индексу несложно — достаточно знать адрес самого первого элемента («головы» массива).
Допустим, голова находится по адресу
0x00001234
, а каждый элемент занимает 8 байт. Тогда элемент с индексом idx находится по адресу 0x00001234 + idx*8
(картинка прилагается).Поскольку операция «получить значение по адресу» выполняется за константное время, то и выборка из массива по индексу выполняется за
O(1)
.Грубо говоря, питонячий список именно так и устроен. Он хранит указатель на голову массива и количество элементов в массиве. Количество хранится отдельно, чтобы функция
len()
тоже отрабатывала за O(1)
, а не считала каждый раз фактическое количество элементов списка.Все хорошо, но есть пара проблем:
— все элементы массива одного размера, а список умеет хранить разные (true/false, числа, строки разной длины);
— массив имеет фиксированную длину, а в список можно добавить сколько угодно элементов.
Чуть позже посмотрим, как их решить.
#stdlib
Ну очень примитивный список
Лучший способ освоить структуру данных — реализовать ее с нуля. К сожалению, питон плохо подходит для таких низкоуровненых структур как массив, потому что не дает явно работать с указателями (адресами в памяти).
Но кое-что можно сделать:
Наш самописный список имеет фиксированную вместимость (
Модуль
Завтра продолжим ツ
#stdlib
Лучший способ освоить структуру данных — реализовать ее с нуля. К сожалению, питон плохо подходит для таких низкоуровненых структур как массив, потому что не дает явно работать с указателями (адресами в памяти).
Но кое-что можно сделать:
class OhMyList:
def __init__(self):
self.length = 0
self.capacity = 8
self.array = (self.capacity * ctypes.py_object)()
def append(self, item):
self.array[self.length] = item
self.length += 1
def __len__(self):
return self.length
def __getitem__(self, idx):
return self.array[idx]
Наш самописный список имеет фиксированную вместимость (
capacity
= 8 элементов) и хранит элементы в массиве array
.Модуль
ctypes
дает доступ к сишным структурам, на которых построена стандартная библиотека. В даннам случае мы используем его, чтобы создать массив размером в capacity
элементов.Завтра продолжим ツ
#stdlib
Список = массив указателей
Итак, список моментально выбирает элемент по индексу, потому что внутри у него массив. А массив такой быстрый, потому что все элементы у него одинакового размера.
Но при этом в списке элементы могут быть очень разные:
Чтобы решить эту задачку, придумали хранить в массиве не сами значения, а указатели на них. Элемент массива — адрес в памяти, а если обратиться по адресу — получишь настоящее значение (картинка прилагается).
Поскольку указатели фиксированного размера (8 байт на современных 64-битных процессорах), то все прекрасно работает. Да, получается, что вместо одной операции (получить значение из элемента массива) мы делаем две:
1. Получить адрес из элемента массива.
2. Получить значение по адресу.
Но это все еще константное время O(1).
#stdlib
Итак, список моментально выбирает элемент по индексу, потому что внутри у него массив. А массив такой быстрый, потому что все элементы у него одинакового размера.
Но при этом в списке элементы могут быть очень разные:
guests = ["Френк", "Клер", "Зоя", True, 42]
Чтобы решить эту задачку, придумали хранить в массиве не сами значения, а указатели на них. Элемент массива — адрес в памяти, а если обратиться по адресу — получишь настоящее значение (картинка прилагается).
Поскольку указатели фиксированного размера (8 байт на современных 64-битных процессорах), то все прекрасно работает. Да, получается, что вместо одной операции (получить значение из элемента массива) мы делаем две:
1. Получить адрес из элемента массива.
2. Получить значение по адресу.
Но это все еще константное время O(1).
#stdlib
Список = динамический массив
Если в массиве под списком остались свободные места, то метод
Но что делать, если массив уже заполнен?
Приходится выделять память под новый массив, побольше, и копировать все элементы старого массива в новый (картинка прилагается).
Примерно так:
Если удалить из списка больше половины элементов через
Таким образом, список все время жонглирует массивами, чтобы это не приходилось делать нам ツ
#stdlib
Если в массиве под списком остались свободные места, то метод
.append(item)
выполнится за константное время — достаточно записать новое значение в свободную ячейку и увеличить счетчик элементов на 1:def append(self, item):
self.array[self.length] = item
self.length += 1
Но что делать, если массив уже заполнен?
Приходится выделять память под новый массив, побольше, и копировать все элементы старого массива в новый (картинка прилагается).
Примерно так:
def append(self, item):
if self.length == self.capacity:
self._resize(self.capacity*2)
self.array[self.length] = item
self.length += 1
def _resize(self, new_cap):
new_arr = (new_cap * ctypes.py_object)()
for idx in range(self.length):
new_arr[idx] = self.array[idx]
self.array = new_arr
self.capacity = new_cap
_resize()
— затратная операция, так что новый массив создают с запасом. В примере выше новый массив в два раза больше старого, а в питоне используют более скромный коэффициент — примерно 1.12.Если удалить из списка больше половины элементов через
.pop()
, то питон его скукожит — выделит новый массив поменьше и перенесет элементы в него.Таким образом, список все время жонглирует массивами, чтобы это не приходилось делать нам ツ
#stdlib
Добавление элемента в конец списка
Выборка из списка по индексу работает за O(1) — с этим разобрались. Метод
Оценивать отдельную операцию вставки было бы неправильно — как мы выяснили, она иногда выполняется за O(1), а иногда и за O(n). Поэтому используют амортизационный анализ — оценивают общее время, которое займет последовательность из K операций, затем делят его на K и получают амортизированное время одной операции.
Так вот. Не вдаваясь в подробности скажу, что амортизированное время для
Итого, у списка есть такие гарантированно быстрые операции:
P.S. Если интересно, как именно получается амортизированное O(1) — подробности в комментариях.
#stdlib
Выборка из списка по индексу работает за O(1) — с этим разобрались. Метод
.append(item)
тоже отрабатывает за O(1), пока не приходится расширять массив под списком. Но любое расширение массива — это операция O(n). Так за сколько же в итоге отрабатывает append()
?Оценивать отдельную операцию вставки было бы неправильно — как мы выяснили, она иногда выполняется за O(1), а иногда и за O(n). Поэтому используют амортизационный анализ — оценивают общее время, которое займет последовательность из K операций, затем делят его на K и получают амортизированное время одной операции.
Так вот. Не вдаваясь в подробности скажу, что амортизированное время для
.append(item)
получается константным — O(1). Так что вставка в конец списка работает очень быстро.Итого, у списка есть такие гарантированно быстрые операции:
# O(1)
lst[idx]
# O(1)
len(lst)
# амортизированное O(1)
lst.append(item)
lst.pop()
P.S. Если интересно, как именно получается амортизированное O(1) — подробности в комментариях.
#stdlib
Список: итоги
Как мы выяснили, у списка работают за O(1):
— выборка по индексу
— запрос длины
— добавление элемента в конец списка
— удаление элемента из конца списка
Остальные операции — «медленные».
Вставка и удаление из произвольной позиции —
Поиск и удаление элемента по значению —
Выборка среза из k элементов —
Значит ли это, что «медленные» операции нельзя использовать? Конечно, нет. Если у вас список из 1000 элементов, разница между O(1) и O(n) для единичной операции незаметна.
С другой стороны, если вы миллион раз выполняете «медленную» операцию на списке из 1000 элементов — это уже заметно. Или если список из миллиона элементов — тоже.
Поэтому полезно знать, что у списка работает за константное время, а что за линейное — чтобы осознанно принимать решение в конкретной ситуации.
#stdlib
Как мы выяснили, у списка работают за O(1):
— выборка по индексу
lst[idx]
— запрос длины
len(lst)
— добавление элемента в конец списка
.append(item)
— удаление элемента из конца списка
.pop()
Остальные операции — «медленные».
Вставка и удаление из произвольной позиции —
.insert(idx, item)
и .pop(idx)
— работают за линейное время O(n), потому что сдвигают все элементы после целевого.Поиск и удаление элемента по значению —
item in lst
, .index(item)
и .remove(item)
— работают за линейное время O(n), потому что перебирают все элементы.Выборка среза из k элементов —
lst[from:to]
— работает за O(k).Значит ли это, что «медленные» операции нельзя использовать? Конечно, нет. Если у вас список из 1000 элементов, разница между O(1) и O(n) для единичной операции незаметна.
С другой стороны, если вы миллион раз выполняете «медленную» операцию на списке из 1000 элементов — это уже заметно. Или если список из миллиона элементов — тоже.
Поэтому полезно знать, что у списка работает за константное время, а что за линейное — чтобы осознанно принимать решение в конкретной ситуации.
#stdlib
Закешировать результат «дорогих» вычислений
Предположим, написали вы функцию, которая возвращает емейл пользователя:
Одна беда: функция
Если 100 раз вызвать
Вроде ничего сложного (не считая вопроса устаревания кеша, но об этом в другой раз). Но представьте, что медленных функций много, и в каждую придется приделывать такую штуку. Не слишком вдохновляет.
К счастью, в модуле
Теперь повторные вызовы
Работает кеш внутри процесса, и погибнет вместе с ним. Так что если нужно что-то более масштабируемое — посмотрите на Redis или аналоги.
документация • песочница
#stdlib
Предположим, написали вы функцию, которая возвращает емейл пользователя:
def get_user_email(user_id):
user = find_by_id(user_id)
return user["email"]
Одна беда: функция
find_by_id()
лезет в уж-ж-жасно медленную легаси-систему:def find_by_id(user_id):
# представьте здесь медленный запрос по сети,
# который возвращает пользователя
time.sleep(1)
return { "email": "..." }
Если 100 раз вызвать
get_user_email(42)
— будет 100 медленных запросов. Хотя по уму хватило бы и одного. Что ж, давайте приделаем простенький кеш:cache = {}
def get_user_email(user_id):
if user_id not in cache:
user = find_by_id(user_id)
cache[user_id] = user["email"]
return cache[user_id]
Вроде ничего сложного (не считая вопроса устаревания кеша, но об этом в другой раз). Но представьте, что медленных функций много, и в каждую придется приделывать такую штуку. Не слишком вдохновляет.
К счастью, в модуле
functools
есть декоратор @lru_cache
. Он-то нам и пригодится. Добавляем одну строчку к исходной функции, и готово:@functools.lru_cache(maxsize=256)
def get_user_email(user_id):
user = find_by_id(user_id)
return user["email"]
Теперь повторные вызовы
get_user_email()
с одним и тем же user_id
вернут результат из кеша, не запрашивая find_by_id()
.lru_cache
прекрасен еще тем, что автоматически вытесняет старые записи из кеша, когда их становится больше maxsize
. Так что всю память не съест.Работает кеш внутри процесса, и погибнет вместе с ним. Так что если нужно что-то более масштабируемое — посмотрите на Redis или аналоги.
документация • песочница
#stdlib
Кортеж здорового человека
Я нежно люблю именованный кортеж (
Неплохо. Только вот объявление класса через
К счастью, в модуле
Здорово, а?
документация • песочница
#stdlib
Я нежно люблю именованный кортеж (
namedtuple
), даже цикл заметок о нем написал. Он компактный, неизменяемый и полностью совместим с обычными tuple
и list
. Идеально подходит для представления данных из файла или базы:from collections import namedtuple
Pet = namedtuple("Pet", ("type", "name", "age"))
frank = Pet("pigeon", "Френк", 3)
print(frank)
# Pet(type='pigeon', name='Френк', age=3)
print(list(frank))
# ['pigeon', 'Френк', 3]
values = ('pigeon', 'Френк', 3)
frank = Pet(*values)
print(frank)
# Pet(type='pigeon', name='Френк', age=3)
Неплохо. Только вот объявление класса через
namedtuple(...)
и выглядит не очень, и совсем не сочетается с современной системой типов в питоне.К счастью, в модуле
typing
есть класс NamedTuple
, который позволяет нормально работать с именованными кортежами, словно с какими-нибудь дата-классами:from typing import NamedTuple, Optional
class Pet(NamedTuple):
type: str
name: Optional[str]
age: int = 1
frank = Pet("pigeon", "Френк", 3)
print(frank)
# Pet(type='pigeon', name='Френк', age=3)
Здорово, а?
документация • песочница
#stdlib
Считаем котов: async
Наш алгоритм подсчета котов прост и линеен. Одна беда — работает медленно:
Логично: в функции
Кластер явно способен обрабатывать больше одного видео за раз. Нагрузим его посильнее!
Логично было бы не ждать ответа, а сразу отправить следующий запрос, за ним еще один, и так далее. А ответы обрабатывать по мере того, как кластер их отдает. Для такого подхода идеально подходят инструменты из пакета
Делаем обработчик урлов асинхронным:
Создаем обработчик на каждый урл из файла:
Выполняем асинхронно, получаем результаты по готовности:
Остальной код не меняется. Смотрите, как хорошо:
Но есть у такого подхода небольшой недостаток, который в реальной жизни начисто разрушит идиллию. О нем в следующий раз.
песочница
#stdlib
Наш алгоритм подсчета котов прост и линеен. Одна беда — работает медленно:
Processed 10 videos, took 10.03 seconds
Processed 100 videos, took 100.28 seconds
... дальше не стал замерять, лень ждать
Логично: в функции
process()
мы отправлям запрос на кластер, секунду ждем ответа, отправляем следующий запрос, ждем еще секунду, и так далее.Кластер явно способен обрабатывать больше одного видео за раз. Нагрузим его посильнее!
Логично было бы не ждать ответа, а сразу отправить следующий запрос, за ним еще один, и так далее. А ответы обрабатывать по мере того, как кластер их отдает. Для такого подхода идеально подходят инструменты из пакета
asyncio
.Делаем обработчик урлов асинхронным:
async def process(url):
video_id = url.partition("=")[2]
cat_count = await count_cats(url)
return Video(video_id, url, cat_count)
Создаем обработчик на каждый урл из файла:
processors = [process(url) for url in reader("data/urls.txt")]
Выполняем асинхронно, получаем результаты по готовности:
for task in asyncio.as_completed(processors):
video = await task
w.send(video)
Остальной код не меняется. Смотрите, как хорошо:
Processed 10 videos, took 1.00 seconds
Processed 100 videos, took 1.01 seconds
Processed 1000 videos, took 1.03 seconds
Processed 10000 videos, took 1.25 seconds
Processed 100000 videos, took 4.63 seconds
Processed 1000000 videos, took 49.30 seconds
Но есть у такого подхода небольшой недостаток, который в реальной жизни начисто разрушит идиллию. О нем в следующий раз.
песочница
#stdlib
Считаем котов: async с лимитами
В прошлый раз мы асинхронно обработали 10 тысяч видео за одну секунду:
Ну разве не успех? В теории — да, на практике — не особо.
Во-первых, в списке
Во-вторых (и это намного хуже),
Скорее всего, большую часть запросов кластер отобьет с ошибкой типа «Too Many Requests», на чем все и закончится. Либо попытается переварить эти 10 тысяч видео разом и уйдет в 100% загрузку (а то и вовсе упадет), за что нам немедленно настучат по голове админы.
На практике никто не даст обработать прям все данные за один раз. Вместо этого нам скажут: «можете делать не более 16 одновременных запросов к кластеру». Но как реализовать это ограничение?
Придется сделать обертку, которая гарантирует, что одновременно запущены не более N задач:
Все, что изменилась — наша реализация
Конечно, обработка теперь занимает не одну секунду:
Но все еще в 16 раз быстрее, чем обрабатывать видео последовательно.
Проблему с размером списка
песочница
#stdlib
В прошлый раз мы асинхронно обработали 10 тысяч видео за одну секунду:
processors = [process(url) for url in reader("data/urls.txt")]
for task in asyncio.as_completed(processors):
video = await task
Processed 10000 videos, took 1.25 seconds
Ну разве не успех? В теории — да, на практике — не особо.
Во-первых, в списке
processors
лежит 10 тысяч корутин еще до того, как мы начали обработку. Конечно, 10 тысяч — не слишком много, но что делать, если их там будет 10 миллионов?Во-вторых (и это намного хуже),
asyncio.as_completed()
разом выстрелит 10 тысяч запросов к кластеру обработки видео.Скорее всего, большую часть запросов кластер отобьет с ошибкой типа «Too Many Requests», на чем все и закончится. Либо попытается переварить эти 10 тысяч видео разом и уйдет в 100% загрузку (а то и вовсе упадет), за что нам немедленно настучат по голове админы.
На практике никто не даст обработать прям все данные за один раз. Вместо этого нам скажут: «можете делать не более 16 одновременных запросов к кластеру». Но как реализовать это ограничение?
asyncio.as_completed()
так не умеет.Придется сделать обертку, которая гарантирует, что одновременно запущены не более N задач:
max_workers = 16
processors = [process(url) for url in reader("data/urls.txt")]
for task in as_completed(max_workers, processors):
video = await task
Все, что изменилась — наша реализация
as_completed()
вместо стандартной. Детали можете посмотреть в песочнице, но основная мысль — использовать семафор. Это объект, который ведет счетчик использованных ресурсов (запущенных задач в нашем случае), и не дает использовать более N ресурсов одновременно.Конечно, обработка теперь занимает не одну секунду:
Processed 10 videos, took 1.00 seconds
Processed 100 videos, took 6.62 seconds
Processed 1000 videos, took 59.91 seconds
Но все еще в 16 раз быстрее, чем обрабатывать видео последовательно.
Проблему с размером списка
processors
тоже можно решить, сделав постраничный итератор вместо обычного. Если интересно — напишу в комментариях, заметка и без того большая.песочница
#stdlib
Считаем котов: потоки
async — сравнительно новый «родной» для питона способ многозадачности. Вычисления выполняются в одном потоке, но можно насоздавать кучу легковесных задач. В каждый момент времени выполняется одна задача, а прочие ожидают I/O (ответа от кластера в нашем случае). Если I/O много, то работает весьма шустро.
Если по каким-то причинам async вам недоступен, можно воспользоваться старым проверенным способом — родными потоками операционной системы (threads).
Здесь мы приходим к задаче «читатель-обработчик-писатель», которая в разных вариациях постоянно встречается в реальной жизни. Читатель, обработчик и писатель — независимые акторы, каждый из которых делает свое дело. Между собой эти ребята общаются через очереди:
Получается конвейер из трех шагов. Реализуем их:
Запустим в отдельных потоках одного читателя, одного писателя и N обработчиков. И подружим их между собой с помощью очередей:
Реальный код сложнее, потому нужно отслеживать, когда все акторы закончат работу — иначе программа никогда не завершится. Это можно сделать простым «грязным» способом через таймауты или сложным, но точным — через примитивы синхронизации. В песочнице вариант с таймаутами.
Время обработки примерно такое же, как у варианта с async:
Замечу, что потоки в Python совсем не те, что в Java или C#. Из-за глобальной блокировки интерпретатора (GIL) в каждый момент времени может выполняться только один поток. Так что для I/O задач потоки подходят (как и async), а вот для CPU задач — совсем нет.
песочница
#stdlib
async — сравнительно новый «родной» для питона способ многозадачности. Вычисления выполняются в одном потоке, но можно насоздавать кучу легковесных задач. В каждый момент времени выполняется одна задача, а прочие ожидают I/O (ответа от кластера в нашем случае). Если I/O много, то работает весьма шустро.
Если по каким-то причинам async вам недоступен, можно воспользоваться старым проверенным способом — родными потоками операционной системы (threads).
Здесь мы приходим к задаче «читатель-обработчик-писатель», которая в разных вариациях постоянно встречается в реальной жизни. Читатель, обработчик и писатель — независимые акторы, каждый из которых делает свое дело. Между собой эти ребята общаются через очереди:
reader → in_queue → processor → out_queue → writer
Получается конвейер из трех шагов. Реализуем их:
def read_step(filename, in_queue):
for url in reader(filename):
in_queue.put(url)
def process_step(in_queue, out_queue):
while True:
url = in_queue.get()
video = process(url)
out_queue.put(video)
def write_step(out_queue):
while True:
video = out_queue.get(timeout=1)
w.send(video)
Запустим в отдельных потоках одного читателя, одного писателя и N обработчиков. И подружим их между собой с помощью очередей:
filename = "data/urls.txt"
in_queue = Queue(1024)
out_queue = Queue(1024)
max_processors = 16
with ThreadPoolExecutor(2 + max_processors) as executor:
executor.submit(read_step, filename, in_queue)
for _ in range(max_processors):
executor.submit(process_step, in_queue, out_queue)
executor.submit(write_step, out_queue)
Реальный код сложнее, потому нужно отслеживать, когда все акторы закончат работу — иначе программа никогда не завершится. Это можно сделать простым «грязным» способом через таймауты или сложным, но точным — через примитивы синхронизации. В песочнице вариант с таймаутами.
Время обработки примерно такое же, как у варианта с async:
Processed 10 videos, took 1.00 seconds
Processed 100 videos, took 6.63 seconds
Processed 1000 videos, took 60.04 seconds
Замечу, что потоки в Python совсем не те, что в Java или C#. Из-за глобальной блокировки интерпретатора (GIL) в каждый момент времени может выполняться только один поток. Так что для I/O задач потоки подходят (как и async), а вот для CPU задач — совсем нет.
песочница
#stdlib
Считаем котов: процессы
Многозадачность на async и потоках сработала, потому что наши акторы в основном ждут ответа от внешнего кластера. Python (в случае с async) или ОС (в случае с потоками) постоянно «жонглируют» обработчиками, а на CPU в каждый момент выполняется только один.
Но что делать, если обработчикам нужно 100% времени CPU? Например, если они что-то вычислают, выполняют алгоритмы или работают со структурами данных? И потоки, и async здесь бессильны — от многозадачности не останется и следа.
Единственный рабочий вариант с CPU-bound задачами — использовать честные процессы операционной системы. Благо на Python с процессами можно работать почти так же, как с потоками.
Наши читатель (
Время обработки:
Процессы — наиболее «тяжеловесная» конструкция из рассмотренных. Но только они и подходят для CPU-bound задач.
песочница
#stdlib
Многозадачность на async и потоках сработала, потому что наши акторы в основном ждут ответа от внешнего кластера. Python (в случае с async) или ОС (в случае с потоками) постоянно «жонглируют» обработчиками, а на CPU в каждый момент выполняется только один.
Но что делать, если обработчикам нужно 100% времени CPU? Например, если они что-то вычислают, выполняют алгоритмы или работают со структурами данных? И потоки, и async здесь бессильны — от многозадачности не останется и следа.
Единственный рабочий вариант с CPU-bound задачами — использовать честные процессы операционной системы. Благо на Python с процессами можно работать почти так же, как с потоками.
Наши читатель (
read_step
), писатель (write_step
) и обработчик (process_step
) остаются прежними. Меняем пул потоков на пул процессов — и готово:filename = "data/urls.txt"
manager = Manager()
in_queue = manager.Queue(1024)
out_queue = manager.Queue(1024)
max_processors = 16
with ProcessPoolExecutor(2 + max_processors) as executor:
executor.submit(read_step, filename, in_queue)
for _ in range(max_processors):
executor.submit(process_step, in_queue, out_queue)
executor.submit(write_step, out_queue)
Время обработки:
Processed 10 videos, took 1.36 seconds
Processed 100 videos, took 6.96 seconds
Processed 1000 videos, took 60.40 seconds
Процессы — наиболее «тяжеловесная» конструкция из рассмотренных. Но только они и подходят для CPU-bound задач.
песочница
#stdlib
Считаем котов: итоги
Некоторые выводы по многозадачности в питоне:
— Даже если программа не использует многозадачность, удобно явно выделить акторов: читателя, обработчика и писателя. С ними код проще понять, модифицировать и повторно использовать.
— Многозадачность через async — отличный легковесный и «родной» вариант для i/o-bound задач. Не забывайте только ограничивать количество одновременных задач.
— Многозадачность через потоки подойдет для i/o-bound задач в легаси-коде, где нет поддержки async. Либо если хотите в перспективе переключиться на процессы с минимальными доработками.
— Многозадачность через процессы подойдет для cpu-bound задач.
Как справедливо отметили в комментариях, async вполне можно комбинировать с процессами. Но об этом как-нибудь в другой раз.
#stdlib
Некоторые выводы по многозадачности в питоне:
— Даже если программа не использует многозадачность, удобно явно выделить акторов: читателя, обработчика и писателя. С ними код проще понять, модифицировать и повторно использовать.
— Многозадачность через async — отличный легковесный и «родной» вариант для i/o-bound задач. Не забывайте только ограничивать количество одновременных задач.
— Многозадачность через потоки подойдет для i/o-bound задач в легаси-коде, где нет поддержки async. Либо если хотите в перспективе переключиться на процессы с минимальными доработками.
— Многозадачность через процессы подойдет для cpu-bound задач.
Как справедливо отметили в комментариях, async вполне можно комбинировать с процессами. Но об этом как-нибудь в другой раз.
#stdlib
Постраничный итератор для быстрой пакетной обработки
Предположим, вы считаете статистику по огромному датасету игрушек, проданных по всей стране за прошлый год:
В результате оживленного диалога вам удается убедить разработчиков, что так не очень быстро. На свет появляется функция
Как бы теперь пройти по датасету пакетами по 10 тысяч записей? Тут и пригодится постраничный итератор!
Реализация рабочая, но есть проблемка. Такой постраничный обход заметно медленнее обычного итерирования по одной записи:
Если интересно, в чем причина и что можно сделать — читайте подробный разбор.
Постраничные итераторы отлично работают везде, где пакетная операция выполняется сильно быстрее набора одиночных. Рекомендую!
#stdlib
Предположим, вы считаете статистику по огромному датасету игрушек, проданных по всей стране за прошлый год:
reader = fetch_toys()
for item in reader:
process_single(item)
process_single()
занимает 10 мс, так что 400 млн игрушек обработаются за 46 дней 😱В результате оживленного диалога вам удается убедить разработчиков, что так не очень быстро. На свет появляется функция
process_batch()
, которая обрабатывает 10000 игрушек за 1 сек. Это уже 11 часов на все игрушки, что значительно приятнее.Как бы теперь пройти по датасету пакетами по 10 тысяч записей? Тут и пригодится постраничный итератор!
def paginate(iterable, page_size):
page = []
for item in iterable:
page.append(item)
if len(page) == page_size:
yield page
page = []
yield page
reader = fetch_toys()
page_size = 10_000
for page in paginate(reader, page_size):
process_batch(page)
Реализация рабочая, но есть проблемка. Такой постраничный обход заметно медленнее обычного итерирования по одной записи:
One-by-one (baseline): 161 ms
Fill page with append(): 227 ms
Если интересно, в чем причина и что можно сделать — читайте подробный разбор.
Постраничные итераторы отлично работают везде, где пакетная операция выполняется сильно быстрее набора одиночных. Рекомендую!
#stdlib
Компактные объекты
Питон — объектный язык. Это здорово и удобно, пока не придется создать 10 млн объектов в памяти, которые благополучно ее и съедят. Поговорим о том, как уменьшить аппетит.
Допустим, есть у вас простенький объект «питомец» с атрибутами «имя» (строка) и «стоимость» (целое). Интуитивно кажется, что самое компактное предоставление — в виде кортежа:
Замерим, сколько займет в памяти один такой красавчик:
161 байт. Будем использовать как основу для сравнения.
С чистыми кортежами, конечно, работать неудобно. Наверняка вы используете датакласс:
А что у него с размером?
Ого, какой толстенький!
Попробуем использовать именованный кортеж:
Теперь вы понимаете, за что я его так люблю. Удобный интерфейс как у датакласса — а вес как у кортежа. Идеально.
Или нет? В Python 3.10 приехали датаклассы со слотами:
Ого! Магия слотов создает специальные худощавые объекты, у которых внутри нет словаря, в отличие от обычных питонячих объектов. И такой датакласс ничуть не уступает кортежу.
Что делать, если 3.10 вам еще не завезли? Использовать
У слотовых объектов есть свои недостатки. Но они отлично подходят для простых случаев (без наследования и прочих наворотов).
P.S. Конечно, настоящий победитель — numpy-массив. Но с ним неинтересно соревноваться ツ
песочница
#stdlib
Питон — объектный язык. Это здорово и удобно, пока не придется создать 10 млн объектов в памяти, которые благополучно ее и съедят. Поговорим о том, как уменьшить аппетит.
Допустим, есть у вас простенький объект «питомец» с атрибутами «имя» (строка) и «стоимость» (целое). Интуитивно кажется, что самое компактное предоставление — в виде кортежа:
("Frank the Pigeon", 50000)
Замерим, сколько займет в памяти один такой красавчик:
def fields():
# генерит name длины 10
# и price до 100К
# ...
return (name, price)
n = 10_000
pets = [fields() for _ in range(n)]
size = round(asizeof(pets) / n)
print(f"Pet size (tuple) = {size} bytes")
Pet size (tuple) = 161 bytes
161 байт. Будем использовать как основу для сравнения.
С чистыми кортежами, конечно, работать неудобно. Наверняка вы используете датакласс:
@dataclass
class PetData:
name: str
price: int
А что у него с размером?
Pet size (dataclass) = 257 bytes
x1.60 to baseline
Ого, какой толстенький!
Попробуем использовать именованный кортеж:
class PetTuple(NamedTuple):
name: str
price: int
Pet size (named tuple) = 161 bytes
x1.00 to baseline
Теперь вы понимаете, за что я его так люблю. Удобный интерфейс как у датакласса — а вес как у кортежа. Идеально.
Или нет? В Python 3.10 приехали датаклассы со слотами:
@dataclass(slots=True)
class PetData:
name: str
price: int
Pet size (slots dataclass) = 153 bytes
x0.95 to baseline
Ого! Магия слотов создает специальные худощавые объекты, у которых внутри нет словаря, в отличие от обычных питонячих объектов. И такой датакласс ничуть не уступает кортежу.
Что делать, если 3.10 вам еще не завезли? Использовать
NamedTuple
. Или прописывать слоты вручную:@dataclass
class PetData:
__slots__ = ("name", "price")
name: str
price: int
У слотовых объектов есть свои недостатки. Но они отлично подходят для простых случаев (без наследования и прочих наворотов).
P.S. Конечно, настоящий победитель — numpy-массив. Но с ним неинтересно соревноваться ツ
песочница
#stdlib
Великий Рандом
Все знают про
Но модуль
Например, можно выбрать из диапазона с шагом:
Или случайный элемент последовательности:
А то и несколько элементов:
Можно еще и веса элементам назначить — чтобы одни выбирались чаще других:
Хотите выборку без повторений? Нет проблем:
Или можно всю последовательность перемешать:
И напоследок. Если используете случайные числа в тестах, всегда инициализируйте генератор константой, чтобы он давал воспроизводимые результаты:
А в продакшене, наоборот, вызывайте
#stdlib
Все знают про
random.randint(a, b)
, который возвращает случайное число в указанном диапазоне:random.randint(10, 99)
# 59
Но модуль
random
предоставляет намного больше возможностей. Так много, что одной заметкой и не охватишь.Например, можно выбрать из диапазона с шагом:
random.randrange(10, 99, 3)
# 91
Или случайный элемент последовательности:
numbers = [7, 9, 13, 42, 64, 99]
random.choice(numbers)
# 42
А то и несколько элементов:
numbers = range(99, 10, -1)
random.choices(numbers, k=3)
# [32, 62, 76]
Можно еще и веса элементам назначить — чтобы одни выбирались чаще других:
numbers = [7, 9, 13, 42, 64, 99]
weights = [10, 1, 1, 1, 1, 1]
random.choices(numbers, weights, k=3)
# [42, 13, 7]
random.choices(numbers, weights, k=3)
# [7, 7, 7]
random.choices(numbers, weights, k=3)
# [13, 7, 7]
Хотите выборку без повторений? Нет проблем:
numbers = [7, 9, 13, 42, 64, 99]
random.sample(numbers, k=3)
# [42, 99, 7]
Или можно всю последовательность перемешать:
numbers = [1, 2, 3, 4, 5]
random.shuffle(numbers)
# [3, 2, 1, 5, 4]
И напоследок. Если используете случайные числа в тестах, всегда инициализируйте генератор константой, чтобы он давал воспроизводимые результаты:
random.seed(42)
А в продакшене, наоборот, вызывайте
seed()
без параметров — так питон использует генератор шума операционной системы (или текущее время, если его нет).#stdlib
Многозначительное многоточие
Не знаю, заметили вы или нет в посте о протоколах. Не самая известная штука:
Это вполне рабочий код. В питоне
Авторы Python в основном используют
И в тайп-хинтах:
Ну а обычные разработчики... Кто во что горазд ツ
#stdlib
Не знаю, заметили вы или нет в посте о протоколах. Не самая известная штука:
class Flyer:
def fly(self):
...
Это вполне рабочий код. В питоне
...
(он же Ellipsis
) — это реальный объект, который можно использовать в коде.Ellipsis
— единственный экземпляр типа EllipsisType
(аналогично тому, как None
— единственный экземпляр типа NoneType
):>>> ... is Ellipsis
True
>>> Ellipsis is ...
True
Авторы Python в основном используют
...
, чтобы показать, что у типа, метода или функции отсутствует реализация — как в примере с fly()
.И в тайп-хинтах:
Tuple[str, ...]
Callable[..., str]
Ну а обычные разработчики... Кто во что горазд ツ
#stdlib