go-with-me
1.85K subscribers
80 photos
1 file
92 links
go-with-me — уютный канал, где Golang становится дружелюбным

Мы — Go-инженеры в BigTech, обожаем делиться своей экспертизой,
пишем статьи о скрытых ловушках, объясняя все на пальцах

— Проводим Go-моки

questions @lovelygopher | @prettygopher
Download Telegram
🎨 Interface Comparisons

В этот раз поговорим про сравнения интерфейсов и развеем все мифы, которые вы могли встретить на своем пути

Нюансы в этой теме важны для понимания ввиду возможности паник при использовании некорректных нижележащих сущностей

Для начала взглянем на структуру не пустого интерфейса:
type iface struct {
tab *itab
data unsafe.Pointer
}

— С полем data все прозрачно - это непосредственно те данные, которые мы закладываем при присваивании интерфейсу какого-то значения
itab содержит метаданные об интерфейсе, о реализующем его типе и методах

Ключевое: при сравнении не пустых интерфейсов мы сравниваем две структуры iface: itab и data. В случае с пустыми интерфейсами eface сравниваем не itab, а сам тип _type

Приступим к рассмотрению боевых примеров!


1. Сравнение простых композитных типов
В данном случае все достаточно тривиально, itab одинаковы и, следовательно можем приступить к сравнению data, а они как раз являются разнымии. Смело получаем false и идем дальше
func A() {
var v1, v2 any

p1 := Person{
Name: "Bob",
Age: 18,
}

p2 := Person{
Name: "John",
Age: 19,
}

v1, v2 = p1, p2

fmt.Println(v1 == v2)
}



2. Несравнимые типы
Первое, что необходимо зарубить себе на носу - такие сущности как map, slice, func являются uncomparable. При попытке сравнить таковые мы моментально получаем panic и идем в люльку (в близлежащий defer)

Go Playground (тык)


3. Композитные типы с вложенными несравнимыми сущностями
Нос мы свой не щадим и зарубаем еще одну штуку - несравнимый тип, содержащийся в структуре, аффектит сравнение всей структуры. Иначе говоря, все поля структуры (включая вложенные) должны быть сравнимыми
type UncomparablePerson struct {
Name string
Age uint8
Pets []string
Things map[string]struct{}
Action func()
}



3.1 Различные типы и вложенно несравнимые сущности
Мы дошли до нетривиального случая. Имеем следующий расклад:
— Наши _type разные, ввиду того, что типы разные
— Один из типов вложенно хранит в себе множество несравнимых типов

Наивным предположением было бы думать, что мы словим панику, но как бы не так!

Сравнение двух структур (eface или iface) происходит линейно (как и с массивами): сначала сравниваются поля (tab или _type), потом data. Так как типы у нас разные, поэтому сравнения полей data не будет

Получаем заслуженный false и идем дальше!
func C() {
var v1, v2 any

p := Person{
Name: "Bob",
Age: 18,
}

up := UncomparablePerson{
Name: "John",
Age: 21,
Pets: nil,
Things: map[string]struct{}{
"ball": {},
},
}
up.Action = func() {
fmt.Printf("My age is %s\n", up.Name)
}

v1, v2 = p, up

fmt.Println(v1 == v2) // false
}



3.2. Одинаковые типы и вложенно несравнимые сущности
Комментарий с выводом говорит сам за себя. Типы одинаковые, data содержит несравнимые сущности, получаем panic. Все тривиально

Go Playground(тык)


4. Сравнение интерфейса напрямую с неинтерфейсным значением
Мало тех, кто знает о том, что можно сравнить переменную интерфейса напрямую с каким-то значением. Происходить все будет аналогично сравнению двух интерфейсов
func E() {
var v1 any
p1 := Person{
Name: "A",
Age: 12,
}

v1 = p1

fmt.Println(v1 == p1)
}



5. Сравнение алиасов и новых типов в интерфейсах
Напоминание о том, как работают синонимы и типовые переопределения
type (
AliasedInteger = int // Синоним (type alias) для int:идентичен int, взаимозаменяем с ним
AnotherInteger int // Новый тип на базе int: не совместим с int напрямую, но имеет ту же внутреннюю структуру
)

