select
select
var c1, c2 <-chan interface{}
var c3 chan<- interface{}
select {
case <-c1:
// Do something
case <-c2:
// Do something
case c3 <- struct{}{}:
// Do something
}
跟
start := time.Now()
c := make(chan interface{})
go func() {
time.Sleep(5 * time.Second)
close(c) // 5 秒后关闭通道
}()
fmt.Println("Blocking on read...")
select {
case <-c: // 尝试读取通道。注意,尽管我们可以不使用select语句而直接使用<-c,但我们的目的是为了展示select语句。
fmt.Printf("Unblocked %v later.\n", time.Since(start))
}
// Blocking on read...
// Unblocked 5s later.
通道是将
select 顺序
对于空的
package main
func main() {
select {
}
}
package main
import (
"fmt"
"time"
)
func main() {
chan1 := make(chan int)
chan2 := make(chan int)
go func() {
chan1 <- 1
time.Sleep(5 * time.Second)
}()
go func() {
chan2 <- 1
time.Sleep(5 * time.Second)
}()
select {
case <-chan1:
fmt.Println("chan1 ready.")
case <-chan2:
fmt.Println("chan2 ready.")
default:
fmt.Println("default")
}
fmt.Println("main exit.")
}
package main
import (
"fmt"
"time"
)
func main() {
chan1 := make(chan int)
chan2 := make(chan int)
writeFlag := false
go func() {
for {
if writeFlag {
chan1 <- 1
}
time.Sleep(time.Second)
}
}()
go func() {
for {
if writeFlag {
chan2 <- 1
}
time.Sleep(time.Second)
}
}()
select {
case <-chan1:
fmt.Println("chan1 ready.")
case <-chan2:
fmt.Println("chan2 ready.")
}
fmt.Println("main exit.")
}
程序中声明两个
总结而言:
select 语句中除default 外,每个case 操作一个channel ,要么读要么写select 语句中除default 外,各case 执行顺序是随机的select 语句中如果没有default 语句,则会阻塞等待任一case select 语句中读操作要判断是否成功读取,关闭的channel 也可以读取
select 的典型应用
定时器
func main() {
tickTimer := time.NewTicker(1 * time.Second)
barTimer := time.NewTicker(60 * time.Second)
for {
select {
case <-tickTimer.C:
fmt.Println("tick")
case <-barTimer.C:
fmt.Println("bar")
}
}
}
select 实现原理
case 数据结构
源码包 src/runtime/select.go:scase
定义了表示
type scase struct {
c *hchan // chan
kind uint16
elem unsafe.Pointer // data element
}
- caseRecv:
case 语句中尝试读取scase.c 中的数据; - caseSend:
case 语句中尝试向scase.c 中写入数据; - caseDefault:
default 语句
- scase.kind == caseRecv:
scase.elem 表示读出channel 的数据存放地址; - scase.kind == caseSend:
scase.elem 表示将要写入channel 的数据存放地址;
select 实现逻辑
源码包
func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool)
函数参数:
cas0 为scase 数组的首地址,selectgo() 就是从这些scase 中找出一个返回。order0 为一个两倍cas0 数组长度的buffer ,保存scase 随机序列pollorder 和scase 中channel 地址序列lockorder - pollorder:每次
selectgo 执行都会把scase 序列打乱,以达到随机检测case 的目的。 - lockorder:所有
case 语句中channel 序列,以达到去重防止对channel 加锁时重复加锁的目的。
- pollorder:每次
ncases 表示scase 数组的长度
函数返回值:
- int:选中
case 的编号,这个case 编号跟代码一致 bool: 是否成功从channle 中读取了数据,如果选中的case 是从channel 中读数据,则该返回值表示是否读取成功。
func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) {
//1. 锁定scase语句中所有的channel
//2. 按照随机顺序检测scase中的channel是否ready
// 2.1 如果case可读,则读取channel中数据,解锁所有的channel,然后返回(case index, true)
// 2.2 如果case可写,则将数据写入channel,解锁所有的channel,然后返回(case index, false)
// 2.3 所有case都未ready,则解锁所有的channel,然后返回(default index, false)
//3. 所有case都未ready,且没有default语句
// 3.1 将当前协程加入到所有channel的等待队列
// 3.2 当将协程转入阻塞,等待被唤醒
//4. 唤醒后返回channel对应的case index
// 4.1 如果是读操作,解锁所有的channel,然后返回(case index, true)
// 4.2 如果是写操作,解锁所有的channel,然后返回(case index, false)
}
func selectgo(sel *hselect) int {
// ...
// case洗牌
pollslice := slice{unsafe.Pointer(sel.pollorder), int(sel.ncase), int(sel.ncase)}
pollorder := *(*[]uint16)(unsafe.Pointer(&pollslice))
for i := 1; i < int(sel.ncase); i++ {
//....
}
// 给case排序
lockslice := slice{unsafe.Pointer(sel.lockorder), int(sel.ncase), int(sel.ncase)}
lockorder := *(*[]uint16)(unsafe.Pointer(&lockslice))
for i := 0; i < int(sel.ncase); i++ {
// ...
}
for i := int(sel.ncase) - 1; i >= 0; i-- {
// ...
}
// 加锁该select中所有的channel
sellock(scases, lockorder)
// 进入loop
loop:
// ...
// pass 1 - look for something already waiting
// 按顺序遍历case来寻找可执行的case
for i := 0; i < int(sel.ncase); i++ {
//...
switch cas.kind {
case caseNil:
continue
case caseRecv:
// ... goto xxx
case caseSend:
// ... goto xxx
case caseDefault:
dfli = casi
dfl = cas
}
}
// 没有找到可以执行的case,但有default条件,这个if里就会直接退出了。
if dfl != nil {
// ...
}
// ...
// pass 2 - enqueue on all chans
// chan入等待队列
for _, casei := range lockorder {
// ...
switch cas.kind {
case caseRecv:
c.recvq.enqueue(sg)
case caseSend:
c.sendq.enqueue(sg)
}
}
// wait for someone to wake us up
// 等待被唤起,同时解锁channel(selparkcommit这里实现的)
gp.param = nil
gopark(selparkcommit, nil, "select", traceEvGoBlockSelect, 1)
// 突然有故事发生,被唤醒,再次该select下全部channel加锁
sellock(scases, lockorder)
// pass 3 - dequeue from unsuccessful chans
// 本轮最后一次循环操作,获取可执行case,其余全部出队列丢弃
casi = -1
cas = nil
sglist = gp.waiting
// Clear all elem before unlinking from gp.waiting.
for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
sg1.isSelect = false
sg1.elem = nil
sg1.c = nil
}
gp.waiting = nil
for _, casei := range lockorder {
// ...
if sg == sglist {
// sg has already been dequeued by the G that woke us up.
casi = int(casei)
cas = k
} else {
c = k.c
if k.kind == caseSend {
c.sendq.dequeueSudoG(sglist)
} else {
c.recvq.dequeueSudoG(sglist)
}
}
// ...
}
// 没有的话,再走一次loop
if cas == nil {
goto loop
}
// ...
bufrecv:
// can receive from buffer
bufsend:
// ...
recv:
// ...
rclose:
// ...
send:
// ...
retc:
// ...
sclose:
// send on closed channel
}
特别说明:对于读case elem, ok := <-chan1:
