Python для начинающих
1.24K subscribers
546 photos
3 videos
232 files
74 links
Python для начинающих
Download Telegram
Как подружиться с argparse и сделать свой первый CLI-инструмент

Почти каждый полезный скрипт рано или поздно превращается в маленькую консольную утилиту: нужно передать путь к файлу, режим работы, уровень логирования. Пихать всё в input() — путь страданий. Для этого в стандартной библиотеке есть модуль argparse, который превращает ваш скрипт в удобный CLI-инструмент с автогенерируемой справкой.

---

### Базовый пример: обязательный аргумент

Скрипт, который приветствует пользователя по имени:

import argparse

parser = argparse.ArgumentParser(description="Simple greeting script")
parser.add_argument("name", help="User name to greet")

args = parser.parse_args()
print(f"Hello, {args.name}!")


Запускаем из терминала:

python greet.py Alice
# Hello, Alice!


Попробуйте python greet.py -h — справка генерируется автоматически.

---

### Опциональные флаги и значения по умолчанию

Добавим флаг --uppercase и параметр --times:

import argparse

parser = argparse.ArgumentParser(description="Advanced greeting script")
parser.add_argument("name", help="User name to greet")
parser.add_argument(
"-t", "--times",
type=int,
default=1,
help="How many times to repeat greeting"
)
parser.add_argument(
"-u", "--uppercase",
action="store_true",
help="Print greeting in uppercase"
)

args = parser.parse_args()

greeting = f"Hello, {args.name}!"
if args.uppercase:
greeting = greeting.upper()

for _ in range(args.times):
print(greeting)


Примеры запуска:

python greet.py Bob -t 3
python greet.py Bob -t 2 -u


---

### Выбор из ограниченного набора значений

Частая задача — режим работы: debug, info, error. Используем choices:

import argparse

parser = argparse.ArgumentParser(description="Logging level demo")
parser.add_argument(
"--level",
choices=["debug", "info", "error"],
default="info",
help="Logging level"
)

args = parser.parse_args()
print(f"Selected level: {args.level}")


Если передать неправильное значение, argparse сам ругнется и покажет помощь.

---

### Что в итоге

argparse умеет:

- разбирать позиционные и опциональные аргументы;
- автоматически генерировать -h/--help;
- проверять типы (type=int, float, и т.д.);
- ограничивать значения (choices);
- удобно работать с флагами (action="store_true").

Освоив этот модуль, вы переводите свои скрипты из категории «сделал для себя» в категорию «это можно отдавать другим и не стыдиться».
👍1
- Работа с Google Sheets API для автоматизации работы с таблицами.
Python для начинающих: автоматизируем Google Sheets через API

Работа с Excel надоела, а таблицы в Google Sheets растут как грибы? Самое время подключить Python и заставить таблицы работать за вас.

### Что понадобится

1. Аккаунт Google.
2. Включить Google Sheets API и создать Service Account в Google Cloud Console.
3. Скачать JSON‑ключ сервисного аккаунта.
4. Выдать этому сервисному аккаунту доступ к нужной таблице (Share → по email из JSON).

Устанавливаем нужные пакеты:

pip install gspread google-auth


### Подключение к таблице

import gspread
from google.oauth2.service_account import Credentials

SCOPES = ["https://www.googleapis.com/auth/spreadsheets"]
creds = Credentials.from_service_account_file(
"service_account.json",
scopes=SCOPES
)

client = gspread.authorize(creds)
sheet = client.open("Sales Report").sheet1 # первая вкладка


Теперь sheet — это объект рабочей таблицы, с которым можно делать почти всё.

### Чтение данных

Получим все строки и заголовок:

rows = sheet.get_all_records()  # список dict'ов
header = sheet.row_values(1) # первая строка
print(header)
print(rows[:3]) # первые три записи


Так удобно превращать таблицу в «мини-базу данных».

### Запись и обновление

Запишем заголовок и пару строк:

data_header = ["Date", "Product", "Quantity", "Price"]
sheet.update("A1:D1", [data_header])

new_rows = [
["2025-01-01", "Keyboard", 3, 59.9],
["2025-01-02", "Mouse", 5, 29.5],
]
sheet.append_rows(new_rows)


append_rows добавляет данные в конец, не нужно считать, какая строка следующая.

Обновим цену в конкретной ячейке:

sheet.update("D2", 79.9)


