Кусочки кода
30 subscribers
126 photos
1 link
Полезные кусочки кода, советы по инструментам, разборы практик и технологий.

Полезные видео:
https://www.youtube.com/@Codeb1tes
Download Telegram
Знаете ли вы, как настроить Celery для асинхронных задач в Python?

# tasks.py
from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

@app.task
def send_email(recipient, subject, body):
# Симуляция отправки письма
print(f"Отправка письма на {recipient} с темой '{subject}'")
# Здесь может быть код интеграции с SMTP-сервером
return f"Письмо отправлено на {recipient}"


# main.py
from tasks import send_email

if __name__ == "__main__":
result = send_email.delay('user@example.com', 'Привет!', 'Это тестовое письмо.')
print(f"Статус задачи: {result.status}")
print(f"Результат задачи: {result.get(timeout=10)}")


Для запуска воркера выполните команду в терминале:

celery -A tasks worker --loglevel=info


Обработка задач с использованием периодических расписаний:

# tasks.py (дополнение)
from celery.schedules import crontab

app.conf.beat_schedule = {
'send-weekly-newsletter': {
'task': 'tasks.send_email',
'schedule': crontab(day_of_week='monday', hour=7, minute=30),
'args': ('newsletter@example.com', 'Еженедельная рассылка', 'Содержание рассылки...')
},
}
app.conf.timezone = 'UTC'


Запустите Celery Beat для выполнения периодических задач:

celery -A tasks beat --loglevel=info


Использование цепочек и групп задач:

from celery import chain, group

@app.task
def process_data(data):
# Обработка данных
return data * 2

@app.task
def store_result(result):
print(f"Результат хранения: {result}")


Вызов цепочки задач:

workflow = chain(process_data.s(10), store_result.s())
workflow.delay()


Групповое выполнение задач:

job = group(process_data.s(i) for i in range(5))
result = job.apply_async()
print(result.get())


Celery предоставляет гибкие возможности для управления асинхронными задачами, поддерживая как простые фоновые задачи, так и сложные рабочие процессы. Правильная настройка брокера сообщений и бэкенда результатов обеспечивает надежное выполнение и мониторинг задач.
Знаете ли вы, как создавать собственные контекстные менеджеры в Python?

Контекстные менеджеры позволяют управлять ресурсами, обеспечивая их правильное открытие и закрытие. Помимо встроенных, таких как open или lock, вы можете создавать свои собственные с помощью модуля contextlib.

Пример использования декоратора @contextmanager для создания контекстного менеджера:

from contextlib import contextmanager

@contextmanager
def file_manager(filename, mode):
try:
f = open(filename, mode)
yield f
finally:
f.close()

# Использование
with file_manager('example.txt', 'w') as f:
f.write('Привет, мир!')


В этом примере file_manager управляет открытием и закрытием файла. Код до yield выполняется при входе в контекст, а после — при выходе, гарантируя закрытие файла независимо от того, произошла ошибка внутри блока with.

Создание класса с методами __enter__ и __exit__:

class Resource:
def __enter__(self):
print('Ресурс открыт')
return self

def __exit__(self, exc_type, exc_val, exc_tb):
print('Ресурс закрыт')

# Использование
with Resource() as r:
print('Работа с ресурсом')


Вывод:
Ресурс открыт
Работа с ресурсом
Ресурс закрыт


Такой подход полезен для управления любыми ресурсами, требующими инициализации и очистки, например, сетевыми соединениями или временными файлами.

Дополнительные возможности contextlib:

- contextlib.ExitStack позволяет динамически управлять несколькими контекстными менеджерами.
- contextlib.nullcontext используется, когда требуется контекстный менеджер, который ничего не делает.

Пример использования ExitStack:

from contextlib import ExitStack

with ExitStack() as stack:
files = [stack.enter_context(open(f'test_{i}.txt', 'w')) for i in range(3)]
for i, f in enumerate(files):
f.write(f'Файл {i}\n')


Этот код открывает несколько файлов, гарантируя их закрытие после выхода из блока with, независимо от количества файлов.

Использование контекстных менеджеров повышает надежность кода, делая управление ресурсами более читаемым и безопасным.
Знаете ли вы, как эффективно использовать pipenv для управления зависимостями?

Pipenv упрощает управление зависимостями Python, создавая изолированные виртуальные окружения и файл Pipfile для описания зависимостей. При создании нового проекта используйте:

pipenv install


Это создаст виртуальное окружение и файл Pipfile. Для добавления зависимости, например, requests, выполните:

pipenv install requests


Pipenv автоматически обновит ваш Pipfile и создаст Pipfile.lock, что гарантирует воспроизводимость окружения. Активировать окружение можно с помощью команды:

pipenv shell


Для разработки удобно использовать команды:

pipenv install --dev pytest


Это установит pytest как зависимость для разработки. Чтобы установить все зависимости из Pipfile.lock, используйте:

pipenv install --ignore-pipfile


Это особенно полезно в CI/CD пайплайнах, где важно точно воспроизвести окружение. Кроме того, Pipenv позволяет легко выводить список зависимостей:

pipenv graph