func F() {
var v1, v2 any
i, ai, ali := 1, AnotherInteger(1), AliasedInteger(1)

v1, v2 = i, ali
fmt.Println(v1 == v2) // true

v1, v2 = i, ai
fmt.Println(v1 == v2) // false

castAiToInt := int(ai)
v1, v2 = i, castAiToInt
fmt.Println(v1 == v2) // true
}


Надеемся, что материал для вас был полезным. Если возникли какие-нибудь вопросы, пишите в комментарии!

Статью писали с Дашей: @dariasroom

Stay tuned 👀👀
Please open Telegram to view this post
VIEW IN TELEGRAM
33👍17🔥842😘11
☀️ Новая статья многогранна и многопоточна, это наконец-то случилось!

Гофер готовится к самому загадочному и долгому исследованию. Надеемся, что вы тоже готовы!

По известной схеме: если вам не терпится, можете получить Early Access. Для этого достаточно просто забустить наш канал. Помогите нам сохранить уют, это очень важно

Кто бустил ранее и хочет заполучить ранний доступ, отпишите в личку @lovelygopher

BOOST BOOST BOOST

Stay tuned 🦴
Please open Telegram to view this post
VIEW IN TELEGRAM
5👍22🔥1
🤪 Surprising WaitGroup Panic

Часто ли вы ловили паники, используя WaitGroup? - не думаем. Сегодняшняя история об одной из таких, зарытой где-то в недрах, вас очень удивит

Не будем томить, приступаем!


Для начала рассмотрим избитую структуру:
type WaitGroup struct {
noCopy noCopy

state atomic.Uint64 // high 32 bits are counter, low 32 bits are waiter count.
sema uint32
}

noCopy - специфическая структура, используемая утилитой govet для маркировки откопированных значений
sema - используется внутренними функциями пакета sync, чтобы реализовать блокировку и разблокировку
state - отвечает за счетчик и количество ожидающих горутин


Углубимся в рассмотрение поля state:
Старшие 32 бита представляют из себя счетчик, который инкрементируется при вызовах Add(N) и декрементируется при вызовах Done()
Младшие 32 бита являются количеством ожидающих потоков


Теперь посмотрим на внутреннюю логику метода Add:
  func (wg *WaitGroup) Add(delta int) {
// ...

state := wg.state.Add(uint64(delta) << 32)
v := int32(state >> 32)
w := uint32(state)

// ...

if v < 0 {
panic("sync: negative WaitGroup counter")
}

// ...
}

— Прибавляем к счетчику наш delta, сдвигая его влево на 32 бита
— Извлекаем значение счетчика из state, сдвигая его вправо на 32 бита
— Извлекаем количество ожидающих потоков из state, что нас не особо интересует :)

Ключевое здесь то, что инкрементация происходит до всех проверок


Смотря на этот код, мы задумались о том, а что если наш counter окажется переполненным? Ответ не заставил себя долго ждать. И вот почему!

Семантически счетчик воспринимается как int32, то есть один старший бит отвечает за знак, а остальные за мантиссу (само число). В случае, когда счетчик был инкрементирован методом Add(N) до состояния, когда все младшие биты установлены в 1 (счетчик имеет значение 2_147_483_647) и к этому значению мы прибавляем 1, то счетчик становится переполненным и вместо ожидаемого значения 2_147_483_648, мы получаем значение -2_147_483_648

Вот, что происходит на битовом уровне:
До Add(1):
Двоичное:   01111111 11111111 11111111 11111111
int32: 2147483647


После Add(1):
Двоичное:   10000000 00000000 00000000 00000000
int32: -2147483648


Итого: переменная v в методе Add будет иметь отрицательное значение, что вызовет панику


Для воспроизведения этого сценария вы можете воспользоваться следующими двумя сниппетами
var wg sync.WaitGroup

wg.Add(math.MaxInt32)
// goroutines launch...

wg.Add(1) // panic: sync: negative WaitGroup counter
// goroutine launch...


var wg sync.WaitGroup
for range math.MaxInt32 + 1 {
wg.Add(1) // panic: sync: negative WaitGroup counter
// goroutines launch...
}



Мораль
WaitGroup не счетная палата и не готова к отслеживанию популяции бактерий на Венере. Пристально следите за своими Add'ами и Done'ами, не отпуская их на волю, и контролируйте количество потоков в вашей системе

