This media is not supported in your browser
VIEW IN TELEGRAM
В этом видео разбирается, что такое модули в Python и как их подключать в проект. Автор показывает, как импортировать встроенные модули, а также создавать и использовать свои собственные. Это позволяет делать код более структурированным и переиспользуемым.
Please open Telegram to view this post
VIEW IN TELEGRAM
❤🔥2❤2⚡2
🚦 Python + Redis Streams — быстрая обработка событий
Redis Streams — это структура данных для очередей и событий. С ней можно строить системы, где сообщения не теряются, а обрабатываются быстро и упорядоченно.
📥 Запись события в стрим
➡️ Каждое событие получает уникальный ID и хранится в очереди.
📤 Чтение из стрима
➡️ Можно читать все сообщения с начала или только новые.
👥 Консьюмер-группы
➡️ Несколько воркеров обрабатывают события параллельно, но каждое сообщение достанется только одному.
♻️ Подтверждение обработки
➡️ Сообщение убирается из «висячих», значит оно точно обработано.
🔄 Повторная доставка
➡️ Это делает систему надёжной.
⚡️ Потоковая обработка в реальном времени
➡️ Код крутится бесконечно и реагирует на новые события сразу.
🚀 Кейсы применения
🗣️ Запомни: Redis Streams позволяют строить быстрые и надёжные пайплайны обработки событий, а с consumer groups ты получаешь масштабируемость и защиту от потерь.
Redis Streams — это структура данных для очередей и событий. С ней можно строить системы, где сообщения не теряются, а обрабатываются быстро и упорядоченно.
import redis
r = redis.Redis()
r.xadd("mystream", {"user": "ivan", "action": "login"})
events = r.xread({"mystream": "0"}, count=5)
print(events)r.xgroup_create("mystream", "group1", id="0", mkstream=True)
msg = r.xreadgroup("group1", "consumer-1", {"mystream": ">"}, count=1)♻️ Подтверждение обработки
r.xack("mystream", "group1", msg_id)Если воркер упал, Redis отдаст «неподтверждённые» события другому.
while True:
msgs = r.xread({"mystream": "$"}, block=5000)
for _, events in msgs:
for _, data in events:
print(data)
Логи, чаты, системы мониторинга, стриминг данных с IoT-устройств. Всё, где важна скорость и порядок.
Please open Telegram to view this post
VIEW IN TELEGRAM
👍4❤2🥰2
Pydantic v2 — не мелкий апдейт, а рефакторинг движка в отдельный
pydantic-core на Rust. Это смена архитектуры: быстрее, строже и с новой API-парадигмой для валидации и сериализации.pydantic-core (Rust) — профит по скорости и надёжности.parse_obj → model_validate, dict() → model_dump/model_dump_json.from pydantic import BaseModel
class User(BaseModel):
id: int
name: str = "Anon"
u = User.model_validate({"id": "1", "name": "Bob"})
print(u) # id=1 (если режим lax — pydantic попытается привести)
model_validate — основной метод для валидации словарей/объектов.По умолчанию Pydantic пытается привести типы. Но ты можешь включить строгий режим на модель:
from pydantic import BaseModel, ConfigDict
class M(BaseModel):
model_config = ConfigDict(strict=True)
id: int
M.model_validate({"id": "1"}) # ValidationError: строка → int запрещена
model_config = ConfigDict(strict=True) включает strict-режим для всех полей модели.Если нужен только жёсткий тип для конкретного поля — используй строгие типы (
StrictInt, StrictStr и т.д.).@field_validatorfrom pydantic import BaseModel, field_validator, ValidationError
class User(BaseModel):
username: str
password: str
password2: str
@field_validator('password2')
def passwords_match(cls, v, info):
if v != info.data['password']:
raise ValueError("Пароли не совпадают")
return v
try:
User.model_validate({'username': 'u', 'password': 'x', 'password2': 'y'})
except ValidationError as e:
print(e)
@field_validator — заменитель старого @validator. Может запускаться before/after и получать доступ к уже валидированным данным через ValidationInfo.@model_validatorfrom pydantic import BaseModel, model_validator
class Payment(BaseModel):
amount: int
currency: str
@model_validator(mode='after')
def check_amount(cls, values):
if values['amount'] <= 0:
raise ValueError("amount must be > 0")
return values
@model_validator — для cross-field правил и общего pre/post-processing. Есть режимы before, after, wrap.user = User.model_construct({"id": 1, "name": "x"})model_construct() создаёт экземпляр без валидации — только если данные доверенные и нужны микро-оптимизации.model_dump / model_dump_jsonu = User.model_validate({"id": 1, "name": "Bob"})
print(u.model_dump()) # dict
print(u.model_dump_json()) # json
# опции: exclude_none, exclude_unset, round_trip, by_alias и т.д.model_dump заменил dict() и даёт контроль над сериализацией.class A(BaseModel):
model_config = ConfigDict(extra='forbid')
name: str
A.model_validate({'name': 'x', 'bad': 1}) # ValidationError, extra not allowed
model_config.extra — ignore (по умолчанию), allow, forbid.❌ Привычка: User.parse_obj(...)✔️ Теперь: User.model_validate(...).❌ Ожидать, что dict() сохранит все флаги и алиасы.✔️ Используй model_dump(by_alias=True, exclude_unset=True).❌ Делать сложную cross-field валидацию в @field_validator.✔️ Для таких случаев — @model_validator(mode='after').
🛠 Практические паттерны и советы
👍 Нужна производительность? Для доверенных данных используй model_construct().👍 Хочешь полностью выключить «магическое» приведение — model_config = ConfigDict(strict=True) или StrictInt/StrictStr.👍 Для сложных преобразований — @field_validator(..., mode='before'), чтобы нормализовать вход до основной валидации.
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥7❤2👍2❤🔥1
Media is too big
VIEW IN TELEGRAM
В этом видео автор простым и наглядным языком показывает, как открывать, читать и записывать файлы в Python: режимы `r/w/a`, контекстный менеджер `with`, методы `read()`, `readlines()` и `write()`, а также базовые приёмы обработки ошибок при работе с файлами.
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥4❤2
🚰 Стриминг больших данных через генераторы и
Когда данных слишком много — миллионы строк в файле, бесконечные события или поток логов — держать всё в памяти нельзя. Тут спасают генераторы и ключевое слово
📦 Чтение файла построчно
➡️ Память не переполняется: читается и отдаётся по строке.
🔗 Комбинация генераторов через yield from
➡️
⚡️ Обработка пайплайнами
➡️ Логика разбивается на звенья, и каждое обрабатывает поток данных лениво.
🌀 Бесконечные источники
➡️ Генератор выдаёт значения бесконечно, но потребитель сам решает, когда остановиться.
📑 Вложенные пайплайны с
➡️
🚀 Генераторы + итераторы стандартной библиотеки
➡️ Вместо списка в миллион чисел получаешь поток, из которого берёшь только нужное.
🗣️ Запомни: генераторы — это ленивые конвейеры данных, yield from связывает их в цепочку. Такой подход позволяет обрабатывать гигабайты без переполнения памяти и писать код, который дышит как поток.
yield fromКогда данных слишком много — миллионы строк в файле, бесконечные события или поток логов — держать всё в памяти нельзя. Тут спасают генераторы и ключевое слово
yield from, позволяющее строить цепочки ленивых итераторов.def read_file(path):
with open(path, "r") as f:
for line in f:
yield line.strip()
for row in read_file("bigdata.txt"):
print(row)
def numbers():
yield from range(5)
yield from range(10, 15)
print(list(numbers()))
yield from разворачивает подгенераторы и прокидывает их значения наружу.def read_lines(path):
with open(path) as f:
yield from f
def filter_errors(lines):
for line in lines:
if "ERROR" in line:
yield line
errors = filter_errors(read_lines("app.log"))
for e in errors:
print(e)
🌀 Бесконечные источники
import time
def ticker():
while True:
yield time.time()
time.sleep(1)
for t in ticker():
print(t)
📑 Вложенные пайплайны с
yield fromdef split_words(lines):
for line in lines:
yield from line.split()
data = ["hello world", "big data stream"]
for word in split_words(data):
print(word)
yield from красиво разрывает вложенные итерации без двойных циклов.from itertools import islice
def stream():
for i in range(1000000):
yield i
print(list(islice(stream(), 5)))
Please open Telegram to view this post
VIEW IN TELEGRAM
❤6⚡1
🚦 Python + Redis Streams — быстрая обработка событий
Redis Streams — это структура данных для очередей и событий. С ней можно строить системы, где сообщения не теряются, а обрабатываются быстро и упорядоченно.
📥 Запись события в стрим
➡️ Каждое событие получает уникальный ID и хранится в очереди.
📤 Чтение из стрима
➡️ Можно читать все сообщения с начала или только новые.
👥 Консьюмер-группы
➡️ Несколько воркеров обрабатывают события параллельно, но каждое сообщение достанется только одному.
♻️ Подтверждение обработки
➡️ Сообщение убирается из «висячих», значит оно точно обработано.
🔄 Повторная доставка
➡️ Это делает систему надёжной.
⚡️ Потоковая обработка в реальном времени
➡️ Код крутится бесконечно и реагирует на новые события сразу.
🚀 Кейсы применения
🗣️ Запомни: Redis Streams позволяют строить быстрые и надёжные пайплайны обработки событий, а с consumer groups ты получаешь масштабируемость и защиту от потерь.
Redis Streams — это структура данных для очередей и событий. С ней можно строить системы, где сообщения не теряются, а обрабатываются быстро и упорядоченно.
import redis
r = redis.Redis()
r.xadd("mystream", {"user": "ivan", "action": "login"})
events = r.xread({"mystream": "0"}, count=5)
print(events)r.xgroup_create("mystream", "group1", id="0", mkstream=True)
msg = r.xreadgroup("group1", "consumer-1", {"mystream": ">"}, count=1)♻️ Подтверждение обработки
r.xack("mystream", "group1", msg_id)Если воркер упал, Redis отдаст «неподтверждённые» события другому.
while True:
msgs = r.xread({"mystream": "$"}, block=5000)
for _, events in msgs:
for _, data in events:
print(data)
Логи, чаты, системы мониторинга, стриминг данных с IoT-устройств. Всё, где важна скорость и порядок.
Please open Telegram to view this post
VIEW IN TELEGRAM
❤6👍4
📐 Typing: TypedDict, Literal, Annotated в реальных проектах
Аннотации типов в Python — это не только
📦 TypedDict — словари со строгой схемой
➡️ Теперь IDE и mypy знают структуру словаря. Ошибку в ключе или типе отловишь сразу.
🎯 Literal — фиксированные значения
➡️ Полезно, когда параметр должен принимать строго ограниченные значения.
🧩 Annotated — тип + метаданные
➡️ Аннотации становятся документацией и могут использоваться библиотеками (например, валидацией в FastAPI).
⚡️ TypedDict с опциональными ключами
➡️
🔄 Literal в связке с Enum
➡️ Даёт компактную альтернативу Enum, если не нужно хранить методы.
📑 Annotated для валидации данных
В FastAPI:
➡️ Ограничения накладываются прямо через типы, без лишнего кода.
🧠 Совмещение инструментов
➡️ Получается словарь со строгой схемой и фиксированным набором значений.
🗣️ Запомни: TypedDict даёт строгие словари, Literal ограничивает набор значений, Annotated связывает тип с метаданными. Вместе они делают код самодокументируемым и надёжным.
Аннотации типов в Python — это не только
int и str. Для сложных сценариев есть мощные инструменты: TypedDict, Literal, Annotated. Они помогают писать код понятнее и ловить ошибки ещё на этапе разработки.from typing import TypedDict
class User(TypedDict):
id: int
name: str
is_admin: bool
u: User = {"id": 1, "name": "Иван", "is_admin": False}
from typing import Literal
def set_status(status: Literal["NEW", "IN_PROGRESS", "DONE"]) -> None:
print(f"Статус: {status}")
set_status("NEW") # ✅
set_status("INVALID") # ❌ mypy отловит
from typing import Annotated
from dataclasses import dataclass
@dataclass
class User:
id: Annotated[int, "DB Primary Key"]
email: Annotated[str, "must be valid email"]
⚡️ TypedDict с опциональными ключами
class Config(TypedDict, total=False):
debug: bool
log_level: str
cfg: Config = {"debug": True} # ok
total=False позволяет делать часть ключей необязательными.from typing import Literal
Role = Literal["user", "admin", "moderator"]
def check_role(role: Role): ...
📑 Annotated для валидации данных
В FastAPI:
from typing import Annotated
from fastapi import Query
def read_items(size: Annotated[int, Query(ge=1, le=100)]):
return {"size": size}
class Product(TypedDict):
id: int
status: Literal["active", "archived"]
def process(p: Product): ...
Please open Telegram to view this post
VIEW IN TELEGRAM
👍4❤2🔥2
Media is too big
VIEW IN TELEGRAM
В этом видео автор объясняет, как работать с модулем os в Python для взаимодействия с операционной системой.
Разбирается выполнение команд через терминал, навигация по файловой системе, работа с путями и управление файлами. Всё показано на простых примерах, которые легко повторить самостоятельно.
Please open Telegram to view this post
VIEW IN TELEGRAM
❤3⚡2
Async ORM в Python: SQLModel/Tortoise 🆚 SQLAlchemy
Синхронные ORM вроде SQLAlchemy блокируют потоки на время запросов к БД — в асинхронных приложениях это убивает всю пользу asyncio. Async ORM работают на корутинах и не блокируют event loop, позволяя обрабатывать тысячи одновременных подключений.
🔄 SQLAlchemy: синхронный подход (блокирующий)
➡️ Каждый запрос занимает поток. В async-приложениях нужны тред-пулы, что усложняет архитектуру.
⚡️ SQLModel: асинхронная обёртка над SQLAlchemy
➡️ Полная совместимость с SQLAlchemy + асинхронность. Идеально для миграции.
🐢 Tortoise ORM: нативный async-подход
➡️ Вдохновлен Django ORM — простой и понятный синтаксис.
📊 Сравнение производительности
➡️ Async ORM в 5-7 раз быстрее при высокой нагрузке благодаря отсутствию блокировок.
🛠 Работа с отношениями: SQLModel
➡️ Полная поддержка отношений как в SQLAlchemy + Pydantic валидация.
🛠 Работа с отношениями: Tortoise
➡️ Простой Django-like синтаксис с автоматическими обратными связями.
🔧 Экосистема и сообщество
🌐 Использование с FastAPI
➡️ Идеальная интеграция с современными async-фреймворками.
🗣️ Запомни: Выбирай Tortoise для новых async-проектов за его простоту и производительность.
Синхронные ORM вроде SQLAlchemy блокируют потоки на время запросов к БД — в асинхронных приложениях это убивает всю пользу asyncio. Async ORM работают на корутинах и не блокируют event loop, позволяя обрабатывать тысячи одновременных подключений.
# ❌ Блокирует поток на время выполнения запроса
from sqlalchemy.orm import Session
def get_users_sync():
with Session(engine) as session:
users = session.query(User).all() # Поток заблокирован
return users
# ✅ Не блокирует event loop
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession
async def get_users_async():
async with AsyncSession(engine) as session:
result = await session.exec(select(User)) # Освобождает loop
return result.all()
# ✅ Специально разработан для асинхронности
from tortoise.models import Model
from tortoise import fields
class User(Model):
name = fields.CharField(max_length=255)
email = fields.CharField(max_length=255)
async def get_users_native():
return await User.all() # Чистый async API
# Тест: 1000 одновременных запросов
# SQLAlchemy + threads: ~15 сек, 1.2 GB RAM
# SQLModel: ~2.8 сек, 620 MB RAM
# Tortoise: ~2.3 сек, 512 MB RAM
🛠 Работа с отношениями: SQLModel
class Team(SQLModel, table=True):
id: int | None = Field(primary_key=True)
name: str
users: List["User"] = Relationship(back_populates="team")
class User(SQLModel, table=True):
team_id: int | None = Field(foreign_key="team.id")
team: Optional[Team] = Relationship(back_populates="users")
🛠 Работа с отношениями: Tortoise
class Team(Model):
name = fields.CharField(max_length=255)
users: fields.ReverseRelation["User"]
class User(Model):
name = fields.CharField(max_length=255)
team: fields.ForeignKeyRelation[Team] = fields.ForeignKeyField(
"models.Team", related_name="users"
)
⚡️ Миграции и инструменты🟢 SQLAlchemy: Требует Alembic, сложная настройка🟢 SQLModel: Работает через Alembic, наследует сложности SQLAlchemy🟢 Tortoise: Встроенная система миграций, простая настройка
🟢 SQLAlchemy: Огромное сообщество, все возможные расширения🟢 SQLModel: Молодой, но растущий проект, полная совместимость🟢 Tortoise: Зрелый async-ориентированный фреймворк
# SQLModel интеграция
from fastapi import FastAPI
from sqlmodel.ext.asyncio.session import AsyncSession
app = FastAPI()
@app.get("/users")
async def get_users(session: AsyncSession = Depends(get_session)):
result = await session.exec(select(User))
return result.all()
Please open Telegram to view this post
VIEW IN TELEGRAM
❤2👍2
import - механизм подключения модулейВ Python
import используется для подключения модулей (файлов с кодом) в текущую программу.1. Импорт всего модуля
import math
print(math.sqrt(16)) # 4.0
2. Импорт с псевдонимом (alias)
import numpy as np
print(np.pi) # 3.141592653589793
3. Импорт конкретных объектов
from math import sqrt, pi
print(sqrt(25)) # 5.0
print(pi) # 3.141592653589793
4. Импорт всех объектов (*) ❗ (не рекомендуется)
from math import *
print(sin(0)) # 0.0
⚠️ Минус: может вызвать конфликты имён.
5. Импорт из пользовательского модуля (my_module.py)
import my_module
my_module.hello()
Если модуль в другой папке, нужно добавить его в
sys.path:
import sys
sys.path.append("path/to/module")
import my_module
Python ищет модуль в следующем порядке:
1️⃣ Стандартные модули (
math, random, os)2️⃣ Текущая директория (папка, где запускается скрипт)
3️⃣ Пути из
sys.pathЕсли модуль не найден, возникает
ModuleNotFoundError.import позволяет использовать готовый код и организовать программу, разбивая её на модули.Please open Telegram to view this post
VIEW IN TELEGRAM
⚡1👍1
HTTP/3 работает поверх QUIC (UDP). Меньше задержек, параллельные стримы, нет head-of-line блокировки. Для Python это новая эра веб-сервисов.
from aioquic.asyncio.client import connect
async with connect("example.com", 443, alpn_protocols=["h3"]) as client:
stream_id = client.get_next_available_stream_id()
client.send_headers(stream_id, [(":method", "GET"), (":path", "/")])
client.send_eof(stream_id)
📡 Сервер на aioquic
from aioquic.asyncio import serve
from aioquic.h3.connection import H3_ALPN
async def handler(stream_id, request):
return [(b":status", b"200")], b"Hello QUIC!"
await serve("0.0.0.0", 4433, configuration=my_tls_config,
alpn_protocols=H3_ALPN, stream_handler=handler)
QUIC = несколько потоков внутри одного соединения.
id1 = client.get_next_available_stream_id()
id2 = client.get_next_available_stream_id()
client.send_headers(id1, [...])
client.send_headers(id2, [...])
QUIC всегда шифрован.
Пока через uvicorn-h3 (экспериментально).
uvicorn app:app --http h3 --port 4433
🟢 API с большим количеством параллельных запросов🟢 real-time (чаты, стримы)🟢 мобильные клиенты с нестабильной сетью
❌ Минусы
🔴 Поддержка в браузерах уже есть, но в Python всё ещё эксперимент.🔴 aioquic — низкоуровневый, нужно писать руками.🔴 Поддержка в фреймворках только зарождается.
Please open Telegram to view this post
VIEW IN TELEGRAM
👍5🔥2❤1
Графы — не про математику, а про связи. Пользователи, друзья, транзакции, сервисы — всё это графы. Python и современные БД делают их работу простой и мощной.
Создадим и изучим связи между объектами:
import networkx as nx
G = nx.Graph()
G.add_edges_from([('Alice', 'Bob'), ('Bob', 'Eve'), ('Alice', 'Eve')])
print(nx.shortest_path(G, 'Alice', 'Eve'))
print(nx.degree_centrality(G))
import matplotlib.pyplot as plt
nx.draw(G, with_labels=True, node_color='lightblue', node_size=1500)
plt.show()
GraphQL от Facebook — не база, а язык запросов к API.
query = """
{
user(id: "1") {
name
friends {
name
}
}
}
"""
from ariadne import QueryType, make_executable_schema, graphql_sync
type_defs = """
type Query {
hello: String!
}
"""
query = QueryType()
@query.field("hello")
def resolve_hello(_, info):
return "GraphQL + Python!"
schema = make_executable_schema(type_defs, query)
Python-драйвер для Neo4j:
from neo4j import GraphDatabase
driver = GraphDatabase.driver("bolt://localhost:7687")
with driver.session() as session:
result = session.run("MATCH (a)-[r]->(b) RETURN a,b LIMIT 5")
for record in result:
print(record)
MATCH (user:Person)-[:FRIEND]->(friend)
WHERE user.name = "Alice"
RETURN friend.name
Анализируешь связи в NetworkX, сохраняешь их в Neo4j, а отдаёшь клиенту через GraphQL API.
Полный цикл: анализ → хранение → доступ.
Please open Telegram to view this post
VIEW IN TELEGRAM
👍3❤2⚡2
DevOps — это не только YAML и Terraform. Python — идеальный клей между инфраструктурой, API и конфигами. Он управляет тем, что не умеет Ansible, и дополняет Terraform там, где HCL бессилен.
Когда тебе нужно сгенерировать
.tf на лету или вызывать Terraform из пайплайна:import subprocess, json
vars = {"region": "eu-west-1", "env": "prod"}
with open("vars.tf.json", "w") as f:
json.dump(vars, f)
subprocess.run(["terraform", "apply", "-auto-approve"])
Каждый Ansible-модуль — это обычный Python-скрипт.
from ansible.module_utils.basic import AnsibleModule
def main():
module = AnsibleModule(
argument_spec={"msg": {"required": True, "type": "str"}}
)
module.exit_json(changed=False, msg=f"Hello {module.params['msg']}!")
if __name__ == "__main__":
main()
Python — король SDK. AWS, GCP, Azure, GitLab, Kubernetes — всё управляется через API.
import boto3
ec2 = boto3.client('ec2')
for i in ec2.describe_instances()['Reservations']:
print(i['Instances'][0]['InstanceId'])
import json
inventory = {
"all": {
"hosts": ["web1", "web2"],
"vars": {"ansible_user": "ubuntu"}
}
}
print(json.dumps(inventory))
Python идеально вписывается в пайплайны:
🟢 валидирует .tf перед применением,🟢 вызывает Ansible с нужными параметрами,🟢 парсит логи и метрики.
import subprocess
subprocess.run(["ansible-playbook", "deploy.yml", "-e", "env=staging"])
import requests
resp = requests.get("https://my-service/health")
if resp.status_code != 200:
print("⚠️ Service down!")
🔵 Terraform хорош для декларации, но не для логики.🔵 Ansible конфигурирует, но не анализирует.🔵 Python делает и то, и другое: управляет, проверяет, комбинирует.
Please open Telegram to view this post
VIEW IN TELEGRAM
👍3🔥2❤1
yield и зачем он реально нужен в Pythonyield — это не return. Он не завершает функцию, а паузит её и отдаёт результат по частям.
def gen():
yield 1
yield 2
yield 3
for x in gen():
print(x)
1, 2, 3 — не сразу, а по одному.Функция работает до первого `yield`, потом «засыпает» и продолжает дальше.
🧾 Чтение большого файла без нагрузки
def read_lines(path):
with open(path) as f:
for line in f:
yield line.strip()
for line in read_lines("huge.log"):
print(line)
Если файл весит 10 ГБ — всё равно работает.
Никаких
readlines(), которые жрут память.
def counter():
n = 0
while True:
yield n
n += 1
for i in counter():
if i > 5:
break
print(i)
yield тут не обойтись.Можно генерировать бесконечный поток, но обрабатывать его как обычный список.
def even_only(numbers):
for n in numbers:
if n % 2 == 0:
yield n
nums = [1, 2, 3, 4, 5, 6]
for x in even_only(nums):
print(x)
filter(), а свой контролируемый фильтр. Гибко и понятно.yield — это когда ты генерируешь данные на лету.Ты не ждёшь весь список, не держишь всё в памяти — просто отдаёшь, когда нужно.
Нужен потоковый режим?
yield — лучший друг.Please open Telegram to view this post
VIEW IN TELEGRAM
👍7❤3
Media is too big
VIEW IN TELEGRAM
В этом видео автор показывает, как создавать функции в Python: как объявлять их, передавать аргументы и использовать возвращаемые значения.
Разъясняется, зачем нужны функции: они позволяют разбивать код на логические блоки, упрощать повторное использование и улучшать читаемость.
Please open Telegram to view this post
VIEW IN TELEGRAM
👍4🔥3❤2
AI-инструменты теперь реально помогают Python-разработчикам: рефакторят, комментируют и ловят ошибки быстрее, чем линтер. Но как ими пользоваться с умом, а не превращать проект в тестовую свалку?
Наивно:
def calc(x, y):
return x*y + x/y - x**y + y/x
def calc(x, y):
if y == 0:
raise ValueError("y не может быть нулём")
return (x * y) + (x / y) - pow(x, y) + (y / x)
Наивно:
def get_data(a):
return json.loads(a)
def get_data(a: str) -> dict:
"""Парсит JSON-строку в словарь"""
return json.loads(a)
Наивно:
data=[1,2,3,4,5]
print(sum([i for i in data if i>2]))
data = [1, 2, 3, 4, 5]
print(sum(i for i in data if i > 2))
Наивно:
def add(a,b): return a+b
def test_add():
assert add(2, 3) == 5
assert add(-1, 1) == 0
Наивно:
result = func1(func2(func3(data)))
temp = func3(data)
temp = func2(temp)
result = func1(temp)
AI-комментарий:
# Этот код группирует пользователей по стране и считает активных
Наивно:
os.system(f"rm -rf {user_input}")subprocess.run(["rm", "-rf", user_input], check=True)
AI может за тебя создать CRUD, REST API, Dockerfile, CI pipeline — по описанию.
Please open Telegram to view this post
VIEW IN TELEGRAM
❤9😁4👍3🔥1
Media is too big
VIEW IN TELEGRAM
В этом видео автор показывает, как строить логику приложения на Python через использование циклов.
Разбирается, когда и как использовать `for` и `while`, как контролировать ход выполнения, организовывать условия внутри циклов и выстраивать структуру программы для решения конкретных задач.
Please open Telegram to view this post
VIEW IN TELEGRAM
⚡3🔥3❤1
enumerate() добавляет счётчик к итерации.fruits = ["🍎", "🍌", "🍇"]
for i, fruit in enumerate(fruits):
print(i, fruit)
0 🍎
1 🍌
2 🍇
Please open Telegram to view this post
VIEW IN TELEGRAM
🍌8❤5👍3🔥1👀1
sequence[start:stop:step]
🔹
start – начальный индекс (по умолчанию 0).🔹
stop – конечный индекс (не включается!).🔹
step – шаг (по умолчанию 1).
lst = [0, 1, 2, 3, 4, 5]
print(lst[1:4]) # [1, 2, 3] (элементы с индекса 1 по 3)
print(lst[:3]) # [0, 1, 2] (с начала до индекса 2)
print(lst[::2]) # [0, 2, 4] (каждый второй элемент)
print(lst[::-1]) # [5, 4, 3, 2, 1, 0] (разворот списка)
⚡ Итог: срезы позволяют легко работать с последовательностями без циклов и лишнего кода!
Please open Telegram to view this post
VIEW IN TELEGRAM
👍6❤2
Когда pandas начинает задыхаться на гигабайтах данных — не надо переписывать код.
Просто меняешь библиотеку — и Python начинает считать на всех ядрах и даже на кластере.
import pandas as pd
df = pd.read_csv("data.csv")
df.groupby("city").sum()
🧩 Dask подключает все ядра
import dask.dataframe as dd
df = dd.read_csv("data.csv")
df.groupby("city").sum().compute()
Всё летит параллельно.
from dask.distributed import Client
client = Client("tcp://scheduler:8786")
Каждая нода тянет свой кусок данных.
import modin.pandas as pd
df = pd.read_csv("data.csv")
df.groupby("city").sum()
Многоядерность включается автоматически.
🧠 Dask + ML = масштаб
from dask_ml.linear_model import LogisticRegression
model = LogisticRegression()
model.fit(X, y)
Больше данных — та же логика.
dask-scheduler &
dask-worker tcp://localhost:8786
localhost:8787 — видишь граф задач, загрузку CPU, память.dask-ctl deploy kubernetes --replicas 8
Kubernetes, AWS, GCP — всё поддерживается.
Please open Telegram to view this post
VIEW IN TELEGRAM
❤7👍4🔥2❤🔥1
В Python можно распаковывать коллекции в аргументы или структуры — это мощно.
def f(*args, **kwargs):
print(args, kwargs)
f(1, 2, x=10) # (1, 2), {'x': 10}
nums = [1, 2, 3]
print(*nums) # 1 2 3
params = {"sep": " | "}
print(1, 2, 3, **params) # 1 | 2 | 3
a, *middle, b = [1, 2, 3, 4, 5]
print(middle) # [2, 3, 4]
Please open Telegram to view this post
VIEW IN TELEGRAM
👍5❤3🔥3