Эта команда отобразит дерево зависимостей, помогая анализировать и оптимизировать используемые пакеты. Использование Pipenv способствует чистоте проекта и упрощает совместную работу в команде, предотвращая конфликты зависимостей.
Знаете ли вы, как реализовать потоковую обработку данных с помощью Apache Kafka?

Использование Apache Kafka для потоковой обработки данных позволяет обрабатывать большие объемы информации в реальном времени. Рассмотрим пример создания простого приложения, которое считывает данные из Kafka, обрабатывает их и отправляет результаты обратно в Kafka.

Для начала необходимо установить библиотеку kafka-python:

pip install kafka-python


Создадим продюсера, который будет отправлять сообщения в топик input_topic:

from kafka import KafkaProducer
import json

producer = KafkaProducer(
bootstrap_servers='localhost:9092',
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

data = {'event': 'user_signup', 'user_id': 12345}

producer.send('input_topic', value=data)
producer.flush()


Теперь создадим консумера, который будет читать сообщения из input_topic, обрабатывать их и отправлять результаты в output_topic:

from kafka import KafkaConsumer, KafkaProducer
import json

consumer = KafkaConsumer(
'input_topic',
bootstrap_servers='localhost:9092',
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my-group',
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

producer = KafkaProducer(
bootstrap_servers='localhost:9092',
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

for message in consumer:
event = message.value
# Пример обработки: добавление метки обработки
event['processed'] = True
producer.send('output_topic', value=event)


Этот простой пример демонстрирует базовую настройку продюсера и консумера в Kafka. В реальных приложениях можно использовать более сложные схемы обработки, интеграцию с потоковыми фреймворками, такими как Apache Flink или Spark Streaming, и обеспечивать масштабируемость и надежность системы.

Дополнительно, для обеспечения устойчивости можно настроить репликацию топиков и использовать Kafka Streams для реализации сложных потоковых операций. Это позволяет строить надежные и масштабируемые системы обработки данных в реальном времени.
🤯1
Знаете ли вы, как интегрировать Prometheus с Python-приложением?

Prometheus собирает метрики в реальном времени, и для интеграции с Python-приложением используется библиотека prometheus_client. Ниже приведен расширенный пример, демонстрирующий не только экспорт метрик, но и добавление счетчиков и гистограмм для более детального мониторинга.

from prometheus_client import start_http_server, Summary, Counter, Histogram
import time
import random

# Создаем метрики
REQUEST_TIME = Summary('request_processing_seconds', 'Время обработки запроса')
REQUEST_COUNT = Counter('request_count', 'Количество обработанных запросов')
REQUEST_SIZE = Histogram('request_size_bytes', 'Размер запроса в байтах')

@REQUEST_TIME.time()
def process_request(size):
REQUEST_COUNT.inc()
REQUEST_SIZE.observe(size)
# Эмулируем обработку запроса
time.sleep(random.uniform(0.1, 0.5))

if __name__ == '__main__':
# Запуск HTTP-сервера для метрик на порту 8000
start_http_server(8000)
while True:
# Генерация случайного размера запроса
req_size = random.randint(100, 1000)
process_request(req_size)


В этом примере добавлены следующие метрики:

- Counter request_count: отслеживает общее количество обработанных запросов.
- Histogram request_size_bytes: измеряет распределение размеров запросов в байтах.

После запуска приложения метрики будут доступны по адресу http://localhost:8000/metrics. Это позволяет Prometheus собирать данные о времени обработки запросов, количестве запросов и их размерах. Такие метрики помогают выявлять узкие места и оптимизировать производительность приложения.

Для более сложных сценариев можно создавать собственные метрики, комбинировать разные типы метрик и использовать лейблы для детального категорирования данных. Это обеспечивает гибкость и точность мониторинга, необходимую для поддержания высоких стандартов производительности и надежности приложений.
🔥1
Знаете ли вы, как использовать Git Hooks для автоматизации разработки?

Git Hooks позволяют автоматически выполнять скрипты при определенных событиях в репозитории. Например, с помощью pre-commit hook можно запускать тесты перед каждым коммитом, что помогает предотвратить попадание неработающего кода в историю.

Создайте файл .git/hooks/pre-commit со следующим содержимым:

#!/bin/sh
npm test


Сделайте файл исполняемым:

chmod +x .git/hooks/pre-commit


Теперь при каждой попытке коммита будут автоматически запускаться тесты. Если тесты не пройдут, коммит будет отменен. Это гарантирует, что в репозиторий попадает только проверенный код.

Дополнительно можно настроить pre-push hook для проверки кода перед отправкой на удаленный сервер. Создайте файл .git/hooks/pre-push:

#!/bin/sh
npm run lint


Сделайте его исполняемым:

chmod +x .git/hooks/pre-push


Теперь перед каждой операцией git push будет запускаться линтер, обеспечивая соответствие кода стандартам качества.

Использование Git Hooks значительно повышает надежность и качество процесса разработки, автоматизируя рутинные проверки и предотвращая распространение ошибок.
🔥1
Знаете ли вы, как эффективно обрабатывать ошибки в асинхронном коде на Python?

Асинхронное программирование требует особого подхода к обработке ошибок. Простое использование try/except может не сработать, если ошибка возникает внутри async функции.

Для обработки ошибок в асинхронных контекстах можно использовать asyncio.gather, указывая return_exceptions=True:

import asyncio

async def risky_operation():
raise ValueError('Ошибка!')

async def main():
results = await asyncio.gather(risky_operation(), return_exceptions=True)
for result in results:
if isinstance(result, Exception):
print(f'Произошла ошибка: {result}')
else:
print(f'Результат: {result}')

asyncio.run(main())


В этом примере функция risky_operation всегда вызывает исключение ValueError. При запуске main с помощью asyncio.gather и параметра return_exceptions=True, результат выполнения будет содержать сами исключения вместо того, чтобы прерывать выполнение.

Вывод программы будет следующим:

Произошла ошибка: Ошибка!


Этот метод особенно полезен при одновременном выполнении нескольких задач, где некоторые из них могут завершиться с ошибкой. Кроме того, это позволяет централизованно обрабатывать все ошибки после завершения выполнения задач, что упрощает отладку и улучшает устойчивость приложения.

Для более сложных сценариев можно комбинировать asyncio.gather с логированием или повторными попытками выполнения задач. Например:

import asyncio
import logging

logging.basicConfig(level=logging.INFO)

async def risky_operation(task_id):
if task_id % 2 == 0:
raise ValueError(f'Ошибка в задаче {task_id}!')
return f'Задача {task_id} выполнена успешно.'

async def main():
tasks = [risky_operation(i) for i in range(5)]
results = await asyncio.gather(*tasks, return_exceptions=True)
for i, result in enumerate(results):
if isinstance(result, Exception):
logging.error(f'Задача {i} завершилась с ошибкой: {result}')
else:
logging.info(result)

asyncio.run(main())


Этот пример демонстрирует обработку ошибок для нескольких задач с разными идентификаторами. Логирование позволяет более детально отслеживать, какие именно задачи завершились успешно, а какие — с ошибками.

Таким образом, использование asyncio.gather с параметром return_exceptions=True предоставляет гибкий и мощный способ управления ошибками в асинхронных приложениях на Python.
🔥1
Знаете ли вы, как эффективно кэшировать API-запросы в Python?

Кэширование запросов к API может существенно повысить производительность приложения и снизить нагрузку на сервер. Рассмотрим более детально, как это реализовать с помощью библиотеки requests-cache.

import requests
import requests_cache

# Установка кеша с временем жизни 3 минуты
requests_cache.install_cache('api_cache', expire_after=180)

def fetch_data(url):
response = requests.get(url)
if response.from_cache:
print('Данные получены из кеша')
else:
print('Данные получены с сервера')
return response.json()

# Пример использования
data = fetch_data('https://api.example.com/data')
print(data)


В этом примере:

1. Установка кеша: requests_cache.install_cache('api_cache', expire_after=180) создает кеш с именем api_cache, где данные будут храниться в течение 180 секунд (3 минут).

2. Функция fetch_data: Отправляет запрос к указанному URL. После получения ответа проверяет, был ли он получен из кеша с помощью response.from_cache.

3. Использование кеша: При повторном запросе к тому же URL в течение 3 минут данные будут извлечены из кеша, что значительно ускоряет обработку и уменьшает количество обращений к серверу.

Дополнительные возможности requests-cache:

- Хранение кеша в разных бекендах: По умолчанию используется SQLite, но можно настроить другие хранилища, такие как Redis или MongoDB.

  requests_cache.install_cache('api_cache', backend='redis', expire_after=180, redis_host='localhost', redis_port=6379)


- Параметры игнорирования: Можно настроить, какие параметры запроса игнорировать при кешировании, чтобы различать запросы по определенным критериям.

  requests_cache.install_cache('api_cache', ignore_params=['api_key'], expire_after=180)


- Очистка кеша: В любой момент можно очистить кеш, чтобы удалить устаревшие данные.

  requests_cache.clear()


Использование кеширования API-запросов позволяет не только ускорить работу приложения, но и эффективно управлять ресурсами, особенно при взаимодействии с внешними сервисами с ограниченными лимитами запросов.
1🔥1
Знаете ли вы о возможностях модульности в Terraform для управления инфраструктурой?

Использование модулей в Terraform позволяет значительно упростить управление сложной инфраструктурой, обеспечивая повторное использование кода и улучшая его читаемость. Модули можно рассматривать как функции в программировании, которые принимают входные параметры и возвращают ресурсы.

Создание собственного модуля

Например, создадим модуль для развёртывания экземпляра EC2 с возможностью настройки параметров:

1. Структура модуля:

terraform-modules/
├── ec2-instance/
│ ├── main.tf
│ ├── variables.tf
│ └── outputs.tf


2. main.tf:

resource "aws_instance" "this" {
ami = var.ami
instance_type = var.instance_type

tags = {
Name = var.instance_name
}
}


3. variables.tf:

variable "ami" {
description = "AMI ID для экземпляра"
type = string
}

variable "instance_type" {
description = "Тип экземпляра"
type = string
default = "t2.micro"
}

variable "instance_name" {
description = "Имя экземпляра"
type = string
}


4. outputs.tf:

output "instance_id" {
description = "ID созданного экземпляра"
value = aws_instance.this.id
}

output "public_ip" {
description = "Публичный IP экземпляра"
value = aws_instance.this.public_ip
}


Использование модуля в основном конфигурационном файле:

module "web_server" {
source = "./terraform-modules/ec2-instance"
ami = "ami-0c55b159cbfafe1f0"
instance_type = "t2.medium"
instance_name = "WebServer"
}

module "db_server" {
source = "./terraform-modules/ec2-instance"
ami = "ami-0c55b159cbfafe1f0"
instance_type = "t2.large"
instance_name = "DatabaseServer"
}


Преимущества использования модулей:

- Повторное использование кода: Одни и те же конфигурации можно применять в разных частях проекта без дублирования.
- Упрощение управления: Изменения в модуле автоматически отражаются во всех местах его использования.
- Организованность: Чёткая структура проекта облегчает навигацию и понимание архитектуры.

Модули также можно использовать из публичных репозиториев, таких как Terraform Registry, что позволяет быстро интегрировать проверенные решения в ваши проекты. Разделение инфраструктуры на модули способствует лучшей масштабируемости и поддерживаемости кода.
1🔥1
Знаете ли вы, как эффективно использовать Dask для параллельной обработки больших файлов в Python?

Dask позволяет разбивать большие задачи на мелкие подзадачи и распределять их между несколькими ядрами или даже машинами. Это особенно полезно при обработке больших наборов данных, таких как массивные файлы CSV.

Пример использования Dask для чтения и фильтрации данных из нескольких файлов CSV:

import dask.dataframe as dd

# Чтение всех CSV файлов в директории
df = dd.read_csv('data/*.csv')

# Фильтрация строк, где значение в колонке 'score' больше 80
filtered_df = df[df['score'] > 80]

# Вычисление результата и сохранение в новый CSV файл
filtered_df.to_csv('filtered_data/filtered_*.csv', single_file=True)


Пояснения:
- dd.read_csv('data/*.csv') загружает все CSV файлы из директории data как единый Dask DataFrame, не загружая их полностью в память.
- Фильтрация df[df['score'] > 80] применяется к каждому подфрейму параллельно.
- Метод to_csv сохраняет отфильтрованные данные в новую директорию filtered_data. Параметр single_file=True объединяет результаты в один файл после обработки.

Дополнительные возможности Dask:
- Параллельные вычисления: Автоматическое распределение задач по доступным ядрам или кластерам.
- Интеграция с другими библиотеками: Легко комбинируется с NumPy, Pandas и другими инструментами для анализа данных.
- Масштабируемость: Подходит для обработки как небольших, так и очень больших наборов данных без изменения кода.

Используя Dask, можно значительно ускорить процессы обработки данных, эффективно используя ресурсы системы и минимизируя время ожидания.
1🔥1
Знаете ли вы, как эффективно управлять зависимостями с помощью Poetry в Python?

Poetry использует файл pyproject.toml для точного управления зависимостями и их версиями. Это позволяет избежать конфликтов между пакетами и гарантирует воспроизводимость среды. Для создания нового проекта выполните:

poetry new my_project
cd my_project
poetry add requests


Чтобы настроить виртуальное окружение вручную, используйте:

poetry env use python3.10


Poetry автоматически активирует виртуальное окружение при выполнении команд внутри проекта. Для установки всех зависимостей можно использовать:

poetry install


Кроме добавления пакетов, Poetry поддерживает создание и публикацию собственных пакетов. Например, для публикации пакета на PyPI:

poetry build
poetry publish --username your_username --password your_password


Также Poetry позволяет использовать группы зависимостей для разных сред, например, для разработки и тестирования:

[tool.poetry.dependencies]
python = "^3.10"
requests = "^2.25.1"

[tool.poetry.dev-dependencies]
pytest = "^6.2.4"


Это обеспечивает гибкость и удобство при управлении различными наборами зависимостей. Благодаря таким возможностям, Poetry становится незаменимым инструментом для современных Python-проектов, обеспечивая стабильность и удобство разработки.
1🔥1
Знаете ли вы настройку автоматического масштабирования в Kubernetes?

Для обеспечения эффективного использования ресурсов и поддержания производительности приложений Kubernetes предоставляет горизонтальное автоскалирование (Horizontal Pod Autoscaler, HPA). HPA автоматически изменяет количество реплик подов на основе метрик, таких как использование CPU или пользовательские метрики.

Пример конфигурации HPA для развертывания myapp:

apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: myapp-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: myapp
minReplicas: 2
maxReplicas: 10
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70


В этом манифесте:

- scaleTargetRef указывает на развертывание myapp, которое будет масштабироваться.
- minReplicas и maxReplicas задают минимальное и максимальное количество реплик.
- metrics определяет, что масштабирование будет происходить на основе средней загрузки CPU, стремясь поддерживать её на уровне 70%.

Чтобы применить этот HPA, используйте команду:

kubectl apply -f myapp-hpa.yaml


Дополнительно, можно настроить пользовательские метрики или использовать метрики из внешних источников для более тонкого управления масштабированием. Например, используя Prometheus Adapter для интеграции пользовательских метрик:

apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: myapp-custom-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: myapp
minReplicas: 2
maxReplicas: 10
metrics:
- type: Pods
pods:
metric:
name: requests_per_second
target:
type: AverageValue
averageValue: 100


В этом примере масштабирование происходит на основе среднего количества запросов в секунду, используя метрику requests_per_second. Такие настройки позволяют Kubernetes адаптироваться к изменяющимся нагрузкам, обеспечивая стабильную работу приложений и оптимальное использование ресурсов.
1🔥1
Знаете ли вы, как эффективно использовать асинхронные генераторы в Python?

Асинхронные генераторы в Python позволяют обрабатывать поток данных без блокировки основного потока выполнения. Это особенно полезно при работе с большими объемами данных или при необходимости параллельной обработки.

Пример использования асинхронного генератора для чтения больших файлов построчно:

import asyncio

async def async_read_file(file_path):
loop = asyncio.get_event_loop()
with open(file_path, 'r') as f:
while True:
line = await loop.run_in_executor(None, f.readline)
if not line:
break
yield line.strip()

async def process_lines(file_path):
async for line in async_read_file(file_path):
# Обработка строки
print(line)

if __name__ == "__main__":
asyncio.run(process_lines('large_file.txt'))


В этом примере async_read_file — асинхронный генератор, который читает файл построчно, используя run_in_executor для выполнения блокирующей операции чтения в отдельном потоке. Это позволяет основной корутине продолжать выполнение, не ожидая завершения чтения строки.

Еще один пример — асинхронный генератор для получения данных из API с пагинацией:

import aiohttp
import asyncio

async def fetch_page(session, url, page):
async with session.get(f"{url}?page={page}") as response:
return await response.json()

async def async_paginated_fetch(url):
async with aiohttp.ClientSession() as session:
page = 1
while True:
data = await fetch_page(session, url, page)
if not data['items']:
break
for item in data['items']:
yield item
page += 1

async def process_items(url):
async for item in async_paginated_fetch(url):
# Обработка элемента
print(item)

if __name__ == "__main__":
asyncio.run(process_items('https://api.example.com/data'))


Здесь async_paginated_fetch итеративно запрашивает страницы данных из API и использует yield для передачи элементов по одному. Это позволяет обрабатывать большие объемы данных без загрузки их всех в память одновременно.

Асинхронные генераторы полезны для повышения эффективности и производительности приложений, работающих с потоками данных или выполняющих длительные операции ввода-вывода.
🔥1
Знаете ли вы, как реализовать JWT аутентификацию в вашем веб-приложении?

Для реализации JWT аутентификации можно использовать библиотеку pyjwt в Python. Рассмотрим пример на Flask:

from flask import Flask, request, jsonify
import jwt
import datetime
from functools import wraps

app = Flask(__name__)
app.config['SECRET_KEY'] = 'your_secret_key'

def token_required(f):
@wraps(f)
def decorated(*args, **kwargs):
token = None
if 'Authorization' in request.headers:
token = request.headers['Authorization'].split(" ")[1]
if not token:
return jsonify({'message': 'Token is missing!'}), 401
try:
data = jwt.decode(token, app.config['SECRET_KEY'], algorithms=["HS256"])
current_user = data['user_id']
except:
return jsonify({'message': 'Token is invalid!'}), 401
return f(current_user, *args, **kwargs)
return decorated

@app.route('/login', methods=['POST'])
def login():
auth = request.json
if auth and auth.get('username') == 'admin' and auth.get('password') == 'password':
token = jwt.encode({
'user_id': 1,
'exp': datetime.datetime.utcnow() + datetime.timedelta(minutes=30)
}, app.config['SECRET_KEY'], algorithm="HS256")
return jsonify({'token': token})
return jsonify({'message': 'Invalid credentials'}), 401

@app.route('/protected', methods=['GET'])
@token_required
def protected(current_user):
return jsonify({'message': f'Hello user {current_user}'})

if __name__ == '__main__':
app.run(debug=True)


В этом примере:

1. Генерация токена при логине: При успешной аутентификации пользователя генерируется JWT с указанием user_id и временем истечения exp.

2. Проверка токена: Декоратор token_required проверяет наличие и валидность токена в заголовке Authorization. Если токен валиден, запрос передается защищенному маршруту.

3. Защищенный маршрут: Маршрут /protected доступен только при наличии действительного токена и возвращает сообщение с идентификатором пользователя.

Этот подход обеспечивает безопасную аутентификацию пользователей, позволяя масштабировать приложение без необходимости хранения сессий на сервере.
🔥1
Знаете ли вы, как реализовать двустороннюю связь с помощью WebSockets в Python?

Для создания реального времени в приложениях на Python можно использовать библиотеку websockets. Ниже приведён пример сервера, который принимает сообщения от клиентов и отправляет их обратно.

import asyncio
import websockets

async def echo(websocket, path):
async for message in websocket:
await websocket.send(f'Эхо: {message}')

start_server = websockets.serve(echo, 'localhost', 8765)

asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()


Этот сервер устанавливает WebSocket-соединение на localhost порту 8765. Каждый раз, когда клиент отправляет сообщение, сервер отвечает тем же сообщением с префиксом 'Эхо: '.

Чтобы создать клиента, который взаимодействует с этим сервером, можно использовать следующий код:

import asyncio
import websockets

async def send_message():
uri = "ws://localhost:8765"
async with websockets.connect(uri) as websocket:
await websocket.send("Привет, сервер!")
response = await websocket.recv()
print(response)

asyncio.get_event_loop().run_until_complete(send_message())


Клиент подключается к серверу, отправляет сообщение "Привет, сервер!" и выводит полученный ответ.

Чтобы повысить надёжность соединения, можно добавить обработку исключений и автоматическое переподключение:

import asyncio
import websockets
import time

async def echo(websocket, path):
try:
async for message in websocket:
await websocket.send(f'Эхо: {message}')
except websockets.exceptions.ConnectionClosed as e:
print(f"Соединение закрыто: {e}")

start_server = websockets.serve(echo, 'localhost', 8765)

loop = asyncio.get_event_loop()
loop.run_until_complete(start_server)
loop.run_forever()


Используя WebSockets, можно значительно уменьшить задержки и нагрузку на сервер при постоянной двусторонней коммуникации, что особенно полезно для чат-приложений, систем уведомлений и других сервисов реального времени.
🔥1
Знаете ли вы, как настроить эффективное автоматическое масштабирование в Kubernetes?

Horizontal Pod Autoscaler (HPA) изменяет количество реплик приложения на основе метрик, таких как использование CPU или памяти. Для более точного масштабирования можно использовать кастомные метрики или внешние источники данных.

Пример YAML конфигурации HPA с использованием кастомной метрики:

apiVersion: autoscaling/v2beta2
kind: HorizontalPodAutoscaler
metadata:
name: myapp-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: myapp
minReplicas: 2
maxReplicas: 15
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 60
- type: Pods
pods:
metric:
name: transactions_per_second
target:
type: AverageValue
averageValue: 100


В этом примере HPA масштабирует количество подов myapp от 2 до 15 реплик. Масштабирование происходит при превышении средней загрузки CPU до 60% или при среднем числе транзакций в секунду выше 100. Для использования кастомных метрик необходимо настроить соответствующий метрик-сервер, например, Prometheus Adapter.

Дополнительно можно настроить автоскейлинг на основе внешних метрик, таких как задержка запросов или использование памяти:

apiVersion: autoscaling/v2beta2
kind: HorizontalPodAutoscaler
metadata:
name: myapp-hpa-external
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: myapp
minReplicas: 3
maxReplicas: 20
metrics:
- type: External
external:
metric:
name: request_latency
selector:
matchLabels:
endpoint: /api/v1/resource
target:
type: AverageValue
averageValue: "200ms"


Здесь HPA масштабирует приложение myapp на основе внешней метрики request_latency для определенного эндпоинта. Это позволяет более гибко реагировать на реальные показатели производительности приложения.

Использование HPA с кастомными и внешними метриками позволяет достичь более точного и эффективного масштабирования, адаптированного под конкретные потребности вашего приложения.
🔥1
Знаете ли вы, как eBPF позволяет реализовать мониторинг специфических туннельных протоколов, таких как GRE, прямо в ядре Linux?

Глубокий разбор анализа GRE-трафика через eBPF

eBPF (extended Berkeley Packet Filter) даёт системным администраторам и разработчикам возможность добавлять свой код прямо в ядро Linux для перехвата и обработки сетевых пакетов без необходимости модификации исходного кода ядра или загрузки kernel-модулей.

Чтобы следить за специфическими типами трафика на уровне ядра — например, за туннелированием через GRE, — не нужно писать обвязки на user space или тянуть pcap-тулзы. Программа eBPF может быстро подсчитать пакеты и байты GRE прямо при обработке пакета и тут же отправить события в user space.

Вот сокращённый пример кода для такой задачи. Программа считает количество байтов GRE-пакетов, приходящих на интерфейс, и отправляет статистику в map, которую можно читать снаружи:

#include <linux/bpf.h>
#include <linux/if_ether.h>
#include <linux/ip.h>
#include <linux/gre.h>
#include <linux/bpf_helpers.h>

struct bpf_map_def SEC("maps") packet_count = {
.type = BPF_MAP_TYPE_PERF_EVENT_ARRAY,
.key_size = sizeof(u32),
.value_size = sizeof(u64),
.max_entries = 1024,
};

SEC("xdp") int count_packets(struct xdp_md *ctx) {
void *data = (void *)(long)ctx->data;
void *data_end = (void *)(long)ctx->data_end;

struct ethhdr *eth = data;
if ((void*)(eth + 1) > data_end)
return XDP_PASS;

if (eth->h_proto != htons(ETH_P_IP))
return XDP_PASS;

struct iphdr *ip = (void *)(eth + 1);
if ((void*)(ip + 1) > data_end)
return XDP_PASS;

if (ip->protocol != IPPROTO_GRE)
return XDP_PASS;

// Проверяем доступность GRE-заголовка
struct gre_hdr *gre = (void *)(ip + 1);
if ((void*)(gre + 1) > data_end)
return XDP_PASS;

// Собираем метрики
u64 bytes = (u64)(data_end - data);
u32 key = 0;
bpf_perf_event_output(ctx, &packet_count, BPF_F_CURRENT_CPU, &bytes, sizeof(bytes));
return XDP_PASS;
}


Подключение и использование

Компилируйте код с помощью clang под target BPF, затем загрузите через bpftool:

clang -O2 -target bpf -c gre_monitor.c -o gre_monitor.o
bpftool prog load gre_monitor.o /sys/fs/bpf/gre_monitor pinmaps /sys/fs/bpf/
bpftool net attach xdp id $(bpftool prog show name count_packets | grep -o '[0-9]*') dev eth0


Для сбора статистики потребуется простая программа на C или Python, которая читает события из perf-буфера:

from bcc import BPF

b = BPF(obj="gre_monitor.o")
b["packet_count"].open_perf_buffer(lambda cpu, data, size: print(f"GRE packet: {int.from_bytes(data, byteorder='little')} bytes"))

print("Monitoring GRE packets... Press Ctrl+C to exit")
while True:
try:
b.perf_buffer_poll()
except KeyboardInterrupt:
break


Почему стоит использовать этот подход для туннелирования?

Обычные способы мониторинга не видят вложенные полезные данные за GRE и L2TP, а XDP-программа на eBPF позволяет точно считать только нужные протоколы, не тормозя всю систему. Можно расширить пример для трекинга L2TP, подсчёта RTT или фильтрации по списку IP-адресов: работать с породами туннелирования становится намного проще.

Данный подход обеспечивает производительность на порядок выше традиционных решений на базе libpcap, так как:
- Обработка происходит на ранней стадии в сетевом стеке ядра
- Отсутствует копирование пакетов в пространство пользователя для всего трафика
- Фильтрация выполняется атомарно, без взаимодействия с userspace

Реальный кейс: на больших инфраструктурах так можно ловить подозрительные GRE-туннели (например, неожиданный P2P), отслеживать логику отказа каналов и собирать детальную статистику без затрат на зеркалирование трафика и внешние анализаторы. Также возможно интегрировать это решение с системами мониторинга вроде Prometheus или Grafana для построения визуализаций и настройки алертов.
🔥1
Знаете ли вы, как просто управлять фичами без сторонних библиотек и фреймворков?

Вместо тяжелых интеграций и лишних зависимостей можно изолировать новые и старые функции через небольшой конфиг на основе обычного файла. Такой подход дает предсказуемость: поведение приложения управляется централизованно, а запуск экспериментальных изменений не требует деплоя кода.

Изоляция изменений через конфиг

Нужно раздать новую функцию только части пользователей? Самописный контроль с помощью простого конфиг-файла легко справится с этим.

Пример: нужно плавно перевести пользователей со старой логики на новую и иметь возможность моментально вернуть все обратно. Используем обычный JSON-файл с включателями:

{
"features": {
"newFeature": false,
"legacyFeature": true
}
}


Весь контроль у вас: достаточно поменять значение в json – и поведение тут же изменится.

Применение в коде

В коде легко читать флаги из файла:

import json

with open('config.json') as config_file:
config = json.load(config_file)

if config['features']['newFeature']:
newFeature()
else:
legacyFeature()


Такой файл можно править на лету, он одинаково читается как скриптом, так и человеком.

Плюсы подхода

1. Управление полностью отделено от логики – никакой жёсткой прошивки фичей в коде.
2. Откат фичи не требует ручного редеплоя – внести правку можно прямо на сервере.
3. Легко масштабируется: можно дописать разные уровни управления (например, добавлять флаги для отдельных пользователей или групп).
4. Нет скрытых зависимостей и магии – код весь на виду, править можно в любой момент.

Минимум кода, максимум контроля. Такой способ оказывается незаменим во внутренней инфраструктуре, для экспериментов и осторожных изменений в проде без внедрения тяжелых сервисов.
🔥1
Знаете ли вы, как можно использовать reverse proxy для быстрого autosave между фронтовыми модулями без единой базы, через WebSocket-соединения?

В распределённых модульных фронтендах часто требуется без задержек синхронизировать пользовательские настройки или черновики между разными частями SPA, не прибегая к общей базе данных. Такой обмен можно реализовать, если объединять WebSocket-сессии модулями сквозь reverse proxy, чтобы получать центральную точку обмена и упростить маршрутизацию трафика между отдельными сервисами или микрофронтендами. Это позволяет каждому модулю слушать и отправлять изменения состояния мгновенно, без лишнего кустарного взаимодействия.

Зачем здесь reverse proxy — и чем он помогает с WebSocket

Прямое соединение каждого фронта с autosave-сервисом может быстро привести к избытку открытых WebSocket-каналов и хаосу в настройках CORS, балансировке и безопасности. Reverse proxy превращает все WebSocket-запросы в управляемый поток: получает их на одном порту, потом аккуратно раскидывает на нужные микросервисы, централизует контроль соединений и позволяет быстро масштабировать инфраструктуру. Архитектурно это убирает необходимость в отдельном сервисе шины данных или синхронизации между фронтами.

Пример reverse proxy для WebSocket autosave на Node.js

В этом фрагменте сервер принимает входящие WebSocket-подключения на единый порт (через proxy), а затем проксирует дальше к нужному сервису. Каждый входящий autosave-запрос обрабатывается на целевом сервере – так, чтобы фронты не взаимодействовали друг с другом напрямую и не пересекались на уровне логики:

const http = require('http');
const WebSocket = require('ws');
const httpProxy = require('http-proxy');

const proxy = httpProxy.createProxyServer({});
const server = http.createServer((req, res) => {
res.writeHead(200, { 'Content-Type': 'text/plain' });
res.end('Reverse Proxy for Autosave');
});

const wss = new WebSocket.Server({ noServer: true });

server.on('upgrade', (req, socket, head) => {
proxy.ws(req, socket, head, { target: 'ws://localhost:5000' });
});

wss.on('connection', (ws) => {
ws.on('message', (message) => {
// Обрабатываем autosave – сохраняем данные во временное хранилище, если нужно
ws.send('Autosave performed');
});
});

server.listen(3000, () => {
console.log('Proxy server is running on port 3000');
});


В этом примере autosave-логика реализуется на целевом сервере (localhost:5000) – reverse proxy лишь пересылает WebSocket-сессии, не привязываясь к конкретной реализации хранения. Это значит, можно гибко менять схему хранения: временно записывать в память, сбрасывать в redis или даже на диск, не меняя прокси.

Что интересно учесть для production-сценариев

– С помощью такого reverse proxy можно организовать маршрутизацию на разные группы сервисов или разные версии серверов, проставлять нужные заголовки и прокидывать авторизацию.
– Для фронтов, построенных как модульные микросервисы, это снимает многие ограничения по количеству соединений и позволяет держать WebSocket-пул небольшим и управляемым.
– Если нужно централизовано контролировать autosave-сессии или временные данные, достаточно добавить простую прослойку на target-сервере на redis или file-based storage. Прокси остается “глупым”, быстро реагируя на события и не лезет в детали логики хранения.

Такой подход не только упрощает архитектуру временного обмена состоянием между фрагментами SPA, но и отлично работает при большом числе одновременных подключений, избавляя от лишней синхронизации и межмодульных зависимостей.
🔥1
Знаете ли вы, как сочетать Terraform и Chaos Engineering для анализа устойчивости инфраструктуры?

Подход “инфраструктуры как кода” дает надежные механизмы автоматизации, но гарантии реальной отказоустойчивости появляются только после проверки системы под неожиданными сбоями и хаосными событиями. Автоматизированный деплой средствами Terraform — это база, а устойчивость к реальным отказам требует симуляции “аварий” и анализа процессов самовосстановления.

Внедрение хаотического тестирования для облачных инфраструктур как AWS

Когда инфраструктура на AWS развернута через Terraform и управляется autoscaling group, можно предположить, что резервы устойчивости уже реализованы за счет автоматического масштабирования и перезапусков. Тем не менее, без моделирования реальных сбоев сложно сказать, насколько быстро и корректно восстанавливается сервис под нагрузкой или во время отсутствия одного из ресурсов.

Вот базовый пример Terraform-конфигурации для группы приложений на EC2:

resource "aws_launch_configuration" "app_lc" {
image_id = "ami-0c55b159cbfafe1f0"
instance_type = "t2.micro"
lifecycle {
create_before_destroy = true
}
}

resource "aws_autoscaling_group" "app_asg" {
launch_configuration = aws_launch_configuration.app_lc.id
min_size = 2
max_size = 10
desired_capacity = 5
tag {
key = "Name"
value = "AppServer"
propagate_at_launch = true
}
health_check_type = "EC2"
health_check_grace_period = 60
}


Автоматизация запуска и поддержки пяти инстансов выглядит рабочей конфигурацией. Но готова ли система действительно к хаосу? Чтобы проверить это, можно задействовать Chaos Monkey или иной инструмент для случайных остановок или завершения инстансов прямо внутри указанной ASG. Пример вызова для “убийства” случайного сервера:

chaos-monkey --app AppServer --instance-type t2.micro --duration 30s


Такой сценарий воспроизведет внезапный отказ в реальном времени. Дальнейшее поведение наблюдаем через метрики: ASG практически сразу запустит новый инстанс, чтобы поддерживать необходимый desiredcapacity. Но чтобы сделать вывод об устойчивости, обязательно фиксируйте время до полного восстановления сервиса (вплоть до прохождения всех health-check и включения в балансировщик) и сопоставьте с логами группы и приложений.

Тонкости на практике: что смотреть после теста?

- Важно мониторить не только момент появления нового EC2, но и скорость его регистрации во всех жизненно важных компонентах (LB, сервис-дискавери и др).
- Если неправильно заданы параметры health
checkgraceperiod, scale-in или scale-out, система может либо не среагировать на сбой вовремя, либо потерять часть ресурсов под нагрузкой.
- Обязательно проверьте, что lifecycle hooks корректно отрабатывают: иногда задержки в shutdown могут тормозить быстрое восстановление.

Добавьте интеграцию с Telegram-ботом — моментальные нотификации о рестарте инстансов позволят ловить аномалии не только по метрикам, а и по необычным сценариям (например, если один инстанс постоянно уходит в unhealthy без явных причин).

Такой подход позволяет удостовериться, что код инфраструктуры не только развернут “по спецификации”, но и готов к реальным непредсказуемым событиям: проверена реакция на аварии, скорость восстановления и корректность триггеров автоскейлинга. Чем лучше симулированы настоящие сбои, тем увереннее можно масштабироваться — и тем спокойнее команда поддерживает работу в проде.
🔥1