### Массовые обновления

Если нужно поменять сразу блок данных — используем диапазон:

discounted = [
["Keyboard", 49.9],
["Mouse", 24.9],
]
sheet.update("B2:C3", discounted)


### Небольшая автоматизация: перерасчёт итогов

Добавим в таблицу столбец с итоговой суммой:

values = sheet.get_all_values()
header = values[0]
rows = values[1:]

quantity_idx = header.index("Quantity")
price_idx = header.index("Price")

totals = []
for row in rows:
try:
qty = float(row[quantity_idx])
price = float(row[price_idx])
totals.append([qty * price])
except ValueError:
totals.append([""])

start_row = 2
end_row = start_row + len(totals) - 1
sheet.update(f"E{start_row}:E{end_row}", totals)
sheet.update("E1", "Total")


Теперь таблица сама считает сумму по строке, а вы можете раз в день запускать скрипт и обновлять отчеты.

Google Sheets API + Python — это быстрый способ превратить обычную таблицу в часть автоматизированного пайплайна: собирать данные, очищать, пересчитывать и готовить отчеты без ручного копипаста.
🔥31
- Как работать с параллельными потоками данных с использованием Apache Kafka.
Как работать с параллельными потоками данных с Apache Kafka и Python

Представь себе конвейер в заводском цеху: по ленте бесконечно едут детали, а рабочие на разных станциях что‑то с ними делают. Apache Kafka — это примерно такой же конвейер, только для данных. Она позволяет принимать, хранить и отдавать миллионы сообщений в реальном времени. Python здесь может быть “рабочим”, который эти данные обрабатывает.

---

### Основные термины Kafka

- Broker — сервер Kafka, который хранит сообщения.
- Topic — “лента”, куда пишутся сообщения (например, user_events).
- Partition — кусок топика. Именно разделы позволяют обрабатывать сообщения параллельно.
- Producer — отправляет сообщения в Kafka.
- Consumer — читает сообщения из Kafka.
- Consumer group — группа потребителей, которые совместно читают один топик, автоматически деля партиции между собой.

Чем больше партиций у топика и чем больше потребителей в группе, тем выше параллелизм.

---

### Установка клиента для Python

Один из популярных клиентов — confluent-kafka (тонкая обертка над C-библиотекой):

pip install confluent-kafka


---

### Простой producer: отправляем события

from confluent_kafka import Producer
import json

producer = Producer({"bootstrap.servers": "localhost:9092"})

def delivery_report(err, msg):
if err is not None:
print(f"Delivery failed: {err}")
else:
print(f"Delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")

for user_id in range(1, 6):
event = {"user_id": user_id, "action": "click"}
producer.produce(
topic="user_events",
value=json.dumps(event).encode("utf-8"),
callback=delivery_report
)

producer.flush()


Сообщения попадают в топик user_events. Kafka сама распределяет их по партициям (по умолчанию — по хешу ключа, если он есть).

---

### Параллельное чтение: несколько consumers в одной группе

Ключ к параллельности — использовать одну consumer group и задать топику несколько партиций (например, 3). Тогда, если ты запустишь этот скрипт в нескольких процессах, Kafka раздаст им партиции автоматически.

from confluent_kafka import Consumer, KafkaException
import json

consumer = Consumer({
"bootstrap.servers": "localhost:9092",
"group.id": "analytics_group",
"auto.offset.reset": "earliest",
})

consumer.subscribe(["user_events"])

try:
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
raise KafkaException(msg.error())
event = json.loads(msg.value().decode("utf-8"))
print(
f"Process: handled user={event['user_id']} "
f"from partition={msg.partition()} offset={msg.offset()}"
)
finally:
consumer.close()


Теперь магия:
- Увеличиваешь количество процессов/контейнеров с этим consumer’ом — обработка масштабируется.
- Kafka сама следит, кто какие партиции читает, и перераспределяет их при падении одного из экземпляров.

---

### Практические советы

1. Думай о ключах сообщений: одинаковый ключ → одинаковая партиция → удобно гарантировать порядок для конкретного пользователя.
2. Используй consumer groups для разных задач: аналитика, логирование, алерты могут читать один и тот же топик независимо.
3. Следи за offset’ами: по умолчанию Kafka фиксирует прогресс чтения, чтобы после рестартов не терять сообщения.

