Goroutine & Go Channel

Gary Liao
16 min readJun 5, 2023

--

GO執行序與通道

前言

Go 提供了兩種平行運算的方式:

  • communicating sequential processes (CSP)
  • shared memory multithreading

本篇介紹 Go 實現 CSP 的方式,就是用使用 channels 在 goroutines 之間溝通。而 shared memory multithreading,用 Go 實踐,就是多個 goroutines 共用同一組變數。

Goroutines

Goroutines 是 Go 的 CSP 執行序,類似一般的 threads 又有點不同。(另篇解釋)

Go 的第一個 Goroutine 都是執行 main function,稱之為 main Goroutine,其中所有程式碼都是按順序執行;若要平行運算某個 function可以使用 go 指令:

f() // call f(); wait for it to return
go f() // create a new goroutine that calls f(); don't wait

go 會開啟一個新的 Goroutine 來執行這個function,此 function 結束時,這個 Goroutine也會結束,而這個function的回傳值會被無視。當 main Goroutine 結束時,所有 Goroutines 都會結束,若要確保 goroutines 都執行完畢,後面會提到利用channel同步的做法。

Go Channel

Channel也是一種 type , 他提供了一種機制,讓平行執行的 functions 可以透過事先定義的types ,對 channel 執行 傳送(sending) 、接收 (receiving) 的方式互相溝通 。Channel 也可以傳送 channel。

ChannelType = ( "chan" | "chan" "<-" | "<-" "chan" ) ElementType .

Channel 可以被定義為雙向、只有傳送、只有接收,三種模式。"<-" 定義了channel 的方向,不寫就是雙向。

// ChannelType
chan T // 雙向 can be used to send and receive values of type T
chan<- float64 // 傳送 can only be used to send float64s
<-chan int // 接收 can only be used to receive ints
chan<- chan int // same as chan<- (chan int)

// 宣告一個變數 x 是 Channel,只傳送 float64
var x chan<- float64

// 宣告一個變數 x 是 Channel,只傳送 "可雙向傳送int的channel"。
var x chan<- chan int
var x chan<- (chan int) // they are the same

Channel 可以用 make 指令實體化,同時設定 channel 的 capacity,也就是緩衝區大小 (buffer),單位是傳送的 type。

var x chan int = make(chan int)      // channel without buffer
var x chan int = make(chan int, 100) // channel with capacity = 100

先定義與 channel 有關的雙方:

  • sender: 傳送方,執行 sending
  • receiver: 接收方,執行 receiving

沒有 buffer (cap = 0 或沒設定) 的情況下,sender 與 receiver 都必須就緒,傳輸才會成功。

有buffer的情況下,如果 buffer 沒滿,那麼傳輸必定成功;若 buffer 滿了,則 sender 會被 block,直到receiver就緒;若 buffer 空了,receiver 會被 block,直到sender就緒。

傳送接收資料的寫法如下:


package main

import "fmt"

func main() {
var channel chan int = make(chan int, 1)
var data int = 3
var box int

channel <- data // send data to channel
box = <-channel // box receives data from channel
fmt.Println(box) // 3
}

這個範例是殺雞用牛刀,如果只是要把 data 傳給 box ,直接assign即可,但channel 的真正用途是在不同的goroutine之間溝通。但這範例帶出一個小問題,我們把上例中的 cap 拿掉:

var channel chan int = make(chan int) // 拿掉cap

那麼這行error就會出現:

fatal error: all goroutines are asleep — deadlock!

因為沒有 buffer 的情況下,sender 與 receiver 都必須就緒,傳輸才會成功,在這個案例中,main function 同時為兩者,由於是在同一個 main goroutine 之中,是循序執行,而不是同時,因此在他扮演 sender 的時候被block,沒有辦法繼續成為receiver,因為receive在後面才會執行,形成死結 (deadlock)。

Deadlock 死結

等待一個永遠不會被釋放的資源,導致沒有程序可以進行,就是死結。

應用 channels 在 goroutines 之間溝通,若設計不良,很容易遇到死結。

Unbuffered Channel

沒有buffer的channel稱為 unbuffered channel,由於收發都必須等待對方就緒,因此形成某種同步 (synchronize),因此 unbuffered channel 亦被稱為 synchronous channel。這個特性常被應用於確保 goroutine 完成才關閉主程式。

Buffered Channel

與unbuffered channel相反, buffered channel 的目的是為了讓 goroutine之間解耦。