Статью писали с Дашей: @dariasroom

Stay tuned ❤️
Please open Telegram to view this post
VIEW IN TELEGRAM
😱168👍53🗿1
🤪 WaitGroup Pitfalls

Сегодня речь пойдет про крайние случаи при работе с WaitGroup, о которой мы уже говорили в предыдущем посте (тык). Советую обратиться к нему с целью изучения внутренней структуры этой сущности и понимания того, что такое counter

Предлагаю незамедлительно приступить к делу!


1. Паника при негативном счетчике
Запомните ключевое - Done под капотом является вызовом Add со значением -1 (тык).
Основываясь на внутрянке метода Done и внутренней имплементации метода Add, в котором заложена паника после инкрементации аргумента delta (тык & тык), можно вывести следующие правила:
— При прямом вызове Add с отрицательным аргументом delta необходимо быть уверенными в том, что мы не уйдем в отрицательный counter
— Количество вызовов Done должно быть равно сумме аргументов всех Add
func A() { 
var wg sync.WaitGroup
wg.Add(1)


go func() {
defer wg.Done() // panic: sync: negative WaitGroup counter
wg.Done()
}()


wg.Wait()
}



2. Паника при Add после Wait
В случае неполного выхода из ожидания на Wait и мгновенного вызова Add происходит зашитая в метод Wait паника (тык)

По своей сути WaitGroup является переиспользуемой сущностью, то есть мы в праве запустить новый пул потоков для ожидания, если к этому моменту будем иметь обнуленный счетчик counter и полный выход из метода Wait

Не следуя этому правилу, мы получим панику, что и происходит при запуске данного сниппета:
func B() {
const N = 10_000


var wg sync.WaitGroup
for range N {
wg.Add(2)


go wg.Done()
go wg.Done()


go func() {
wg.Wait() // panic: WaitGroup is reused before previous Wait has returned
}()
}
}



3. Каждый Add сопровождается Done
Этот кейс предельно тривиален. В случае, когда сумма всех аргументов не соотносится количеству вызовов Done мы получим вечное ожидание потока, где прожат Wait, следовательно мы должны строго следить за тем, чтобы после каждого Add происходило соответствующее количество вызовов Done
func C() {
const N = 1_000


var wg sync.WaitGroup


wg.Add(N)
for i := 0; i < N; i++ {
if i%2 == 0 {
continue
}


wg.Add(1)
go func(i int) {
defer wg.Done()
fmt.Println(i)
}(i)
}


wg.Wait()
}



4. Мультиплексирование Wait на несколько потоков
Мало кто знает, но на одном экземпляре WaitGroup можно вызывать Wait в разных потоках, а это значит, что мы можем строить отношения вида N:M, где N - потоки, ответственные за вызов wg.Done(), а M - ожидающие потоки
func F() {
const (
N = 10
M = 5
)


var wgWork, wgWait sync.WaitGroup


for i := 0; i < N; i++ {
wgWork.Add(1)
go worker(&wgWork, i)
}


for i := 0; i < M; i++ {
wgWait.Add(1)
go waiter(&wgWork, &wgWait, i)
}


wgWait.Wait()
}


func worker(wg *sync.WaitGroup, idx int) {
defer wg.Done()


fmt.Printf("[%d] worker start\n", idx)
time.Sleep(5 * time.Second)
fmt.Printf("[%d] worker end\n", idx)
}


func waiter(wgWorkers, wgWaiters *sync.WaitGroup, idx int) {
defer wgWaiters.Done()


fmt.Printf("[%d] waiter start\n", idx)
wgWorkers.Wait()
fmt.Printf("[%d] waiter end\n", idx)
}


Статью писали с Дашей: @dariasroom

Stay tuned 🐈
Please open Telegram to view this post
VIEW IN TELEGRAM
13🔥541
☀️ Новая статья бьёт по всем фронтам — Fan Out, Fan In, bounded и unbounded — мы разложили всё по полочкам!

Гофер снова в деле и готов к самому шумному и многопоточному эксперименту

По известной схеме: если вам не терпится, можете получить Early Access. Для этого достаточно просто забустить наш канал. Помогите нам сохранить уют, это очень важно

