GO執行序與通道
前言
Go 提供了兩種平行運算的方式:
- communicating sequential processes (CSP)
- shared memory multithreading
本篇介紹 Go 實現 CSP 的方式,就是用使用 channels 在 goroutines 之間溝通。而 shared memory multithreading,用 Go 實踐,就是多個 goroutines 共用同一組變數。
Goroutines
Goroutines 是 Go 的 CSP 執行序,類似一般的 threads 又有點不同。(另篇解釋)
Go 的第一個 Goroutine 都是執行 main function,稱之為 main Goroutine,其中所有程式碼都是按順序執行;若要平行運算某個 function可以使用 go
指令:
f() // call f(); wait for it to return
go f() // create a new goroutine that calls f(); don't wait
go
會開啟一個新的 Goroutine 來執行這個function,此 function 結束時,這個 Goroutine也會結束,而這個function的回傳值會被無視。當 main Goroutine 結束時,所有 Goroutines 都會結束,若要確保 goroutines 都執行完畢,後面會提到利用channel同步的做法。
Go Channel
Channel也是一種 type , 他提供了一種機制,讓平行執行的 functions 可以透過事先定義的types ,對 channel 執行 傳送(sending) 、接收 (receiving) 的方式互相溝通 。Channel 也可以傳送 channel。
ChannelType = ( "chan" | "chan" "<-" | "<-" "chan" ) ElementType .
Channel 可以被定義為雙向、只有傳送、只有接收,三種模式。"<-" 定義了channel 的方向,不寫就是雙向。
// ChannelType
chan T // 雙向 can be used to send and receive values of type T
chan<- float64 // 傳送 can only be used to send float64s
<-chan int // 接收 can only be used to receive ints
chan<- chan int // same as chan<- (chan int)
// 宣告一個變數 x 是 Channel,只傳送 float64
var x chan<- float64
// 宣告一個變數 x 是 Channel,只傳送 "可雙向傳送int的channel"。
var x chan<- chan int
var x chan<- (chan int) // they are the same
Channel 可以用 make 指令實體化,同時設定 channel 的 capacity,也就是緩衝區大小 (buffer),單位是傳送的 type。
var x chan int = make(chan int) // channel without buffer
var x chan int = make(chan int, 100) // channel with capacity = 100
先定義與 channel 有關的雙方:
- sender: 傳送方,執行 sending
- receiver: 接收方,執行 receiving
沒有 buffer (cap = 0 或沒設定) 的情況下,sender 與 receiver 都必須就緒,傳輸才會成功。
有buffer的情況下,如果 buffer 沒滿,那麼傳輸必定成功;若 buffer 滿了,則 sender 會被 block,直到receiver就緒;若 buffer 空了,receiver 會被 block,直到sender就緒。
傳送接收資料的寫法如下:
package main
import "fmt"
func main() {
var channel chan int = make(chan int, 1)
var data int = 3
var box int
channel <- data // send data to channel
box = <-channel // box receives data from channel
fmt.Println(box) // 3
}
這個範例是殺雞用牛刀,如果只是要把 data 傳給 box ,直接assign即可,但channel 的真正用途是在不同的goroutine之間溝通。但這範例帶出一個小問題,我們把上例中的 cap 拿掉:
var channel chan int = make(chan int) // 拿掉cap
那麼這行error就會出現:
fatal error: all goroutines are asleep — deadlock!
因為沒有 buffer 的情況下,sender 與 receiver 都必須就緒,傳輸才會成功,在這個案例中,main function 同時為兩者,由於是在同一個 main goroutine 之中,是循序執行,而不是同時,因此在他扮演 sender 的時候被block,沒有辦法繼續成為receiver,因為receive在後面才會執行,形成死結 (deadlock)。
Deadlock 死結
等待一個永遠不會被釋放的資源,導致沒有程序可以進行,就是死結。
應用 channels 在 goroutines 之間溝通,若設計不良,很容易遇到死結。
Unbuffered Channel
沒有buffer的channel稱為 unbuffered channel,由於收發都必須等待對方就緒,因此形成某種同步 (synchronize),因此 unbuffered channel 亦被稱為 synchronous channel。這個特性常被應用於確保 goroutine 完成才關閉主程式。
Buffered Channel
與unbuffered channel相反, buffered channel 的目的是為了讓 goroutine之間解耦。
Unidirectional Channel
單向Channel 主要是保證傳輸的方向,單獨存在沒什麼意義,因此通常都是搭配雙向channel使用。
用途1: 同步
例子中有一個 士兵J 丟出了一顆炸彈,而士兵K 與士兵G 都準備好要接炸彈了,每次接到炸彈,士兵都會報數,炸彈會倒數,然後我們緬懷最後接到炸彈的士兵。主程式必須在收到炸彈爆炸的訊息後才會結束,以確保所有訊息都被印出。
package main
import (
"fmt"
)
type Bomb struct {
Count int
Done chan struct{}
}
func (b *Bomb) CountDown() (Explode bool) {
b.Count -= 1
if b.Count == 0 {
fmt.Println("BooooooM !!")
defer func() { b.Done <- struct{}{} }()
return true
}
return false
}
type Soldier struct {
Name string
Holding *Bomb
}
type BombChannel chan *Bomb
func (s *Soldier) PassBomb(bc BombChannel) {
bc <- s.Holding
s.Holding = nil
}
func (s *Soldier) ReadyForBomb(bc BombChannel) {
s.Holding = <-bc
fmt.Println(s.Name, "gets the bomb, and the count is", s.Holding.Count)
explode := s.Holding.CountDown()
if explode {
fmt.Println("RIP", s.Name)
} else {
go s.PassBomb(bc)
go s.ReadyForBomb(bc)
}
}
func main() {
bc := make(BombChannel)
boom := make(chan struct{})
bomb := Bomb{5, boom}
SoldierJ := Soldier{"J", &bomb}
SoldierG := Soldier{"G", nil}
SoldierK := Soldier{"K", nil}
go SoldierG.ReadyForBomb(bc)
go SoldierK.ReadyForBomb(bc)
SoldierJ.PassBomb(bc)
<-boom // 確保炸彈爆炸才關閉主程式,否則
}
結果
K gets the bomb, and the count is 5 // 保證印出
K gets the bomb, and the count is 4 // 保證印出
G gets the bomb, and the count is 3 // 保證印出
K gets the bomb, and the count is 2 // 保證印出
G gets the bomb, and the count is 1 // 保證印出
BooooooM !! // 保證印出
RIP G // 不保證印出
除了第一個接到炸彈的人是固定的,剩下的順序不一定。
用途2: Pipelines
這裡展示單向 channel 如何與雙向 channel 搭配,做到保證傳輸方向;同時也展示了 for 接收channel ,直到 channel.cose() 才會跳出 for loop的用法。
這個範例是從台灣到英國的飛機,中間在泰國轉機,乘客人數的變化:
桃園國際機場 -> 泰國廊曼機場 -> 英國西斯洛機場
package main
import "fmt"
type Airplane struct {
Id int
Passengers int
}
// 桃園國際機場
func TPE(out chan<- Airplane) {
for x := 0; x < 5; x++ {
plane := Airplane{Id: x, Passengers: x + 2}
fmt.Printf("Airplane %+v departing from TPE. \n", plane)
out <- plane
}
close(out)
}
// 泰國 廊曼機場
func DMK(out chan<- Airplane, in <-chan Airplane) {
for v := range in {
v.Passengers = v.Passengers - 2 + 3
out <- v
}
close(out)
}
// 英國 希斯洛機場
func LHR(in <-chan Airplane) {
for v := range in {
fmt.Printf("Airplane %+v Arriving LHR. \n", v)
}
}
func main() {
TPE2DMK := make(chan Airplane)
DMK2LHR := make(chan Airplane)
go TPE(TPE2DMK)
go DMK(DMK2LHR, TPE2DMK)
LHR(DMK2LHR)
}
結果
Airplane {Id:0 Passengers:2} departing from TPE.
Airplane {Id:1 Passengers:3} departing from TPE.
Airplane {Id:2 Passengers:4} departing from TPE.
Airplane {Id:0 Passengers:3} Arriving LHR.
Airplane {Id:1 Passengers:4} Arriving LHR.
Airplane {Id:2 Passengers:5} Arriving LHR.
Airplane {Id:3 Passengers:5} departing from TPE.
Airplane {Id:4 Passengers:6} departing from TPE.
Airplane {Id:3 Passengers:6} Arriving LHR.
Airplane {Id:4 Passengers:7} Arriving LHR.
Select
Select 語法類似 switch ,但專門用於 channels,case 成立的必要條件是該channel 沒有被 block 而可以執行 case 後面接的動作;如果所有case 都不成立則 block,直到有case 成立為止。
範例
這範例中,透過輸入 'a'
或 'b'
來操作 c1, c2 這兩個 channels,進而觸發後面的select。c1 是 buffered channel,c2 是 unbuffered channel。
package main
import (
"fmt"
"os"
)
func main() {
var c1 = make(chan struct{}, 1)
c1 <- struct{}{}
var c2 = make(chan struct{})
go func() {
b := make([]byte, 1)
os.Stdin.Read(b)
switch b[0] {
case 'a':
<-c1
case 'b':
c2 <- struct{}{}
}
}()
select {
case c1 <- struct{}{}:
fmt.Println(1)
case <-c2:
fmt.Println(2)
case x := <-c2:
fmt.Println(x)
}
}
case c1 <- struct{} {}:
成立的條件是 c1 被清空;
case <-c2:
成立的條件是對c2 傳值;
case x := <-c2:
成立的條件與上一個 case 相同,x 會在case成立時宣告。
若複數個 case 同時成立,則會隨機選一個執行。
Select 也有 default,若沒有case 成立,則執行default。如下例子,若沒有default,則 for loop 會卡在第一圈;有了default,則每秒一圈:
package main
import (
"fmt"
"os"
"time"
)
func main() {
abort := make(chan struct{})
go func() {
os.Stdin.Read(make([]byte, 1))
abort <- struct{}{}
}()
ticker := time.Tick(time.Second)
for k := range ticker {
fmt.Println(k)
select {
case <-abort:
return
default:
}
}
}
執行結果
我在第三秒多的時候輸入了enter:
2023–06–05 09:24:54.0276331 +0000 UTC m=+1.000497001
2023–06–05 09:24:55.028545 +0000 UTC m=+2.001408701
2023–06–05 09:24:56.0279756 +0000 UTC m=+3.000839001
2023–06–05 09:24:57.0285048 +0000 UTC m=+4.001368601
mission abort
select default 有時用於避免 deadlock。
Goroutines 數量上限
Go 沒有限制 Goroutines 的數量上限,由於goroutine非常輕量,一般機器開上萬個goroutines都不是問題,但無限制的開總是有爆的一天,所以限制goroutines 的數量上限也是一個課題。以下列幾個常用方法:
- Semaphore 旗號 (使用 buffered channel)
const maxGoroutines = 5 // adjust this value as needed
sem := make(chan struct{}, maxGoroutines)
for i := 0; i < 100; i++ { // assume we want to start 100 goroutines
sem <- struct{}{} // will block if there is no room in the channel
go func(i int) {
// release the semaphore when goroutine finishes
defer func() { <-sem }()
// ... do work here ...
}(i)
}
- Worker pool (搭配 sync.WaitGroup)
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, tasks <-chan int, wg *sync.WaitGroup) {
// Decreasing internal counter for wait-group as soon as goroutine finishes
defer wg.Done()
for task := range tasks {
// Do the work here.
fmt.Printf("worker %d Processing task %d.\n", id, task)
time.Sleep(time.Millisecond * 100)
}
}
func main() {
// 工人數量
const maxGoroutines = 5
// Declaring waitgroup
var wg sync.WaitGroup
// 將等待5個工人完成工作
wg.Add(maxGoroutines)
// Channel with buffer size equal to the number of tasks
maxTasks := 10
tasks := make(chan int, maxTasks)
// Starting 5 worker goroutines
for i := 0; i < maxGoroutines; i++ {
go worker(i, tasks, &wg)
}
// Adding tasks to the channel
for i := 1; i <= 10; i++ {
tasks <- i
}
// We close the channel to signal that no more tasks will be sent to it
close(tasks)
// Waiting for all the tasks to finish
wg.Wait()
}
結果
worker 4 Processing task 1.
worker 2 Processing task 3.
worker 1 Processing task 2.
worker 0 Processing task 5.
worker 3 Processing task 4.
worker 1 Processing task 6.
worker 4 Processing task 7.
worker 2 Processing task 8.
worker 3 Processing task 9.
worker 0 Processing task 10.
- worker pool (搭配 buffered channel)