this :: IO Diary
42 subscribers
19 photos
3 videos
10 links
Download Telegram
Затроллен нейронкой😔
2
К слову об искусстве нейминга🙊
☠️ Starvation

Не страшно, когда приложение падает, а журнал логов превращается в километровый красный ковёр трейсбэка ошибки. Не страшно даже, когда приложение падает без ошибок. Действительно страшно то, когда приложение умерло, но все выглядит так, что оно продолжает работать. Такую ошибку я поймал сегодня, буквально 2 часа назад, прогоняя нагрузочный тест – сервис просто внезапно умолк. Супервизор живой, все процессы живые, а в логах царит гробовая тишина. Не сказать, что я спец в конкурентности – у меня есть небольшой опыт разработки на языке Go (полтора-два года, если память не изменяет) – но так вышло, что сейчас я работаю над небольшим высоконагруженным сервисом, полностью пропитанным мультипроцессингом и конкурентностью. Думаю, многие слышали о deadlock и livelock, но сегодня я буду рассказывать не об этом

Эта ошибка не была для меня сюрпризом – я знал, где она спряталась. Более того – я осознанно допустил её, чтобы быстрее добиться рабочего результата на низких нагрузках. Сейчас пришло время её исправлять, и я об этом расскажу. То самое «исправлю потом» наступило для меня сегодня😃

У меня есть пайплайн обработки событий:
- события приходят из источника;
- они группируются по stream_id;
- для каждого stream_id создаётся отдельный воркер;
- внутри одного стрима порядок обработки строго последовательный, между стримами – параллель.

На первый взгляд всё выглядит правильно: много воркеров, каждый занимается своим потоком данных, никакой блокировки на уровне бизнес-логики.

Но есть один нюанс.
Центральный процесс (Processor) хранил состояние всех очередей, а воркеры:
- брали события на обработку;
- после каждого шага синхронно спрашивали у Processor, что у них сейчас в очереди.

То есть десятки, сотни, тысячи воркеров постоянно делали синхронный запрос к одному процессу. Под небольшой нагрузкой система работала идеально. Под средней – начинала думать. Под большой нагрузкой во время стресс-теста – гробовая тишина. Процессы живые, supervisor живой, нет эксепшенов, ошибок в логах.
Это не deadlock в классическом смысле – никто не держит ресурсы навсегда. Это не livelock – процессы не крутятся в бесполезном цикле. Это starvation.

Центральный Processor захлёбывается от входящих сообщений:
- асинхронные сообщения о завершённой обработке;
- синхронные сообщения от воркеров на получение новых событий;
- служебные сообщения.

Я нарушил одно из базовых правил конкурентного дизайна:
Процесс, который владеет состоянием, не должен быть узким горлышком для параллельных воркеров.


Ещё хуже – воркеры не владели своими очередями, постоянно синхронно читали чужое состояние и зависели от одного центрального процесса.

Выглядит это следующим образом. Наглядный пример, как делать не надо:
# Модуль Processor
# Структура состояния:
# state = %{stream_id => [item1, item2, ...]}

def handle_cast({:push, item}, state) do
queue = Map.get(state.items, item.stream_id, [])
new_queue = queue ++ [item]

# если очередь новая — стартуем воркер
start_worker_if_needed(item.stream_id)

# Добавляем новое событие в состояние Processor
{:noreply, push(state.items[item.stream_id], new_queue)}
end

# Модуль Worker
def process_queue(stream_id) do
case GenServer.call(Processor, {:get_queue, stream_id}) do
[item | _] ->
process_item(item)
GenServer.cast(Processor, {:item_processed, item})
process_queue(stream_id)

[] ->
:ok
end
end


Ремарка для любопытных: функция `process_queue` рекурсивная, поскольку в уже существующий стрим могут докладываться новые события.


Правильная модель оказалась сильно проще:
- каждый воркер владеет своей очередью;
- центральный процесс ничего не знает про содержимое очередей, он лишь запускает воркеры, мониторит их и реагирует на завершение или падение.

А чтобы очередь не терялась при падении воркера, источник данных становится единственным source of truth. Воркер загружает очередь при старте. Если он упал – его состояние легко восстановить.
2
Правильный пример:
# Модуль Processor
def handle_cast({:push, item}, state) do
unless worker_running?(item.stream_id) do
start_worker(item.stream_id)
end

:ok
end

# Модуль Worker
def init(stream_id) do
queue = Source.fetch_stream(stream_id)
send(self(), :process_next)
{:ok, %{stream_id: stream_id, queue: queue}}
end

def handle_info(:process_next, %{queue: [item | rest]} = state) do
process_item(item)
Source.mark_processed(item.id)
send(self(), :process_next)
{:noreply, %{state | queue: rest}}
end

def handle_info(:process_next, %{queue: []} = state) do
notify_processor(:queue_done, state.stream_id)
{:stop, :normal, state}
end


Более того, так и выглядит поаккуратнее.
5
«Напишу-ка я маленький сервис»
Маленький сервис через месяц:
😁4🔥1
⚡️Ports – разгон Elixir до скорости C