Кто бустил ранее и хочет заполучить ранний доступ, отпишите в личку @lovelygopher

BOOST BOOST BOOST

Stay tuned 😬
Please open Telegram to view this post
VIEW IN TELEGRAM
9331
😁 Напоминание: Мы проводим Go мок-собеседования


Для чего это вам?
Мок позволяет предметно оценить свои знания, снять лишний стресс перед реальным собесом и понять, где именно зарыты пробелы

Что будет на моке?
— Разберём теорию от основ до глубоких нюансов
— Погоняем лайвкодинг с задачами уровня реального собеса
— Разберём concurrency-паттерны и оптимизации


⚙️Ведём вдвоём — будет чуть сложнее, но интереснее!

Дадим честный и подробный фидбэк с материалами для прокачки


Записаться: @lovelygopher | @prettygopher
Отзывы:
тык
Please open Telegram to view this post
VIEW IN TELEGRAM
711
Concurrency Patterns. Fan Out

Для начала разберемся в термине Fan Out. Частенько возникают такие ситуации, когда нам необходимо распространить одно и то же значение на несколько потоков, используя каналы. С этой задачей отлично справляется паттерн Fan Out

Вы наверняка миллион раз использовали его, но вопрос только в том, насколько осознанно вы это делали. Поэтому на сегодняшней повестке дня вариативность реализаций данного паттерна

В базовом случае мы имеем чот такое:
package main

import (
"fmt"
"math/rand"
"time"
)

func main() {
src := make(chan int)
dest1 := make(chan int)
dest2 := make(chan int)

// Читатели
go func() {
for v := range dest1 {
fmt.Printf("dest1 read %d\n", v)
}
}()
go func() {
for v := range dest2 {
fmt.Printf("dest2 read %d\n", v)
}
}()

// Fan Out
go func() {
for v := range src {
dest1 <- v
dest2 <- v
}
close(dest1)
close(dest2)
}()

// Генерация данных
go func() {
rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
for i := 0; i < 5; i++ {
src <- rnd.Intn(10)
}
close(src)
}()

time.Sleep(time.Second)
}


Fan In в свою очередь является обратным паттерном, тобишь он собирает пачку значений из N каналов и мапит их в один

Далее речь будет идти в терминах Fan Out
Будет душно. Начнем!


1. Синхронный
Так как выполнение у нас синхронное, мы просто итерируемся по каждому из каналов dests и записываем в них числа из src
Playground пример


2. Синхронный с рандомизацией
Представьте, что в вашей пачке каналов для записи есть какой-то бесячий dest, в который мы вечно долго записываем из-за какого внутреннего процессинга. Имея детерминированную последовательность dests, мы рискуем постоянно зависать в одном месте, не давая другим потокам выполнять свой контекст

Решение — использовать range по мапе с каналом в качестве ключа. Так как перебор по мапе у нас недетерминирован, мы случайным образом меняем порядок обхода получателей и тем самым более равномерно распределяем записи
Playground пример


3. С пропуском записи
Суть этого метода: не блокировать поток, если какой то из dests не готов принять данные. Используем default для неблокирующего select
Playground пример


4. Bounded concurrent-parallel
Если до этого мы записывали все значения линейно, то в этом случае мы имеем утилизируем возможность распараллеливания, но с сохранением порядка записи данных через sync.WaitGroup
Playground пример


5. Unbounded concurrent-parallel

Внимание: WaitGroup вынесена за scope всех циклов

Unbounded — неограниченное количество горутин, при которых не сохраняется порядок записи в каналы. Это значит, что в отличие от bounded, горутины на запись разных значений из src могут замапиться на разные очереди виртуальных процессоров P. Это приводит к недетерминированной записи. К примеру: значение 1 будет записано после значения 2

Также стоит учитывать, что мы можем насоздавать безграничное число потоков, забив оперативку ресурсами, которые лежат на наших потоках
Playground пример


6. Unbounded concurrent-parallel with semaphore
Для ограничения количества порождаемых горутин можно использовать semaphore для ограничения количества потоков
Playground пример


Статью писали с Дашей: @dariasroom

Stay tuned 🦄
Please open Telegram to view this post
VIEW IN TELEGRAM
16👍2🔥2💯1
🥚 True Tech Go: митап для Go-разработчиков от МТС

