Команда Ozon Tech готовит мощнейший трек по бэкенду в рамках своей конференции E-CODE 💙
И это только одна из причин, почему стоит быть там 13-14 сентября. А ещё: качественный нетворк, 1х1 с топовыми IT-экспертами и HR, эксклюзивный мерч и интерактивы, вечеринки с участием НТР, Заточки, ILWT и Нейромонаха Феофана.
Успейте зарегистрироваться. Это обязательно⬅
И это только одна из причин, почему стоит быть там 13-14 сентября. А ещё: качественный нетворк, 1х1 с топовыми IT-экспертами и HR, эксклюзивный мерч и интерактивы, вечеринки с участием НТР, Заточки, ILWT и Нейромонаха Феофана.
Успейте зарегистрироваться. Это обязательно
Please open Telegram to view this post
VIEW IN TELEGRAM
🎯 Задача (Go, продвинутая конкуррентность; версия: Go 1.21+)
Сделай универсальную функцию обработки массива с контролем параллельности, сохранением исходного порядка результатов и мгновенной отменой при первой ошибке.
🟠 Требования:
- Сигнатура:
Process[T any, R any](ctx context.Context, in []T, worker func(context.Context, T) (R, error), parallelism int) ([]R, error)
- Параллельная обработка не более parallelism задач одновременно.
- Результаты возвращаются в том же порядке, что и входной срез, даже если отдельные задачи завершаются вразнобой.
- При первой ошибке:
- немедленно отменить все ещё выполняющиеся задачи,
- вернуть первую ошибку,
- не оставить «утекших» горутин.
- Учитывать ctx.Done() и корректно завершаться по таймауту/отмене.
- Без внешних зависимостей; только стандартная библиотека.
🟠 Подсказка:
- Используй context.WithCancelCause для распространения причины отмены.
- Организуй пул рабочих через буферизованный канал с задачами.
- Результаты складывай по индексу, чтобы сохранить порядок.
- Для потокозащищённой фиксации первой ошибки используй sync.Once.
Ниже — эталонная реализация и пример использования.
Код (Go 1.21+):
🟠 Как проверить:
- go run . — запусти несколько раз, чтобы увидеть разные порядки завершения, но стабильный порядок результатов.
- Поменяй порог ошибки в worker (например, x%3==0), чтобы убедиться, что отмена срабатывает мгновенно и горутины не висят.
- Проверь на гонки: go run -race .
Сделай универсальную функцию обработки массива с контролем параллельности, сохранением исходного порядка результатов и мгновенной отменой при первой ошибке.
- Сигнатура:
Process[T any, R any](ctx context.Context, in []T, worker func(context.Context, T) (R, error), parallelism int) ([]R, error)
- Параллельная обработка не более parallelism задач одновременно.
- Результаты возвращаются в том же порядке, что и входной срез, даже если отдельные задачи завершаются вразнобой.
- При первой ошибке:
- немедленно отменить все ещё выполняющиеся задачи,
- вернуть первую ошибку,
- не оставить «утекших» горутин.
- Учитывать ctx.Done() и корректно завершаться по таймауту/отмене.
- Без внешних зависимостей; только стандартная библиотека.
- Используй context.WithCancelCause для распространения причины отмены.
- Организуй пул рабочих через буферизованный канал с задачами.
- Результаты складывай по индексу, чтобы сохранить порядок.
- Для потокозащищённой фиксации первой ошибки используй sync.Once.
Ниже — эталонная реализация и пример использования.
Код (Go 1.21+):
package main
import (
"context"
"errors"
"fmt"
"math/rand"
"sync"
"time"
)
type job[T any] struct {
i int
val T
}
func Process[T any, R any](
parent context.Context,
in []T,
worker func(context.Context, T) (R, error),
parallelism int,
) ([]R, error) {
if parallelism <= 0 {
return nil, errors.New("parallelism must be > 0")
}
ctx, cancel := context.WithCancelCause(parent)
defer cancel(nil)
jobs := make(chan job[T], parallelism) // лёгкая обратная давление
out := make([]R, len(in))
var wg sync.WaitGroup
var once sync.Once
var firstErr error
// Рабочие
workerFn := func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
case j, ok := <-jobs:
if !ok {
return
}
res, err := worker(ctx, j.val)
if err != nil {
once.Do(func() {
firstErr = err
cancel(err) // прерываем остальных
})
return
}
// Сохраняем порядок
out[j.i] = res
}
}
}
// Старт пула
wg.Add(parallelism)
for k := 0; k < parallelism; k++ {
go workerFn()
}
// Диспетчер задач
sendLoop:
for i, v := range in {
select {
case <-ctx.Done():
break sendLoop
case jobs <- job[T]{i: i, val: v}:
}
}
close(jobs)
// Ждём завершения
wg.Wait()
// Если была отмена по ошибке — вернём её
if firstErr != nil {
return nil, firstErr
}
// Если отменил родительский контекст — вернём его причину
if err := context.Cause(ctx); err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
// это cause из cancel(err), уже обработали выше
} else if err := context.Cause(parent); err != nil {
return nil, err
}
return out, nil
}
// Демонстрация: умножаем числа с случайной задержкой; каждое третье число — ошибка.
// Видно, что вывод упорядочен по входу, а отмена срабатывает на первой ошибке.
func main() {
rand.New(rand.NewSource(time.Now().UnixNano()))
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
input := []int{1, 2, 3, 4, 5, 6, 7}
parallelism := 3
worker := func(ctx context.Context, x int) (int, error) {
// эмуляция непредсказуемого времени работы
time.Sleep(time.Duration(rand.Intn(120)) * time.Millisecond)
if x%7 == 0 { // попробуй поменять условие на (x%3==0), чтобы увидеть раннюю отмену
return 0, fmt.Errorf("bad luck on %d", x)
}
select {
case <-ctx.Done():
return 0, ctx.Err()
default:
return x * x, nil
}
}
res, err := Process[int, int](ctx, input, worker, parallelism)
if err != nil {
fmt.Println("error:", err)
return
}
fmt.Println("results:", res)
}
- go run . — запусти несколько раз, чтобы увидеть разные порядки завершения, но стабильный порядок результатов.
- Поменяй порог ошибки в worker (например, x%3==0), чтобы убедиться, что отмена срабатывает мгновенно и горутины не висят.
- Проверь на гонки: go run -race .
Please open Telegram to view this post
VIEW IN TELEGRAM
❤1