Elixir, как и любой другой исполняемый на виртуальных машинах язык программирования, не может похвастаться высокой производительностью в CPU-bound задачах, и даже JIT не всегда решает эту проблему. Причиной этому являются планировщик процессов, сборка мусора, hot-reloading, иммутабельность данных. Его суперсила заключается в параллелизме – если CPU-bound задачу можно распараллелить, то проблема производительности встаёт не так остро. Но что делать, если перформанса не хватает? В этом случае на подмогу приходят они – Ports и NIF. Благодаря этим решениям можно реализовать сложную логику на низкоуровневом высокопроизводительном языке вроде C, Rust или Zig и вызывать её из BEAM. Сегодня я расскажу о самом простом и безопасном способе – Ports

Port – канал связи между BEAM и внешним миром. С помощью него виртуальная машина запускает скомпилированный исполняемый файл в отдельном процессе операционной системы, общается с ним по stdio и управляет его жизненным циклом. Выглядит это следующим образом:
// adder.c
// Будет скомпилирован в adder
#include <stdio.h>
#include <stdlib.h>

int main(int argc, char *argv[]) {
if (argc != 3) {
printf("ERROR: Need exactly 2 arguments\n");
return 1;
}

int a = atoi(argv[1]);
int b = atoi(argv[2]);

printf("%d\n", a + b);
return 0;
}

Реализуем для этой программы порт на Elixir:
defmodule AdderPort do
def add(a, b) do
# Запускаем порт с аргументами
port = Port.open(
{:spawn, "./adder #{a} #{b}"},
[:binary, :exit_status]
)

# Получаем результат из stdout
receive do
{^port, {:data, result}} ->
result
|> String.trim()
|> String.to_integer()

{^port, {:exit_status, status}} ->
{:error, "Exit status: #{status}"}
after
1000 -> {:error, :timeout}
end
end
end


Пример прост: программа adder принимает два аргумента, складывает их и записывает в stdout. BEAM читает результат и выводит его на экран. В случае аварийного завершения процесса функция AdderPort.add вернёт ошибку со статусом завершения работы. Использование написанного порта выглядит следующим образом:

iex> AdderPort.add(5, 2)
7
iex> AdderPort.add("a", 5)
{:error, "Exit status: -1"}


Благодаря Ports можно безопасно запускать внешние процессы, передавать им сообщения и получать результат. Однако этот подход имеет и недостатки:
- Безопасность: можно запустить что угодно, даже rm -rf /
- Производительность: port запускает отдельный процесс операционной системы, вследствие чего требуется дополнительное время на сериализации и межпроцессное взаимодействие IPC. Пропускная способность также ограничена
- Утечки: каждый port – отдельный процесс ОС, а это значит, что есть риск забить сервер до отказа, если порты запускаются в цикле и не закрываются
- Эффективность: данное решение эффективно в случае, если процесс живёт долго. Для частых вызовов это не подойдёт

Это самый простой способ взаимодействия BEAM с кодом на других языках. О NIF – молниеносно быстром вызове функций без накладных ресурсов – я расскажу в следующем посте
4
🚀NIF – разгоняем Elixir до предела

Как я уже упомянул в прошлом посте, порты – не единственный способ взаимодействия с кодом на других языках программирования. Они являются безопасными, поскольку краш внешнего процесса не убьёт BEAM, но влекут дополнительные задержки, о которых я говорил ранее. Для отдельных случаев, требующих частого вызова функций из внешнего кода, есть ещё несколько решений, и сегодня я хочу рассказать о NIF – самом производительном способе интеграции с внешним кодом.

NIF (Native Implemented Functions) – механизм, с помощью которого можно запускать функции внутри виртуальной машины BEAM без создания дополнительных процессов ОС. Функции выполняются синхронно в контексте планировщика BEAM и являются блокирующими. Этот способ имеет ряд преимуществ:
- Максимальная производительность: NIF выполняются как машинный код процессора, минуя BEAM, что даёт тысячекратную разницу в скорости в сравнении с Ports
- Нулевые накладные расходы: нет запуска процессов ОС и контекстных переключений между ними, межпроцессной коммуникации, сериализации/десериализации данных
- Прямой доступ к данным BEAM: NIF работают с внутренним представлением данных виртуальной машины и полностью совместимы с системой типов Erlang
- Единое адресное пространство: все выполняется в одном процессе ОС, что даёт совместное использование памяти, единый стек вызовов и кеширование данных между вызовами NIF

Реализация NIF выглядит следующим образом:
// adder.c
// Будет скомпилирован в adder.so
#include "erl_nif.h"

// Декларация NIF функции
static ERL_NIF_TERM add(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);

// Регистрируем NIF функции
static ErlNifFunc nif_funcs[] = {
// {"Имя функции в Elixir", количество аргументов, C функция}
{"add", 2, add}
};

// Инициализация NIF - макрос ERL_NIF_INIT
ERL_NIF_INIT(Elixir.NifExample, nif_funcs, NULL, NULL, NULL, NULL)