🗓 Когда? 11 сентября 2025 | 19:00 по Москве

🌐 Формат — офлайн и онлайн


Список докладов:
Дмитрий Мамыкин (T-Bank) - WaitGroup Pitfalls — разберём, как привычный sync.WaitGroup может приводить к паникам, заглянем под капот, посмотрим на устройство и работу Add, обсудим альтернативы (conc.WaitGroup, sync.errgroup) и поделимся практиками для безопасного использования

Виталий Левченко (Wildberries) - Рассмотрим примеры использования LLM для автоматизации рутинных задач, повышения производительности, улучшения читаемости и надежности кода

Владимир Марунин (MWS) - Разберём обновлённый в Go 1.24 sync.Map: как он обходится без глобальных блокировок, сколько памяти съедает на запись и зачем тут пакет uniq

Роман Ерема (MWS) - В докладе разберём создание Cloud Controller Manager для интеграции Kubernetes с API MWS: как он связывает кластер с облаком, управляет нодами и балансировщиками, проводит авторизацию, обрабатывает метаданные и health-чеки, работает с имперсонацией сервисных аккаунтов и доставкой MWS на клиентские мастера


🖥 Резюме
True Tech Go — отличный повод встретиться с единомышленниками и перенять опыт коллег из BigTech. Не упусти шанс прокачать навыки и расширить горизонты!

🔗 Присоединяйся! Для участия зарегистрируйся по ссылке
Please open Telegram to view this post
VIEW IN TELEGRAM
11🔥5😱21🦄1
💭 Опрос

Ребята, напишите в комментарии о том, что хотели бы увидеть в многопоточке.

Хотим слышать вас!


👀 Stay tuned!
Please open Telegram to view this post
VIEW IN TELEGRAM
8
⚡️ Concurrency Patterns. Fan In
В предыдущем посте мы немного напутали с определением такого многопоточного паттерна как Fan Out

На деле мы показывали Tee, который распространяет одно и то же значение V из канала-источника на N каналов-потребителей

Отличие Fan Out от Tee в том, что на N каналов распространяются разные значения из одного канала-источника. Тобишь, воркеры тянут значения из одного канала, борясь за них насмерть

Лирическое отступление закончено, наша совесть чиста, а сегодняшняя тема будет посвящена Fan In

Этот паттерн является обратным для Fan-Out. Мы собираем данные из нескольких каналов-источников и направляем их в один общий канал-потребитель


1. Default
Итак, что мы имеем?
— Есть воркеры — они кладут значения в N каналов и являются продьюсерами
— Каждый из этих N каналов будет получать значения от своего продьюсера. Назовем такие каналы "стоковыми"
— Есть один общий канал out, туда будет нужно отправить все значения из стоковых каналов
— Для этого мы запускаем N потоков, каждый из которых слушает свой стоковый канал, куда кладет значения продьюсер и редиректит все значения в out

Playground пример

Но что же будет, если какой-то наш продьюсер потух и больше не шлет никаких значений, а переданный контекст не отменен? Как бы нам понять, что воркер не является активным и перезапустить его? — в этом нам поможет такой механизм как "Heartbeats"

Heartbeat — это регулярное сообщение от продьюсера/воркера, подтверждающее, что он жив и работает


2. Heartbeats
Приступим к рассмотрению этого чуда!

Основная идея проста:
— Имеем структуру, которая хранит в себе стоковый канал, используемый как пайп между воркером и стоком, и канал "сердцебиений"
— Функция Supervise ответственна за отслеживание "сердцебиений" и перезапуск воркера при их отсутствии по TTL
— Функция FanIn принимает на вход стоковые каналы и возвращает результирующий канал, из которого можно читать данные

Всмотримся в наши функции поподробнее

2.1. FanIn
— Не отклоняемся от цели: выкачиваем данные из стоковых каналов и перекладываем в out, реагируя на контекст и неблокирующе отправляя "сердцебиение" нашему супервизору, который пристально наблюдает за нашими воркерами
WaitGroup здесь так же используется для того, чтобы дождаться конца работы наших стоков и отдать управление основному потоку
после дренажа всех "живых" значений