Kafka + Python позволяют строить конвейеры обработки данных, которые легко масштабируются горизонтально. Для начинающего питониста это хороший шаг от “скриптиков” к полноценным потоковым системам.
👍1
- Основы работы с мемкэшем и Redis для кэширования данных.
Основы работы с Memcached и Redis для кэширования данных

Если ваш Python‑код постоянно лезет в базу за одними и теми же данными — вы жжёте процессор и тормозите приложение. В таких случаях на сцену выходят Memcached и Redis — быстрые in‑memory хранилища, идеальные для кэширования.

---

### Зачем нужен кэш

Кэширование — это сохранение заранее вычисленных результатов в память, чтобы не пересчитывать и не запрашивать их повторно. Типичный сценарий:

1. Проверяем кэш.
2. Если данных нет — обращаемся к БД / API.
3. Кладём результат в кэш.
4. В следующий раз берём из кэша за миллисекунды.

---

### Memcached: минималистичный и быстрый

Memcached — это просто распределённый словарь в памяти: ключ–значение, без сложных типов данных.

Установка клиента:

pip install pymemcache


Пример:

from pymemcache.client import base

client = base.Client(("localhost", 11211))

def get_user_profile(user_id: int) -> dict:
cache_key = f"user:{user_id}"
cached = client.get(cache_key)
if cached:
# данные хранятся как bytes; в реальном коде — json.loads(...)
print("from cache")
return eval(cached.decode("utf-8"))

print("from db")
user = {"id": user_id, "name": "Alice"} # тут должна быть реальная БД
client.set(cache_key, str(user).encode("utf-8"), expire=60)
return user

get_user_profile(1)
get_user_profile(1)


Особенности Memcached:
- Нет персистентности: перезапустили — всё забыто.
- Простые типы (строки/байты).
- Идеален как быстрый временный кэш поверх БД.

---

### Redis: швейцарский нож кэширования

Redis умеет больше: строки, списки, множества, хэши, TTL, pub/sub, транзакции и персистентность.

Установка:

pip install redis


Пример кэша с TTL:

import json
from redis import Redis

redis_client = Redis(host="localhost", port=6379, db=0)

def get_product(product_id: int) -> dict:
cache_key = f"product:{product_id}"
cached = redis_client.get(cache_key)
if cached:
print("from cache")
return json.loads(cached)

print("from db")
product = {"id": product_id, "title": "Keyboard", "price": 99.9}
redis_client.set(cache_key, json.dumps(product), ex=120)
return product

get_product(10)
get_product(10)


Пример использования сложных структур: счётчик просмотров товара:

def increment_views(product_id: int) -> int:
key = f"product:{product_id}:views"
return redis_client.incr(key)

print(increment_views(10))
print(increment_views(10))


---

### Что выбрать новичку

- Memcached, если нужен простой, сверхбыстрый кэш без сложных структур и персистентности.
- Redis, если хотите кэш + дополнительные возможности (счётчики, очереди, сессии, персистентность).

Для учебных проектов Redis часто удобнее: он универсален и его легче масштабировать по мере усложнения системы.
👍2
- Использование pandas для преобразования данных и очистки наборов данных.
Использование pandas для преобразования и очистки данных

Если вы хоть раз открывали «сырой» CSV-файл, то знаете: данные редко бывают аккуратными. Пропуски, дубли, странные форматы дат — всё это мешает анализу. Здесь на сцену выходит pandas — швейцарский нож для работы с табличными данными в Python.

---

### Загрузка и первый взгляд на данные

Начнём с простого: прочитаем CSV и посмотрим, что внутри.

import pandas as pd

df = pd.read_csv("sales.csv")
print(df.head())
print(df.info())


head() показывает первые строки, info() — типы столбцов и наличие пропусков. Уже отсюда часто можно понять, где бардак: например, даты хранятся как object, а суммы — как строки.

---

### Преобразование типов и работа с датами

Частая проблема — строки там, где должны быть числа или даты.

df["amount"] = pd.to_numeric(df["amount"], errors="coerce")
df["date"] = pd.to_datetime(df["date"], errors="coerce")


Параметр errors="coerce" превращает невозможные значения в NaN. Это удобно: дальше их можно явно обработать, а не ловить таинственные ошибки.

---

### Очистка пропусков и дубликатов

Пропуски можно либо удалить, либо заполнить осмысленными значениями.

# Удаляем строки, где нет суммы покупки
df = df.dropna(subset=["amount"])