Unidirectional Channel

單向Channel 主要是保證傳輸的方向,單獨存在沒什麼意義,因此通常都是搭配雙向channel使用。

用途1: 同步

例子中有一個 士兵J 丟出了一顆炸彈,而士兵K 與士兵G 都準備好要接炸彈了,每次接到炸彈,士兵都會報數,炸彈會倒數,然後我們緬懷最後接到炸彈的士兵。主程式必須在收到炸彈爆炸的訊息後才會結束,以確保所有訊息都被印出。

package main

import (
"fmt"
)

type Bomb struct {
Count int
Done chan struct{}
}

func (b *Bomb) CountDown() (Explode bool) {
b.Count -= 1
if b.Count == 0 {
fmt.Println("BooooooM !!")
defer func() { b.Done <- struct{}{} }()
return true
}
return false
}

type Soldier struct {
Name string
Holding *Bomb
}

type BombChannel chan *Bomb

func (s *Soldier) PassBomb(bc BombChannel) {
bc <- s.Holding
s.Holding = nil
}

func (s *Soldier) ReadyForBomb(bc BombChannel) {
s.Holding = <-bc

fmt.Println(s.Name, "gets the bomb, and the count is", s.Holding.Count)
explode := s.Holding.CountDown()
if explode {
fmt.Println("RIP", s.Name)
} else {
go s.PassBomb(bc)
go s.ReadyForBomb(bc)
}
}

func main() {
bc := make(BombChannel)
boom := make(chan struct{})
bomb := Bomb{5, boom}
SoldierJ := Soldier{"J", &bomb}
SoldierG := Soldier{"G", nil}
SoldierK := Soldier{"K", nil}
go SoldierG.ReadyForBomb(bc)
go SoldierK.ReadyForBomb(bc)
SoldierJ.PassBomb(bc)

<-boom // 確保炸彈爆炸才關閉主程式,否則
}

結果

K gets the bomb, and the count is 5 // 保證印出
K gets the bomb, and the count is 4 // 保證印出
G gets the bomb, and the count is 3 // 保證印出
K gets the bomb, and the count is 2 // 保證印出
G gets the bomb, and the count is 1 // 保證印出
BooooooM !! // 保證印出
RIP G // 不保證印出

除了第一個接到炸彈的人是固定的,剩下的順序不一定。

用途2: Pipelines

這裡展示單向 channel 如何與雙向 channel 搭配,做到保證傳輸方向;同時也展示了 for 接收channel ,直到 channel.cose() 才會跳出 for loop的用法。

這個範例是從台灣到英國的飛機,中間在泰國轉機,乘客人數的變化:

桃園國際機場 -> 泰國廊曼機場 -> 英國西斯洛機場


package main

import "fmt"

type Airplane struct {
Id int
Passengers int
}

// 桃園國際機場
func TPE(out chan<- Airplane) {
for x := 0; x < 5; x++ {
plane := Airplane{Id: x, Passengers: x + 2}
fmt.Printf("Airplane %+v departing from TPE. \n", plane)
out <- plane

}
close(out)
}

// 泰國 廊曼機場
func DMK(out chan<- Airplane, in <-chan Airplane) {
for v := range in {
v.Passengers = v.Passengers - 2 + 3
out <- v
}
close(out)
}

// 英國 希斯洛機場
func LHR(in <-chan Airplane) {
for v := range in {
fmt.Printf("Airplane %+v Arriving LHR. \n", v)
}
}

func main() {

TPE2DMK := make(chan Airplane)
DMK2LHR := make(chan Airplane)

go TPE(TPE2DMK)
go DMK(DMK2LHR, TPE2DMK)
LHR(DMK2LHR)
}

結果

Airplane {Id:0 Passengers:2} departing from TPE. 
Airplane {Id:1 Passengers:3} departing from TPE.
Airplane {Id:2 Passengers:4} departing from TPE.
Airplane {Id:0 Passengers:3} Arriving LHR.
Airplane {Id:1 Passengers:4} Arriving LHR.
Airplane {Id:2 Passengers:5} Arriving LHR.
Airplane {Id:3 Passengers:5} departing from TPE.
Airplane {Id:4 Passengers:6} departing from TPE.
Airplane {Id:3 Passengers:6} Arriving LHR.
Airplane {Id:4 Passengers:7} Arriving LHR.

Select

