4

Разбираем паттерны конкурентности

База

Параллельность - выполнение задач в один момент времени на разных логических ядрах.

Конкурентность - выполнение задач последовательно, но со сменой контекста на другую задачу в ожидание завершения иной задачи. У пользователя может возникнуть иллюзия многозадачности даже в однопроцессорной системе, поскольку смена контекста происходит быстро (микросекунды).

Процессы:

  • Раздельная память

  • Раздельные ресурсы

  • Раздельные регистры

Потоки:

  • Общая память

  • Общие ресурсы

  • Раздельные стэк и регистры

Горутины:

  • Общая память

  • Общие ресурсы

  • Общий системный стэк

  • Общие регистры

Go runtime представляет модель P:M:G.

P - представляет логическое ядро процессора.

M - поток ОС по числу процессоров P.

G - структура, которая выполняет переданную функцию, создаётся по необходимости, минимум одна на старте программы (main). Стэк всего 2кб, может расширятся до 1гб для 64x и до 250кб для 32х систем.

Управление горутинами осуществляется планировщиком Go, а не ОС. Планировщик Go работает в пользовательском пространстве. Мы не можем напрямую управлять на каком процессоре будет исполняться горутина, за это отвечает планировщик.

Канал - очередь сообщений, которая умеет работать в многопоточной среде, работает по принципу FIFO.

Есть два типа каналов: буферизованный и небуферизованный.

Первый может хранить несколько сообщений, второй только одно.

Синхронизация

sync.WaitGroup - счётчик, который позволяет подождать завершения горутин.

sync.Mutex - блокирует доступ к ресурсу.

sync.RwMutex - разделяемая блокировка на чтение и запись. Читать могут несколько горутин, но мутировать данные только одна.

sync.Atomic - атомарная операция чтения и записи. Работает только с простыми значениями.

sync.Map - lock-free структура. Работает так же, как и обычная map, но потокобезопасная, можно использовать в многопоточной среде. Хорошо подходит для случаев, где надо много читать и мало писать. Если надо много писать, то лучше использовать обычную map и sync.RwMutex.

Небуферизованный канал

Буферизованный канал

Ограничения канала

Важные правила

  • Закрывает канал тот, кто в него пишет.

  • Если пишет несколько продюсеров, то закрывает тот, кто создал продюсеров.

  • Не закрытый канал держит ресурсы. Закрывать надо явно.

Паттерны

Generator

Микропаттерн, который наполняет канал. Закрываем канал, чтобы не было проблем.

func generator() <- chan int {

ch := make(chant int)

go func(){

for i := 0; i <= 12; i++ {

ch <- i + 1

}

close(ch)

}()

return ch

}

Wrapper

Оборачиваем функцию, добавляя функциональность. Если вам что-то говорит слово декоратор, то это тот самый паттерн.

func wrapper(wg *sync.WaitGroup, fn func()) {

wg.Add(1)

go func() {

defer wg.Done()

fmt.Println("Work before func")

fn()

time.Sleep(1 * time.Second)

fmt.Println("Work after func")

}()

}

func main() {

var wg sync.WaitGroup

wrapper(&wg, func() {

time.Sleep(1 * time.Second)

fmt.Println("heavy work")

})

wg.Wait()

}

Fan-in

Собирает результаты из нескольких каналов в один.

func fanIn(input1, input2 <-chan string) <-chan string {

ch := make(chan string)

go func(){

for {

select {

case s := <-input1: ch <- s

case s := <-input2: ch <- s

}

}

}()

return ch

}

Fan-out

Одна или несколько горутин пишут в канал, с другой стороны рабочие горутины читают канал, делают работу и умирают.

func worker(ch <-chan int, wg *sync.WaitGroup) {

wg.Done()

for v := range ch {

fmt.Println(v)

time.Sleep(1 * time.Second)

}

}

func sender() {

ch := make(chan int)

var wg sync.WaitGroup

for i := 0; i < 2; i++ {

wg.Add(1)

go worker(ch, &wg)

}

for i := 0; i < 10; i++ {

ch <- i

}

close(ch)

wg.Wait()

fmt.Println("done")

}

Pipeline

Данные обрабатываются цепочкой. Producer -> Producer/Consumer -> Consumer. Стадий обработки может быть сколько угодно.

func producer() <-chan int {

c := make(chan int)

go func() {

for i := 0; i <= 10; i++ {

c <- i + 1

}

close(c)

}()

return c

}

func producerConsumer(c <-chan int) <-chan int {

out := make(chan int)

go func() {

for v := range c {

out <- v * 2

}

close(out)

}()

return out

}

func consumer(ch <-chan int) {

for v := range ch {

fmt.Println(v)

}

}

Rate limiting

Хотя для rate limiter есть множество разных алгоритмов, рассмотрим один, основанный на тиках.

func ticker() {

ch := make(chan int, 5)

go func() {

for i := 0; i < 5; i++ {

ch <- i

}

close(ch)

}()

limiter := time.Tick(time.Second)

for v := range ch {

<-limiter // будем ждать секунду

fmt.Println(v)

}

}

Cancellation

Способ прерывания горутин. Необходим, чтобы избегать висящих горутин, останавливать слишком долгие операции.

1. WithCancel

func worker(ctx context.Context) {

for {

select {

case <-ctx.Done():

fmt.Println("Done")

return

default:

fmt.Println("Working...")

time.Sleep(500 * time.Millisecond)

}

}

}

func main() {

ctx, cancel := context.WithCancel(context.Background())

go worker(ctx)

time.Sleep(1 * time.Second) // работаем

cancel() // отменяем

time.Sleep(1 * time.Second) // время на завершение

}

2. WithTimeout

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) // спустя 2 секунды воркер перестанет работать

3. WithDeadline. Можно указать точное время остановки.

ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second)) // прибавляем к текущему времени две секунды

Worker Pool

Каждый воркер берёт задачу, делает работу и отправляет результат в канал, другая горутина, в нашем случае main, читает результат из канала.

func worker(jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {

defer wg.Done()

for j := range jobs {

time.Sleep(1 * time.Second)

fmt.Println("job", j)

results <- j * j

}

}

func main() {

jobs := make(chan int)

results := make(chan int)

var wg sync.WaitGroup

for i := 0; i < 3; i++ {

wg.Add(1)

go worker(jobs, results, &wg)

}

go func() {

for i := 0; i < 10; i++ {

jobs <- i

}

close(jobs)

}()

go func() {

wg.Wait()

close(results)

}()

for result := range results {

fmt.Println(result)

}

}

Actor model

Паттерн, при котором акторы общаются только через каналы, и меняют данные только через каналы.

type message struct {

amount int

response chan int

}

func counter(messages chan message) {

for m := range messages {

m.response <- m.amount + 1

close(m.response)

}

}

func main() {

messages := make(chan message)

var wg sync.WaitGroup

go counter(messages)

wg.Add(3)

for i := 0; i < 3; i++ {

go func(i int) {

defer wg.Done()

response := make(chan int)

messages <- message{amount: i, response: response}

fmt.Println(<-response)

}(i)

}

wg.Wait()

close(messages)

}

Это не все паттерны, их целая куча, но уже с этими можно делать интересные вещи, такие как балансировщики нагрузки, pub/sub системы, очереди задач, rate limiter и много всего другого.