# Заполняем пропущенный город значением "Unknown"
df["city"] = df["city"].fillna("Unknown")

# Удаляем полные дубликаты строк
df = df.drop_duplicates()


Для числовых столбцов часто используют среднее, медиану или 0 — зависит от задачи:

df["discount"] = df["discount"].fillna(df["discount"].median())


---

### Создание новых признаков

Pandas позволяет легко добавлять столбцы, основанные на уже существующих.

df["year"] = df["date"].dt.year
df["month"] = df["date"].dt.month
df["amount_with_tax"] = df["amount"] * 1.2


За пару строк кода можно превратить «сырые» данные в удобный набор признаков для анализа или модели.

---

### Фильтрация и группировка

Комбинация фильтрации и группировки — основа исследовательского анализа.

# Оставим только заказы после 2023 года и с положительной суммой
filtered = df[(df["date"].dt.year >= 2023) & (df["amount"] > 0)]

# Посчитаем суммарные продажи по городам
city_stats = (
filtered
.groupby("city", as_index=False)["amount"]
.sum()
.rename(columns={"amount": "total_amount"})
)

print(city_stats.head())


С помощью groupby и агрегирующих функций (sum, mean, count и др.) можно быстро получать сводки и отчёты, не трогая Excel.

---

pandas превращает хаотичный CSV в аккуратный, структурированный датафрейм, с которым приятно работать. Чем раньше вы начнёте его использовать, тем меньше времени будете тратить на рутину и тем больше — на сам анализ.
👍1
- Как настроить асинхронные запросы HTTP с библиотекой aiohttp.
Как настроить асинхронные HTTP‑запросы с aiohttp

Синхронный код в стиле requests.get() удобен, пока запросов мало. Но как только нужно опросить десятки или сотни URL — программа начинает “залипать”. Здесь на сцену выходит aiohttp и asyncio.

---

### Установка и базовый пример

pip install aiohttp


Первый асинхронный запрос:

import asyncio
import aiohttp

async def fetch(session, url):
async with session.get(url) as response:
return await response.text()

async def main():
async with aiohttp.ClientSession() as session:
html = await fetch(session, "https://httpbin.org/get")
print(len(html))

asyncio.run(main())


Разбор:
- async def — объявление корутины;
- await — “подождать, но не блокировать”;
- ClientSession — переиспользует соединения (это важно для производительности).

---

### Параллельные запросы

Самая вкусная часть — запуск запросов одновременно:

import asyncio
import aiohttp

URLS = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/delay/3",
]

async def fetch_status(session, url):
async with session.get(url) as response:
return url, response.status

async def main():
async with aiohttp.ClientSession() as session:
tasks = [asyncio.create_task(fetch_status(session, url)) for url in URLS]
for task in asyncio.as_completed(tasks):
url, status = await task
print(url, "->", status)

asyncio.run(main())


Все запросы уходят почти одновременно, а ответы обрабатываются по мере готовности. Разница особенно заметна на “медленных” API.

---

### Таймауты и обработка ошибок

Без ограничений можно повесить программу на “вечном” запросе. Добавим таймаут и отловим ошибки:

import asyncio
import aiohttp
from aiohttp import ClientError

async def safe_fetch_json(session, url):
try:
async with session.get(url, timeout=3) as response:
response.raise_for_status()
return await response.json()
except asyncio.TimeoutError:
print("Timeout:", url)
except ClientError as e:
print("ClientError:", url, "->", e)

async def main():
async with aiohttp.ClientSession() as session:
data = await safe_fetch_json("https://httpbin.org/json")
print("Result:", data)

asyncio.run(main())


Ключевые моменты:
- timeout=3 — запрос не будет длиться бесконечно;
- raise_for_status() — выбросит исключение при 4xx/5xx;
- перехват ClientError — базовый класс большинства сетевых ошибок в aiohttp.

---

### Когда стоит переходить на aiohttp

Используйте асинхронные HTTP‑запросы, когда:
- нужно дернуть много URL в короткое время;
- ваш код в основном ждет сети или диска;
- вы пишете асинхронный веб‑скрейпер, бота или микросервис.

Если запрос один‑два — requests проще. Но как только вы упираетесь в “ожидание сети”, aiohttp + asyncio дает очень ощутимый прирост скорости, не усложняя код слишком сильно.
3
- Создание и управление новой базой данных SQLite в Python.
Создание и управление новой базой данных SQLite в Python