2.2 Supervise
— Создаем стоковый канал и канал "сердцебиений", агрегируем эти значения в структуре Source и возвращаем ее
— В отдельном потоке запускаем нашу рутину по отслеживанию и перезапуску воркеров

2.2.1 Смотрим на внутренности запущенного потока внутри Supervise
— Изначально происходит создание дочернего контекста с отменой для нашего воркера. Этот контекст будет рулить в тот момент, когда наш TTL пройдет и надо будет потушить воркера
— Создаем ticker, который будет слать ивенты, семантически значащие следующее: "в нашего воркера стреляли и он упал в лужу на..."
После получения ивента мы отменяем контекст и воркер окончательно "задыхается в луже"
— Первично запускаем работягу в отдельном потоке
— Если ловим ивент от тикера: производим отмену контекста, переназначаем этот же контекст и функцию отмены, сбрасываем таймер, и запускаем нового воркера в отдельном потоке
— В случае, когда из стокового потока нам пришло "сердцебиение" мы просто сбрасываем таймер и движемся дальше!

Стоит отметить, что воркеры должны "реагировать" на переданный контекст. Без этого мы получим утечку потоков и черт его знает, чем нам это грозит (профилированием и
устранением проблемы, которой можно было бы и избежать)

Таким образом, мы получаем более надежный Fan In, где все источники данных контролируемы и восстанавливаемы при зависаниях

Playground пример


Статью писали с Дашей: @dariasroom

Stay tuned 😏
Please open Telegram to view this post
VIEW IN TELEGRAM
1242👎1
🥂Выпуск про Go 1.25 уже доступен / GoGetPodcast 17

https://youtu.be/fHuJNsZPCJ0

Этого ролика вам точно будет достаточно для полного понимания нововведений, вне зависимости от вашего опыта

Подробно обсудили, что нового в новой версии, зачем это нужно, как оно устроено, в сложных местах делали ликбез для лучшего понимания изменений.
Дима был очень хорош, такого подробного разбора вы больше нигде не увидите, а с комментариями Глеба оно ещё круче, очень рекомендую.

Напоминаю, что теперь я буду выкладывать новые выпуски подкаста на отдельном канале, чтобы не смешивать столь разный контент.

🟢Наш подкаст обычно собирает значительно меньше просмотров, чем прочие мои ролики. При этом я знаю, что есть люди, которые очень его любят. Поэтому, если он вам интересен, вы можете помочь алгоритмам Ютуба в его продвижении:

- Досмотривайте выпуски до конца
- Подписывайтесь на канал
- Ставьте лайки
- Делитесь с друзьями и коллегами

Это правда очень важно.

🫶 Чем быстрее растёт аудитория подкаста, тем чаще будут выходить новые выпуски, и тем больше я буду вкладываться в их качество.

Несмотря на скромные показатели, я всё же возобновил регулярные выпуски, как вы могли заметить. Потому что хороших подкастов по Go сейчас практически нет, и кто-то ведь должен этим заниматься 😩
Очень надеюсь на вашу поддержку.

#gogetpodcast #news
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥54
Презентация моего доклада для тех, кто просил!

Писали вместе с Дашей:
@dariasroom 😍
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥14
139🔥21🏆2💯1
😠 Concurrency Patterns. Or Done Channel

Отгремев на MTC True Tech Go и собравшись с силами, мы решили порадовать вас новым постом!

Сегодня вещаем о таком механизме как Or Done Channel

Or Done Channel применим, когда мы выполняем какую-то операцию и зависимы от пачки ивентов завершения. Под пачкой ивентов подразумевается множество каналов, которые в неопределенный момент присылают токен завершения (к примеру пустую структуру)

Пу-пу-пу, погнали!


Обсудим использование этого паттерна на примере обычной перекладки значения из одного канала в другой

1. Базовый случай
func OrDone[T any](src <-chan T, done <-chan struct{}) <-chan T {
out := make(chan T)

go func() {
defer close(out)
for {
select {
case <-done:
return
case v, ok := <-src:
if !ok {
return
}

select {
case <-done:
return
case out <- v:
}
}
}
}()
return out
}

В простейшем примере мы просто забираем значение из src и кладем его в результирующий канал out, но при этом завершаем работу при получении ивента из done