static ERL_NIF_TERM add(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) {
int a, b;

// Извлекаем первый аргумент как целое число
if (!enif_get_int(env, argv[0], &a)) {
return enif_make_badarg(env);
}

// Извлекаем второй аргумент как целое число
if (!enif_get_int(env, argv[1], &b)) {
return enif_make_badarg(env);
}

int result = a + b;

// Возвращаем результат как Erlang термин
return enif_make_int(env, result);
}

Делаем обвязку для Elixir:
defmodule NifExample do
# Функция загрузки NIF при старте модуля
@on_load :load_nif

# Публичный API модуля
def add(_a, _b), do: :erlang.nif_error(:nif_not_loaded)

defp load_nif do
# Определяем путь к скомпилированной NIF библиотеке
nif_path = :filename.join(:code.priv_dir(:nif_example), 'adder')

# Загружаем NIF
case :erlang.load_nif(nif_path, 0) do
:ok -> :ok
{:error, {:reload, _}} -> :ok # Уже загружена
error -> error
end
end
end

Использование:
iex> NifExample.add(5, 2)
7

Хоть преимуществ и много, есть и серьезные недостатки:
- Хрупкость: один SEGFAULT убивает абсолютно ВСЁ, вместе с функцией падает вся виртуальная машина
- Сложность работы с памятью: в NIF работа с памятью является двойной – можно выделять как в рамках виртуальной машины, так и с помощью malloc. Соответственно, возможны утечки памяти
- Отсутствие сборки мусора: NIF не может вызвать сборщик мусора BEAM во время выполнения, поэтому в циклах и рекурсивных функциях стоит использовать malloc/free

Стоит заметить, что на С в принципе всё выглядит монструозно. Для языков Rust и Zig есть библиотеки, обеспечивающие наиболее приятный Developer Experience – все сводится к написанию непосредственно логики и использованию специального декоратора, который преобразует это в нужный вид
2🔥2❤‍🔥1
К слову о DX в библиотеке Zig для NIF. Отдельно ничего даже компилить руками не нужно, все скомпилируется благодаря mix compile
Если хочется вынести код в отдельный файл – указываем путь к нему в макросе

Гениально, не иначе
Channel photo updated
Я тут начал писать небольшую библиотеку для Elixir, которую раздует в большой фреймворк. Перед вами telegram_ex – macro-based библиотека для разработки телеграм-ботов🚀

Почему?
Я не нашел в существующей экосистеме ничего, что бы мне понравилось. Может быть, я плохо искал, но ничего, к сожалению, не приглянулось.

Чем оно отличается от уже существующего?
Макросы. Мне очень нравится способ реализации GenServer, к примеру. Найденные мной библиотеки предоставляют исключительно функции-обёртки над Telegram Bot API.

Что там есть уже сейчас?
Минимальный функционал для написания echo-бота – получение и отправка текстовых сообщений. Выглядит это добро следующим образом:
defmodule EchoBot do
use TelegramEx,
name: "echo_bot",
token: "YOUR_BOT_TOKEN"

def handle_message(message) do
send_message(message["from"]["id"], message["text"])
end
end


Буду рад, если тебе будет интересно наблюдать за этим. Замков не строю, пилить буду в свободное время, которого сейчас есть совсем немножко.
🔥4
Итоги сегодняшнего вечера:
– Получение и отправка текста
– Поддержка ReplyKeyboard и InlineKeyboard
– Красивое решение для отправки разных видов сообщений (этим я горд больше всего)

Небольшой пример:
defmodule MyBot do
use TelegramEx,
name: "my_bot",
token: "YOUR_BOT_TOKEN"

def handle_message(%{chat: chat}) do
keyboard = [[
%{text: "Button 1", callback_data: "btn_1"},
%{text: "Button 2", callback_data: "btn_2"}
]]

Message.new(chat["id"])
|> Message.text("Choose an option:", "Markdown")
|> Message.inline_keyboard(keyboard)
|> Message.send(@bot_token)
end
end


README завёл. Кому интересно – смотрим
👍3
Начало успешно положено – я опубликовал первую версию библиотеки в официальный репозиторий Hex!
Вроде как оно даже работает, но это не точно😃

Что можно:
– Получать текстовые сообщения
– Отправлять текстовые сообщения с Inline и Reply клавиатурами
– Отправлять изображения и документы (как удаленные, так и локальные)
– Реагировать на callbacks

Документацию можно найти пока только в README в репозитории. Кто умеет в Elixir и как-то связан с телеграм-ботами – зелёный свет, можно пробовать!
🔥4🥰3
Продолжаю работать над библиотекой. Есть вероятность, что это перерастет в нечто большее, но пока что это только вероятность🙂‍↕️

Пока что это только черновик – реализация FSM:
defmodule Testbot do
use TelegramEx

def handle_message(%{text: "/start", chat: chat}) do
Message.text("You started me")
|> Message.send(chat["id"])

transition_to(chat["id"], :started)
end

defstate :started do
def handle_message(%{chat: chat}) do
Message.text("You already started me")
|> Message.send(chat["id"])
end
end
end


Всё просто – для каждого состояния прописываем свои обработчики внутри макроса defstate, переходы между состояниями происходят с помощью transition_to. Данные туда-сюда гонять пока что нельзя, да и эксепшены вылетают при некоторых обстоятельствах
🔥6