SQLite — идеальная отправная точка для тех, кто хочет начать работать с базами данных, не устанавливая сервер и не настраивая сложные системы. Всё хранится в одном файле, а в Python уже есть встроенный модуль sqlite3.

Разберём базовые шаги: создание БД, таблиц, вставка, выборка и обновление данных.

---

### 1. Подключение и создание базы

Если файла базы данных нет, sqlite3 создаст его автоматически:

import sqlite3

conn = sqlite3.connect("blog.db") # создаем/открываем файл базы
cursor = conn.cursor()


conn — это соединение, cursor — объект для выполнения SQL-запросов.

---

### 2. Создание таблицы

Создадим таблицу для статей блога:

cursor.execute("""
CREATE TABLE IF NOT EXISTS posts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
content TEXT NOT NULL,
created_at TEXT DEFAULT CURRENT_TIMESTAMP
)
""")
conn.commit()


IF NOT EXISTS защитит от ошибки при повторном запуске скрипта.

---

### 3. Вставка данных

Добавим новую запись:

new_post = ("First post", "This is my first post content.")
cursor.execute(
"INSERT INTO posts (title, content) VALUES (?, ?)",
new_post
)
conn.commit()


Обрати внимание на ? — это параметризованный запрос. Так безопаснее и защищает от SQL-инъекций.

---

### 4. Чтение данных

Вытащим все статьи:

cursor.execute("SELECT id, title, created_at FROM posts")
rows = cursor.fetchall()

for row in rows:
post_id, title, created_at = row
print(post_id, title, created_at)


fetchall() возвращает список кортежей. Для больших объёмов данных выгоднее использовать fetchone() в цикле.

---

### 5. Обновление и удаление

Изменим заголовок и удалим запись:

cursor.execute(
"UPDATE posts SET title = ? WHERE id = ?",
("Updated title", 1)
)

cursor.execute(
"DELETE FROM posts WHERE id = ?",
(2,)
)

conn.commit()


---

### 6. Акуратное закрытие соединения

conn.close()


Либо использовать контекстный менеджер:

import sqlite3

with sqlite3.connect("blog.db") as conn:
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM posts")
print(cursor.fetchone())


В этом случае commit() и close() произойдут автоматически.

---

SQLite + sqlite3 — это минимальный, но мощный набор, чтобы почувствовать себя создателем настоящих приложений с базой данных: от простого блога до небольших утилит и заметочников. Главное — аккуратная работа с запросами и понимание, что всё это уже доступно «из коробки» в Python.
👍2
- Введение в управление сессиями пользователей с помощью Flask.
Введение в управление сессиями пользователей с помощью Flask

Когда вы пишете веб‑приложение, быстро возникает вопрос: как «запомнить» пользователя между запросами? HTTP сам по себе «ничего не помнит» — каждый запрос живет отдельно. Здесь на сцену выходят сессии.

Во Flask сессия — это обычный словарь session, который привязан к пользователю через cookie. Данные сессии шифруются с помощью SECRET_KEY, поэтому значение ключа должно быть сложным и храниться в секрете.

Начнем с минимального примера «ручного входа»:

from flask import Flask, session, redirect, url_for, request

app = Flask(__name__)
app.secret_key = "change_this_secret_key"

@app.route("/login", methods=["GET", "POST"])
def login():
if request.method == "POST":
username = request.form.get("username")
# Здесь могла бы быть проверка пароля
session["user"] = username
return redirect(url_for("profile"))
return """
<form method="post">
<input name="username" placeholder="Username">
<button type="submit">Login</button>
</form>
"""

@app.route("/profile")
def profile():
user = session.get("user")
if not user:
return redirect(url_for("login"))
return f"Hello, {user}! This is your profile."

@app.route("/logout")
def logout():
session.pop("user", None)
return redirect(url_for("login"))

if __name__ == "__main__":
app.run(debug=True)


Что здесь важно:

- session работает почти как обычный словарь.
- session["user"] = username — «залогинили» пользователя.
- session.get("user") — проверяем, авторизован ли он.
- session.pop("user", None) — «разлогин».

Flask хранит содержимое сессии в зашифрованном cookie, поэтому:

1. Не кладите туда большие данные (лимит cookie обычно до 4 КБ).
2. Не храните критически важные секреты (например, пароли в открытом виде).
3. Обязательно ставьте нормальный SECRET_KEY (случайную строку, а не 123).