Здесь все достаточно просто:
— Создаем результирующий канал, немедленно возвращаемый из функции
— Запускаем поток вычитки из src и перекладки значения в out
— Постоянно реагируем на ивент из done как при вычитке, так и перекладке

Получаем механизм, который уж очень похож на работу с переданным контекстом! Все становится сложнее, когда таких каналов с ивентами отмены становится несколько. Сейчас мы на это и взглянем!


2. Множество каналов с ивентами завершения
func OrAny(dones ...<-chan struct{}) <-chan struct{} {
switch len(dones) {
case 0:
closed := make(chan struct{})
close(closed)
return closed
case 1:
return dones[0]
}

out := make(chan struct{})

go func() {
defer close(out)

switch len(dones) {
case 2:
select {
case <-dones[0]:
case <-dones[1]:
}
}

m := len(dones) / 2
select {
case <-OrAny(dones[:m]...):
case <-OrAny(dones[m:]...):
}
}()
return out
}

В таком кейсе мы будем использовать рекурсивный подход

2.1 Проверяем базовые случаи рекурсии
— Если длина массива каналов с ивентами завершения равна 0, то возвращаем канал, который сразу закрывается
— Если канал с ивентом завершения всего лишь один, возвращаем его

2.2 Рекурсивный случай
— Создаем результирующий канал верхнего уровня, который будет закрыт тогда, когда одна из рекурсивных веток будет закрыта
— Запускаем поток, в котором регистрируем defer секцию с закрытием результирующего канала
— Проверяем крайний случай, когда длина массива с каналами равна двум
— Пройдя все корнер-кейсы, разбиваем массив на две части и запускаем эту же функцию рекурсивно и кладем результирующие каналы из этих частей в select. Этим мы запускаем шайтан-машину отмены!


Теперь нам осталось склеить два наших кейса и получить очень удобный инструмент, давайте это и сделаем:
func OrDoneN[T any](src <- chan T, dones ... <- chan T) <-chan T {
return OrDone(src, orAny(dones...))
}


Блюдо готово к подаче!


Посмотрим на юзкейсы
1. Pipeline + Or Done
Имеем цепочку конвейера: stage1 -> ... -> stage# и хотим оборвать ее в случае получения ивента:
for v := range OrDone(done, in) { 
out <- process(v)
}


2. Fan In + Or Done
Производим слияние нескольких каналов-продьюсеров в один результирующий канал и аналогично завершаемся по ивенту:
for _, ch := range inputs {
go func(ch <-chan T) {
for v := range OrDone(done, ch) {
out <- v
}
}(ch)
}


3. Fan Out + Or Done
Шарим канал с тасками в множество потоков и так же завершаем наших воркеров по ивенту:
func worker(done <-chan struct{}, tasks <-chan Task) {
for t := range OrDone(done, tasks) {
handle(t)
}
}


В итоге, OrDone — это маленький, но очень мощный строительный блок для конвейеров. Он делает код безопасным к отмене и защищает от утечек ваших потоков.
OrAny расширяет его на несколько сигналов. Вместе это формирует паттерн, который стоит держать в кармане каждому


Статью писали с Дашей: @dariasroom

Stay tuned 🔥
Please open Telegram to view this post
VIEW IN TELEGRAM
106🔥532111
👻 Concurrency Patterns. Bridge

Следующая ступень после Or Done Channel паттерна — Bridge. Сначала разберемся с какой ситуацией помогает бороться эта модель

Представим, что у нас время от времени динамически появляются каналы с данными, и их число заранее неизвестно. Все эти стримы мы хотим слить в единый канал, откуда и планируем получать данные. Наш потребитель как раз и будет высасывать все-все из этого стрима

Кратенько:
— Источники (каналы) создаются динамически (по событиям / шардам / тайм-слайсам)
— Внутренние каналы имеют свой lifecycle, по истечении которого закрываются
— Потребителю нужна единая точка чтения, без координации множества каналов

Перед освоением данного материала рекомендуем обратиться к статье про Or Done Channel

Ну что ж, поехали!


1. Synchronous Bridge
В синхронной версии бриджа мы будем дрейнить данные из поступающих стримов последовательно, то есть вычитывать все из полученного стрима и только потом переходить к следующему. Так же мы будем использовать Or Done паттерн, чтобы избавить себя от лишних мук с обработкой завершения вычитки:
Playground пример