Select 語法類似 switch ,但專門用於 channels,case 成立的必要條件是該channel 沒有被 block 而可以執行 case 後面接的動作;如果所有case 都不成立則 block,直到有case 成立為止。

範例

這範例中,透過輸入 'a''b' 來操作 c1, c2 這兩個 channels,進而觸發後面的select。c1 是 buffered channel,c2 是 unbuffered channel。

package main

import (
"fmt"
"os"
)

func main() {
var c1 = make(chan struct{}, 1)
c1 <- struct{}{}
var c2 = make(chan struct{})

go func() {
b := make([]byte, 1)
os.Stdin.Read(b)
switch b[0] {
case 'a':
<-c1
case 'b':
c2 <- struct{}{}
}

}()
select {

case c1 <- struct{}{}:
fmt.Println(1)

case <-c2:
fmt.Println(2)

case x := <-c2:
fmt.Println(x)
}

}

case c1 <- struct{} {}: 成立的條件是 c1 被清空;

case <-c2: 成立的條件是對c2 傳值;

case x := <-c2: 成立的條件與上一個 case 相同,x 會在case成立時宣告。

若複數個 case 同時成立,則會隨機選一個執行。

Select 也有 default,若沒有case 成立,則執行default。如下例子,若沒有default,則 for loop 會卡在第一圈;有了default,則每秒一圈:

package main

import (
"fmt"
"os"
"time"
)

func main() {
abort := make(chan struct{})
go func() {
os.Stdin.Read(make([]byte, 1))
abort <- struct{}{}
}()
ticker := time.Tick(time.Second)
for k := range ticker {
fmt.Println(k)
select {
case <-abort:
return
default:
}
}
}

執行結果

我在第三秒多的時候輸入了enter:

2023–06–05 09:24:54.0276331 +0000 UTC m=+1.000497001
2023–06–05 09:24:55.028545 +0000 UTC m=+2.001408701
2023–06–05 09:24:56.0279756 +0000 UTC m=+3.000839001

2023–06–05 09:24:57.0285048 +0000 UTC m=+4.001368601
mission abort

select default 有時用於避免 deadlock。

Goroutines 數量上限

Go 沒有限制 Goroutines 的數量上限,由於goroutine非常輕量,一般機器開上萬個goroutines都不是問題,但無限制的開總是有爆的一天,所以限制goroutines 的數量上限也是一個課題。以下列幾個常用方法:

  • Semaphore 旗號 (使用 buffered channel)
const maxGoroutines = 5  // adjust this value as needed
sem := make(chan struct{}, maxGoroutines)

for i := 0; i < 100; i++ { // assume we want to start 100 goroutines
sem <- struct{}{} // will block if there is no room in the channel
go func(i int) {
// release the semaphore when goroutine finishes
defer func() { <-sem }()
// ... do work here ...
}(i)
}
  • Worker pool (搭配 sync.WaitGroup)
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, tasks <-chan int, wg *sync.WaitGroup) {
// Decreasing internal counter for wait-group as soon as goroutine finishes
defer wg.Done()
for task := range tasks {
// Do the work here.
fmt.Printf("worker %d Processing task %d.\n", id, task)
time.Sleep(time.Millisecond * 100)
}
}
func main() {
// 工人數量
const maxGoroutines = 5
// Declaring waitgroup
var wg sync.WaitGroup
// 將等待5個工人完成工作
wg.Add(maxGoroutines)
// Channel with buffer size equal to the number of tasks
maxTasks := 10
tasks := make(chan int, maxTasks)
// Starting 5 worker goroutines
for i := 0; i < maxGoroutines; i++ {
go worker(i, tasks, &wg)
}
// Adding tasks to the channel
for i := 1; i <= 10; i++ {
tasks <- i
}
// We close the channel to signal that no more tasks will be sent to it
close(tasks)
// Waiting for all the tasks to finish
wg.Wait()
}

結果

worker 4 Processing task 1.
worker 2 Processing task 3.
worker 1 Processing task 2.
worker 0 Processing task 5.
worker 3 Processing task 4.
worker 1 Processing task 6.
worker 4 Processing task 7.
worker 2 Processing task 8.
worker 3 Processing task 9.
worker 0 Processing task 10.
  • worker pool (搭配 buffered channel)

https://gobyexample.com/worker-pools

--

--

Gary Liao
Gary Liao

Written by Gary Liao

我很忙,一秒鐘幾十萬上下。

No responses yet