Чтобы чуть усложнить пример, добавим флаг администратора:

@app.route("/set_admin")
def set_admin():
session["is_admin"] = True
return "Admin mode enabled"

@app.route("/admin")
def admin_panel():
if not session.get("is_admin"):
return "Access denied", 403
return "Welcome to admin panel"


Так шаг за шагом можно строить авторизацию, роли, корзину покупок, настройки пользователя — всё это основано на сессиях. Flask делает работу с ними максимально простой: по сути вы просто модифицируете словарь, а фреймворк сам связывает его с конкретным пользователем.
👍1
- Управление Docker-контейнерами с Python через библиотеку docker-py.
Управление Docker-контейнерами с Python через docker-py

Когда проекты перестают помещаться в одну виртуалку, на сцену выходит Docker. Но почему бы не управлять контейнерами прямо из Python-скрипта, а не вручную через консоль? Для этого есть библиотека docker-py (официальный Docker SDK для Python).

---

### Установка и подключение

pip install docker


Простейшее подключение:

import docker

client = docker.from_env()


from_env() находит Docker-демон по переменным окружения (обычно это сокет /var/run/docker.sock).

---

### Запуск контейнера одной строкой

Запустим контейнер с nginx в фоне:

import docker

client = docker.from_env()

container = client.containers.run(
"nginx:latest",
detach=True,
ports={"80/tcp": 8080},
name="my_nginx"
)

print(container.id)


Что происходит:
- detach=True — контейнер не блокирует скрипт;
- ports пробрасывает 80 порт контейнера на 8080 хоста;
- name задает читаемое имя, чтобы не искать по ID.

---

### Список и состояние контейнеров

containers = client.containers.list(all=True)
for c in containers:
print(c.name, c.status)


Пара полезных статусов: running, exited, created.

Остановить и удалить контейнер по имени:

container = client.containers.get("my_nginx")
container.stop()
container.remove()


---

### Запуск команды внутри контейнера

Представьте, что нужно выполнить миграции или тесты внутри образа:

container = client.containers.run(
"python:3.12-slim",
command="python -c \"print('Hello from container')\"",
detach=True
)

logs = container.logs().decode("utf-8")
print(logs)
container.remove()


logs() возвращает вывод процесса — можно использовать для автоматизации CI/CD.

---

### Управление образами

Скачать образ, если его еще нет:

image = client.images.pull("redis:7-alpine")
print(image.tags)


Удалить неиспользуемый образ:

client.images.remove("redis:7-alpine")


---

### Где это пригодится

- написание своих mini-оркестраторов и утилит;
- автоматизация локальной разработки (запустить БД, брокер, кэш одной командой);
- тестирование в изолированном окружении с разными версиями Python и библиотек.

docker-py превращает Docker в обычную Python-библиотеку: вместо больших bash-скриптов — компактные и читаемые Python-программы.
3
- Как организовать загрузку данных с помощью многопоточного кода.
Python для начинающих: ускоряем загрузку данных с помощью многопоточности

Один файл скачать — не проблема. Десять — уже раздражает. Сто, тысяча — и ваш скрипт начинает жить своей жизнью, а вы смотрите, как он медленно тянет данные по одному запросу. Исправляем это с помощью многопоточности.

### Когда имеет смысл использовать потоки?

В Python потоки отлично подходят для задач, которые ждут сеть или диск, а не процессор:
— скачивание файлов;
— запросы к API;
— парсинг веб-страниц.

Ограничение GIL мешает распараллеливать чистые вычисления, но для сетевых операций потоки дают реальный прирост.

### Базовый вариант: requests + ThreadPoolExecutor

Сначала — наивный однопоточный код:

import requests

urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/delay/3",
]

def fetch(url: str) -> str:
resp = requests.get(url, timeout=5)
return f"{url} -> {resp.status_code}"

for url in urls:
print(fetch(url))


Каждый запрос ждет завершения предыдущего. Время ≈ сумме задержек.

Теперь включим многопоточность:

from concurrent.futures import ThreadPoolExecutor, as_completed
import requests

urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/delay/3",
]

def fetch(url: str) -> str:
resp = requests.get(url, timeout=5)
return f"{url} -> {resp.status_code}"

