go语言无缓冲的channel

Python015

go语言无缓冲的channel,第1张

无缓冲的通道(unbuffered channel)是指在接收前没有能力保存任何值的通道。

这种类型的通道要求发送goroutine和接收goroutine同时准备好,才能完成发送和接收操作。否则,通道会导致先执行发送或接收操作的 goroutine 阻塞等待。

这种对通道进行发送和接收的交互行为本身就是同步的。其中任意一个操作都无法离开另一个操作单独存在。

阻塞:由于某种原因数据没有到达,当前协程(线程)持续处于等待状态,直到条件满足,才接触阻塞。

同步:在两个或多个协程(线程)间,保持数据内容一致性的机制。

下图展示两个 goroutine 如何利用无缓冲的通道来共享一个值:

在第 1 步,两个 goroutine 都到达通道,但哪个都没有开始执行发送或者接收。

在第 2 步,左侧的 goroutine 将它的手伸进了通道,这模拟了向通道发送数据的行为。这时,这个 goroutine 会在通道中被锁住,直到交换完成。

在第 3 步,右侧的 goroutine 将它的手放入通道,这模拟了从通道里接收数据。这个 goroutine 一样也会在通道中被锁住,直到交换完成。

在第 4 步和第 5 步,进行交换,并最终,在第 6 步,两个 goroutine 都将它们的手从通道里拿出来,这模拟了被锁住的 goroutine 得到释放。两个 goroutine 现在都可以去做别的事情了。

如果没有指定缓冲区容量,那么该通道就是同步的,因此会阻塞到发送者准备好发送和接收者准备好接收。

无缓冲channel: —— 同步通信

goroutine和channel是Go语言非常棒的特色,它们提供了一种非常轻便易用的并发能力。但是当您的应用进程中有很多goroutine的时候,如何在主流程中等待所有的goroutine 退出呢?

1 通过Channel传递退出信号

Go的一大设计哲学就是:通过Channel共享数据,而不是通过共享内存共享数据。主流程可以通过channel向任何goroutine发送停止信号,就像下面这样:

func run(done chan int) {

for {

select {

case <-done:

fmt.Println("exiting...")

done <- 1

break

default:

}

time.Sleep(time.Second * 1)

fmt.Println("do something")

}

}

func main() {

c := make(chan int)

go run(c)

fmt.Println("wait")

time.Sleep(time.Second * 5)

c <- 1

<-c

fmt.Println("main exited")

}

这种方式可以实现优雅地停止goroutine,但是当goroutine特别多的时候,这种方式不管在代码美观上还是管理上都显得笨拙不堪。

2 使用waitgroup

sync包中的Waitgroup结构,是Go语言为我们提供的多个goroutine之间同步的好刀。下面是官方文档对它的描述:

A WaitGroup waits for a collection of goroutines to finish. The main goroutine calls Add to set the number of goroutines to wait for.

Then each of the goroutines runs and calls Done when finished. At the same time, Wait can be used to block until all goroutines have finished.

通常情况下,我们像下面这样使用waitgroup:

创建一个Waitgroup的实例,假设此处我们叫它wg

在每个goroutine启动的时候,调用wg.Add(1),这个操作可以在goroutine启动之前调用,也可以在goroutine里面调用。当然,也可以在创建n个goroutine前调用wg.Add(n)

当每个goroutine完成任务后,调用wg.Done()

在等待所有goroutine的地方调用wg.Wait(),它在所有执行了wg.Add(1)的goroutine都调用完wg.Done()前阻塞,当所有goroutine都调用完wg.Done()之后它会返回。

那么,如果我们的goroutine是一匹不知疲倦的牛,一直孜孜不倦地工作的话,如何在主流程中告知并等待它退出呢?像下面这样做:

type Service struct {

// Other things

ch chan bool

waitGroup *sync.WaitGroup

}

func NewService() *Service {

s := &Service{

// Init Other things

ch: make(chan bool),

waitGroup: &sync.WaitGroup{},

}

return s

}

func (s *Service) Stop() {

close(s.ch)

s.waitGroup.Wait()

}

func (s *Service) Serve() {

s.waitGroup.Add(1)

defer s.waitGroup.Done()

for {

select {

case <-s.ch:

fmt.Println("stopping...")

return

default:

}

s.waitGroup.Add(1)

go s.anotherServer()

}

}

func (s *Service) anotherServer() {

defer s.waitGroup.Done()

for {

select {

case <-s.ch:

fmt.Println("stopping...")

return

default:

}

// Do something

}

}

func main() {

service := NewService()

go service.Serve()

// Handle SIGINT and SIGTERM.

ch := make(chan os.Signal)

signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)

fmt.Println(<-ch)

// Stop the service gracefully.

service.Stop()

}

select 语句使得一个 goroutine 在多个通讯操作上等待。

select 会阻塞,直到条件分支中的某个可以继续执行,这时就会执行那个条件分支。当多个都准备好的时候,会随机选择一个。

复制代码代码如下:

package main

import "fmt"

func fibonacci(c, quit chan int) {

x, y := 1, 1

for {

select {

case c <- x:

x, y = y, x + y

case <-quit:

fmt.Println("quit")

return

}

}

}

func main() {

c := make(chan int)

quit := make(chan int)

go func() {

for i := 0i <10i++ {

fmt.Println(<-c)

}

quit <- 0

}()

fibonacci(c, quit)

}

默认选择

当 select 中的其他条件分支都没有准备好的时候,default 分支会被执行。

为了非阻塞的发送或者接收,可使用 default 分支:

select {

case i := <-c:

// use i

default:

// receiving from c would block

}

复制代码代码如下:

package main

import (

"fmt"

"time"

)

func main() {

tick := time.Tick(1e8)

boom := time.After(5e8)

for {

select {

case <-tick:

fmt.Println("tick.")

case <-boom:

fmt.Println("BOOM!")

return

default:

fmt.Println(".")

time.Sleep(5e7)

}

}

}