17 KiB
14.2 协程间的信道
14.2.1 概念
在第一个例子中,协程是独立执行的,他们之间没有通信。他们必须通信才会变得更有用:彼此之间发送和接收信息并且协调/同步他们的工作。协程可以使用共享变量来通信,但是很不提倡这样做,因为这种方式给所有的共享内存的多线程都带来了困难。
而Go有一个特殊的类型,通道(channel)
,像是通道(管道),可以通过它们发送类型化的数据在协程之间通信,可以避开所有内存共享导致的坑;通道的通信方式保证了同步性。数据通过通道:同一时间只有一个协程可以访问数据:所以不会出现数据竞争,设计如此。数据的归属(可以读写数据的能力)被传递。
工厂的传送带是个很有用的例子。一个机器(生产者协程)在传送带上放置物品,另外一个机器(消费者协程)拿到物品并打包。
通道服务于通信的两个目的:值的交换,同步的,保证了两个计算(协程)任何时候都是可知状态。
通常使用这样的格式来声明通道:var identifier chan datatype
未初始化的通道的值是nil。
所以通道稚嫩传输一种类型的数据,比如chan int
或者chan string
,所有的类型都可以用于通道,空接口interface{}
也可以。甚至可以(有时非常有用)创建通道的通道。
通道实际上是类型化消息的队列:使数据得以传输。它是先进先出(FIFO)结构的所以可以保证发送给他们的元素的顺序(有些人知道,通道可以比作Unix shells中的双向管道(tw-way pipe))。通道也是引用类型,所以我们使用make()
函数来给它分配内存。这里先声明了一个字符串通道ch1,然后创建了它(实例化):
var ch1 chan string
ch1 = make(chan string)
当然可以更短: ch1 := make(chan string)
这里我们构建一个int通道的通道: chanOfChans := make(chan chan int)
或者函数通道: funcChan := chan func()
(相关示例请看章节14.17)
所以通道是对象的第一类型:可以存储在变量中,作为函数的参数传递,从函数返回以及通过通道发送它们自身。另外它们是类型化的,允许类型检查,比如尝试使用整数通道发送一个指针。
14.2.2 通信操作符 <-
这个操作符直观的标示了数据的传输:信息按照箭头的方向流动。
流向通道(发送)
ch <- int1
表示:用通道ch发送变量int1(二进制操作符,中缀 = 发送)
从通道流出(接收),三种方式:
int2 = <- ch
表示:变量int2从通道ch(一元运算的前缀操作符,前缀 = 接收)接收数据(获取新值);假设int2已经声明过了,如果没有的话可以写成:int2 := <- ch
<- ch
可以单独调用获取通道的(下一个)值,当前值会被丢弃,但是可以用来验证,所以以下代码是合法的:
if <- ch != 1000{
...
}
操作符 <- 也被用来发送和接收,Go尽管不必要,为了可读性,通道的命名通常以ch
开头或者包含chan
。通道的发送和接收操作都是自动的:它们通常一气呵成。下面的示例展示了通信操作。
示例 14.2-goroutine2.go
package main
import (
"fmt"
"time"
)
func main() {
ch := make(chan string)
go sendData(ch)
go getData(ch)
time.Sleep(1e9)
}
func sendData(ch chan string) {
ch <- "Washington"
ch <- "Tripoli"
ch <- "London"
ch <- "Beijing"
ch <- "Tokio"
}
func getData(ch chan string) {
var input string
// time.Sleep(1e9)
for {
input = <-ch
fmt.Printf("%s ", input)
}
}
输出:
Washington Tripoli London Beijing Tokio
main()
函数中启动了两个协程:sendData()
通过通道ch发送了5个字符串,getData()
按顺序接收它们并打印出来。
如果2个协程需要通信,你必须给他们同一个通道作为参数才行。
尝试一下如果注释掉time.Sleep(1e9)
会如何。
我们发现协程之间的同步非常重要:
- main()等待了1秒让两个协程完成,如果不这样,sendData()就没有机会输出。
- getData()使用了无限循环:它随着sendData()的发送完成和ch变空也结束了。
- 如果我们移除一个或所有go关键字,程序无法运行,Go运行时会抛出panic:
---- Error run E:/Go/Goboek/code examples/chapter 14/goroutine2.exe with code Crashed ---- Program exited with code -2147483645: panic: all goroutines are asleep-deadlock!
为什么会这样?运行时会检查所有的协程(也许只有一个是这种情况)是否在等待(可以读取或者写入某个通道),意味着程序无法处理。这是死锁(deadlock)形式,运行时可以检测到这种情况。
注意:不要使用打印状态来表明通道的发送和接收顺序:由于打印状态和通道实际发生读写的时间延迟会导致和真实发生的顺序不同。
练习 14.4:解释一下为什么如果在函数getData()
的一开始插入time.Sleep(1e9)
,不会出现错误但也没有输出呢。
14.2.3 通道阻塞
默认情况下,通信是同步且无缓冲的:在有接受者接收数据之前,发送不会结束。可以想象一个无缓冲的通道在没有空间来保存数据的时候:必须要一个接收者准备好接收通道的数据然后发送者可以直接把数据发送给接收者。所以通道的发送/接收操作在对方准备好之前是阻塞的:
1)对于同一个通道,发送操作(协程或者函数中的),在接收者准备好之前是阻塞的:如果ch中的数据无人接收,就无法再给通道传入其他数据:新的输入无法在通道非空的情况下传入。所以发送操作会等待ch再次变为可用状态:就是通道值被接收时(可以传入变量)。
2)对于同一个通道,接收操作是阻塞的(协程或函数中的),直到发送者可用:如果通道中没有数据,接收者就阻塞了。
尽管这看上去是非常严格的约束,实际在大部分情况下工作的很不错。
程序channel_block.go
验证了以上理论,一个协程在无限循环中给通道发送整数数据。不过因为没有接收者,只输出了一个数字0。
示例 14.3-channel_block.go
package main
import "fmt"
func main() {
ch1 := make(chan int)
go pump(ch1) // pump hangs
fmt.Println(<-ch1) // prints only 0
}
func pump(ch chan int) {
for i := 0; ; i++ {
ch <- i
}
}
输出:
0
pump()
函数为通道提供数值,也被叫做生产者。
为通道解除阻塞定义了suck
函数来在无限循环中读取通道,参见示例 14.4-channel_block2.go:
func suck(ch chan int) {
for {
fmt.Println(<-ch)
}
}
在main()
中使用协程开始它:
go pump(ch1)
go suck(ch1)
time.Sleep(1e9)
给程序1秒的时间来运行:输出了上万个整数。
练习 14.1:channel_block3.go:写一个通道证明它的阻塞性,开启一个协程接收通道的数据,持续15秒,然后给通道放入一个值。在不同的阶段打印消息并观察输出。
14.2.4 通过一个(或多个)通道交换数据进行协程同步。
通信是一种同步形式:通过通道,两个协程在通信(协程会和)中某刻同步交换数据。无缓冲通道成为了多个协程同步的完美工具。
甚至可以在通道两端互相阻塞对方,形成了叫做死锁的状态。Go运行时会检查并panic,停止程序。死锁几乎完全是由糟糕的设计导致的。
无缓冲通道会被阻塞。设计无阻塞的程序可以避免这种情况,或者使用带缓冲的通道。
练习 14.2: blocking.go
解释为什么下边这个程序会导致panic:所有的协程都休眠了 - 死锁!
package main
import (
"fmt"
)
func f1(in chan int) {
fmt.Println(<-in)
}
func main() {
out := make(chan int)
out <- 2
go f1(out)
}
14.2.5 同步通道-使用带缓冲的通道
一个无缓冲通道只能包含1个元素,有时显得很局限。我们给通道提供了一个缓存,可以在扩展的make
命令中设置它的容量,如下:
buf := 100
ch1 := make(chan string, buf)
buf是通道可以承受的元素(这里是string)个数
在缓冲满载(缓冲被全部使用)之前,给一个带缓冲的通道发送数据是不会阻塞的,而从通道读取数据也不会阻塞,直到缓冲空了。
缓冲容量和类型无关,所以可以(尽管可能导致危险)给一些通道设置不同的容量,只要他们拥有同样的元素类型。内置的cap
函数可以返回缓冲区的容量。
如果容量大于0,通道就是异步的了:缓冲满载(发送)或变空(接收)之前通信不会阻塞,元素会按照发送的顺序被接收。如果容量是0或者未设置,通信仅在收发双方准备好的情况下才可以成功。
同步:ch :=make(chan type, value)
value == 0 -> synchronous, unbuffered (阻塞)
value > 0 -> asynchronous, buffered(非阻塞)取决于value元素
若使用通道的缓冲,你的程序会在“请求”激增的时候表现更好:更具弹性,专业术语叫:更具有伸缩性(scalable)。要在首要位置使用无缓冲通道来设计算法,只在不确定的情况下使用缓冲。
练习 14.3:channel_buffer.go:给channel_block3.go的通道增加缓冲并观察输出有何不同。
14.2.6 协程中用通道输出结果
为了知道计算何时完成,可以通过信道回报。在例子go sum(bigArray)
中,要这样写:
ch := make(chan int)
go sum(bigArray, ch) // bigArray puts the calculated sum on ch
// .. do something else for a while
sum := <- ch // wait for, and retrieve the sum
也可以使用通道来达到同步的目的,这个很有效的用法在传统计算机中成为(semaphore)。或者换个方式:通过通道发送信号告知处理已经完成(在协程中)。
在其他协程运行时让main程序无限阻塞的通常做法是在main
函数的最后放置一个{}。
也可以使用通道让main
程序等待协程完成,就是所谓的信号量模式,我们会在接下来的部分讨论。
14.2.7 信号量模式
下边的片段阐明:协程通过在通道ch
中放置一个值来处理结束的信号。main
协程等待<-ch
直到从中获取到值。
我们期望从这个通道中获取返回的结果,像这样:
func compute(ch chan int){
ch <- someComputation() // when it completes, signal on the channel.
}
func main(){
ch := make(chan int) // allocate a channel.
go compute(ch) // stat something in a goroutines
doSomethingElseForAWhile()
result := <- ch
}
这个信号也可以是其他的,不反回结果,比如下边这个协程中的lambda函数 协程:
ch := make(chan int)
go func(){
// doSomething
ch <- 1 // Send a signal; value does not matter
}
doSomethingElseForAWhile()
<- ch // Wait for goroutine to finish; discard sent value.
或者等待两个协程完成,每一个都会对切片s的一部分进行排序,片段如下:
done := make(chan bool)
// doSort is a lambda function, so a closure which knows the channel done:
doSort := func(s []int){
sort(s)
done <- true
}
i := pivot(s)
go doSort(s[:i])
go doSort(s[i:])
<-done
<-done
下边的代码,用完整的信号量模式对size长度的gloat64切片进行了N个doSomething()
计算并同时完成,通道sem分配了相同的长度(切包含空接口类型的元素),待所有的计算都完成后,发送信号(通过放入值)。在循环中从通道sem不停的接收数据来等待所有的协程完成。
type Empty interface {}
var empty Empty
...
data := make([]float64, N)
res := make([]float64, N)
sem := make(chan Empty, N)
...
for i, xi := range data {
go func (i int, xi float64) {
res[i] = doSomething(i, xi)
sem <- empty
} (i, xi)
}
// wait for goroutines to finish
for i := 0; i < N; i++ { <-sem }
注意闭合:i
,xi
都是作为参数传入闭合函数的,从外层循环中隐藏了变量i
和xi
。让每个协程有一份i
和xi
的拷贝;另外,for循环的下一次迭代会更新所有协程中i
和xi
的值。切片res
没有传入闭合函数,因为协程不需要单独拷贝一份。切片res
也在闭合函数中但并不是参数。
14.2.8 实现并行的for循环
在上一部分章节14.2.7的代码片段中:for循环的每一个迭代是并行完成的:
for i, v := range data {
go func (i int, v float64) {
doSomething(i, v)
...
} (i, v)
}
在for循环中并行计算迭代可能带来很好的性能提升。不过所有的迭代都必须是独立完成的。有些语言比如Fortress或者其他并行框架以不同的结构实现了这种方式,在Go中用协程实现起来非常容易:
14.2.9 用带缓冲通道实现一个信号量
信号量是实现互斥锁(排外锁)常见的同步机制,限制对资源的访问,解决读写问题,比如没有实现信号量的sync
的Go包,使用带缓冲的通道可以轻松实现:
- 带缓冲通道的容量和我们要同步的资源容量相同
- 通道的长度(当前存放的元素个数)当前资源被使用的数量相同
- 容量减去通道的长度就是未处理的资源个数(标准信号量的整数值)
不用管通道中存放的是什么,只关注长度;因此我们创建了一个有长度变量为0(字节)的通道:
type Empty interface {}
type semaphore chan Empty
将可用资源的数量N来初始化信号量semaphore
: sem = make(semaphore, N)
然后直接对信号量进行操作:
// acquire n resources
func (s semaphore) P(n int) {
e := new(Empty)
for i := 0; i < n; i++ {
s <- e
}
}
// release n resouces
func (s semaphore) V(n int) {
for i:= 0; i < n; i++{
<- s
}
}
可以用来实现一个互斥的例子:
/* mutexes */
func (s semaphore) Lock() {
s.P(1)
}
func (s semaphore) Unlock(){
s.V(1)
}
/* signal-wait */
func (s semaphore) Wait(n int) {
s.P(n)
}
func (s semaphore) Signal() {
s.V(1)
}
练习 14.5:gosum.go:用这种习惯用法写一个程序,开启一个协程来计算2个整数的合并等待计算结果并打印出来。
练习 14.6:producer_consumer.go:用这种习惯用法写一个程序,有两个协程,第一个提供数字0,10,20,...90并将他们放入通道,第二个协程从通道中读取并打印。main()
等待两个协程完成后再结束。
习惯用法:通道工厂模式
编程中常见另外一种模式如下:不将通道作为参数传递给协程,而用函数来生成一个通道并返回(工厂角色);函数内有个lambda函数被协程调用。
在channel_block2.go加入这种模式便有了示例 14.5-channel_idiom.go:
package main
import (
"fmt"
"time"
)
func main() {
stream := pump()
go suck(stream)
time.Sleep(1e9)
}
func pump() chan int {
ch := make(chan int)
go func() {
for i := 0; ; i++ {
ch <- i
}
}()
return ch
}
func suck(ch chan int) {
for {
fmt.Println(<-ch)
}
}
14.2.10 给通道使用For循环
for
循环的range
语句可以用在通道ch
上,便可以从通道中获取值,像这样:
for v := range ch {
fmt.Printf("The value is %v\n", v)
}
它从指定通道中读取数据直到通道关闭,才继续执行下边的代码。很明显,另外一个协程必须写入ch
(不然代码就阻塞在for循环了),而且必须在写入完成后才关闭。suck
函数可以这样写,且在协程中调用这个动作,程序变成了这样:
示例 14.6-channel_idiom2.go
package main
import (
"fmt"
"time"
)
func main() {
suck(pump())
time.Sleep(1e9)
}
func pump() chan int {
ch := make(chan int)
go func() {
for i := 0; ; i++ {
ch <- i
}
}()
return ch
}
func suck(ch chan int) {
go func() {
for v := range ch {
fmt.Println(v)
}
}()
}
习惯用法:通道迭代模式
这个模式用到了前边示例14.6中的模式,通常,需要从包含了地址索引字段items的容器给通道填入元素。为容器的类型定义一个方法Iter()
,返回一个只读的通道(参见章节14.2.8)items,如下:
func (c *container) Iter () <- chan items {
ch := make(chan item)
go func () {
for i:= 0; i < c.Len(); i++{ // or use a for-range loop
ch <- c.items[i]
}
} ()
return ch
}
链接
- 目录
- 上一节:并发,并行和协程
- 下一节:协程同步:关闭通道-测试阻塞的通道