with ThreadPoolExecutor(max_workers=5) as executor:
future_to_url = {executor.submit(fetch, url): url for url in urls}
for future in as_completed(future_to_url):
print(future.result())


Что изменилось:

- ThreadPoolExecutor создает пул потоков.
- max_workers — максимальное число параллельных задач.
- executor.submit планирует выполнение функции fetch.
- as_completed позволяет обрабатывать результаты по мере готовности.

Теперь общее время будет примерно равно самому «длинному» запросу, а не сумме всех.

### Как выбрать max_workers?

Небольшое правило для сетевых задач:
max_workers ≈ 5–20 для начала.
Слишком мало — медленно. Слишком много — забьете сеть или получите лимиты от API.

### Скачивание и сохранение файлов

Минимальный пример, который реально полезен:

from concurrent.futures import ThreadPoolExecutor
import requests
from pathlib import Path

urls = [
"https://httpbin.org/image/png",
"https://httpbin.org/image/jpeg",
]

output_dir = Path("downloads")
output_dir.mkdir(exist_ok=True)

def download_file(url: str) -> str:
resp = requests.get(url, timeout=10)
resp.raise_for_status()
filename = output_dir / url.split("/")[-1]
with open(filename, "wb") as f:
f.write(resp.content)
return f"Saved: {filename}"

with ThreadPoolExecutor(max_workers=4) as executor:
for result in executor.map(download_file, urls):
print(result)


Здесь executor.map удобен, когда нужно просто применить одну функцию ко множеству аргументов и собрать все результаты.

---

Многопоточность для загрузки данных — это простой способ превратить медленный последовательный скрипт в уверенный «пулевой поезд». Главное — использовать ее там, где программа ждет сеть, а не считает числа.
👍2
- Создание базового приложения для управления расписанием с визуализацией.
Создание базового приложения для управления расписанием с визуализацией

Когда задачи и встречи начинают жить собственной жизнью, пора заводить им цифровой «зоопарк» — небольшое приложение для расписания. Сделаем его на Python: будем хранить события, отображать их в виде таблицы и визуализировать нагрузку по дням.

### Шаг 1. Структура данных для расписания

Начнем с простой модели: список словарей, где каждое событие — это дата, время и описание.

from datetime import datetime

schedule = []

def add_event(date_str, time_str, title):
event_datetime = datetime.strptime(
f"{date_str} {time_str}", "%Y-%m-%d %H:%M"
)
schedule.append({
"datetime": event_datetime,
"title": title
})

add_event("2025-01-10", "09:00", "Morning meeting")
add_event("2025-01-10", "14:30", "Code review")
add_event("2025-01-11", "11:00", "Gym")


### Шаг 2. Текстовая «визуализация» в виде таблицы

Отсортируем события и красиво выведем их в консоли.

def print_schedule():
events_sorted = sorted(schedule, key=lambda e: e["datetime"])
print(f"{'Date':<12} {'Time':<6} Title")
print("-" * 40)
for e in events_sorted:
d = e["datetime"].strftime("%Y-%m-%d")
t = e["datetime"].strftime("%H:%M")
print(f"{d:<12} {t:<6} {e['title']}")

print_schedule()


Уже похоже на мини-органайзер, но хочется наглядности.

### Шаг 3. Визуализация загрузки по дням с matplotlib

Теперь посчитаем, сколько событий в каждый день, и построим столбчатую диаграмму. Для этого пригодится модуль collections и библиотека matplotlib.

from collections import Counter
import matplotlib.pyplot as plt

def plot_daily_load():
dates = [e["datetime"].date() for e in schedule]
counter = Counter(dates)

days = sorted(counter.keys())
counts = [counter[d] for d in days]

plt.bar([d.strftime("%Y-%m-%d") for d in days], counts, color="skyblue")
plt.xlabel("Date")
plt.ylabel("Events count")
plt.title("Schedule load by day")
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

plot_daily_load()


Теперь видно, какие дни перегружены, а где можно добавить еще пару дел.

### Шаг 4. Куда развиваться дальше

Идеи для улучшения:

- сохранять расписание в json или sqlite3;
- добавлять фильтр по дате/ключевому слову;
- заменить консольный вывод на tkinter-интерфейс;
- выделять цветом важные события на графике.

Так из нескольких десятков строк кода получается живое мини-приложение, которое уже помогает управлять временем и показывает картину загрузки глазами Python.
👍5