Что мы здесь видим?
— Результирующий канал out, мгновенно возвращающийся из функции
— Канал done, содержащий ивенты завершения нашей работы
— Канал streams, спавнящий нам каналы для вычитки данных и последующей перекладки в out

Обсудим процесс подробнее:
— Внутри нашего потока мы создаем переменную stream, хранящую текущий стрим вычитки
— Спинимся в бесконечном цикле в поисках нового стрима, реагируя на ивент завершения
— Как только находим новый стрим для вычитки, мы мигом оборачиваем его в OrDone, чтобы закрыть канал по окончании вычитки
— Далее мы просто вычитываем из него данные и перекладываем в результирующий канал

В целом, ничего сложного, но есть нюанс. Нюанс возникает тогда, когда в канал streams насыпают ну ож очень много стримов на единицу времени. Синхронная модель в данном случае будет наступать нам на горло, находясь в ожидании N-го стрима. После профилирования лочек на каналах вы покрутите у виска и придете к мысли, что надо бы эту проблему решить. А решение просто — распараллелить обработку стримов


2. Asynchronous Bridge
Исходя из наших потребностей мы пишем вот такую портянку:
func BridgeSemaphore[T any](done <-chan struct{}, streams <-chan <-chan T, parallel int) <-chan T {
if parallel <= 0 {
parallel = 1
}

out := make(chan T)

go func() {
defer close(out)

var wg sync.WaitGroup
sem := make(chan struct{}, parallel)

fwd := func(stream <-chan T) {
defer func() { <-sem }()

for v := range OrDone(done, stream) {
select {
case <-done:
return
case out <- v:
}
}
}

for {
select {
case <-done:
wg.Wait()
return

case s, ok := <-streams:
if !ok {
wg.Wait()
return
}

sem <- struct{}{}
wg.Go(func() {
fwd(s)
})
}
}
}()

return out
}

Видим мы уж очень много знакомого, но стоит обсудить нововведения, а именно:

2.1 WaitGroup
Необходимость наличия группы здесь обусловлена нуждой дождаться всех наших воркеров, усердно сливающих данные в канал out. Именно поэтому в ветках select'a при выходе из функции дочернего потока мы прожимаем wg.Wait()

2.2 Semaphore
При резво-растущем количестве стримов мы можем столкнуться с проблемой, когда дочерних потоков будет создаваться безмерное количество. С целью подавить это мы используем самый простецкий семафор, сделанный на канале пустых структур. Можно было бы это провернуть, используя Worker Pool:
Playground пример

2.3 Forwarder Closure
Функция fwd инкапсулирует логику вычитки из стрима с использованием семафора и OrDone канала

Получили уж очень хороший мостик!


Сегодня вы заимели в арсенал еще один паттерн, чему мы очень рады


Статью писали с Дашей:
@dariasroom

Спасибо, что читаете и остаетесь с нами!
Stay tuned 🧑‍💻
Please open Telegram to view this post
VIEW IN TELEGRAM
8🔥542
🏦 T-Meetup: Golang — встреча Golang-разработчиков в новом ИТ-хабе в Санкт-Петербурге!

Спикеры из Т-Банка
— Обсудят разработку в экосистеме Kubernetes и современные архитектурные практики
— Расскажут, как создавать контроллеры с помощью KubeBuilder и как эволюционирует реализация Outbox в core-сервисах
— Поделятся реальными кейсами и сложностями, с которыми сталкивались на пути развития решений.

⭐️ После докладов вас ждет экскурсия по флагманскому IT-хабу!

⚡️ Запись будет у стойки регистрации перед началом митапа

🗓 30 сентября в 19:00
📍 Санкт-Петербург, Свердловская набережная, 44, стр. 2, БЦ Ferrum II, конференц-зал, 1-й этаж

Регистрируйтесь и зовите всех, кто интересуется современными подходами к построению систем

LINK
Please open Telegram to view this post
VIEW IN TELEGRAM
5👍3🔥2221
Ребята, в моей команде есть очень классный дизайнер🐾

Настя будет очень рада вашему вниманию!

https://t.me/shenkree
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥4