EffectiveGo-6-并发、错误

Parts of Effective Go

Concurrency

通过通信共享内存

Go将共享的值通过信道传递
多个独立执行的线程从不会主动共享
在任意给定的时间点,只有一个Go程能访问该值
数据竞争从设计上就被杜绝了
Do not communicate by sharing memory; instead, share memory by communicating
不要通过共享内存来通信,而应该通过通信来共享内存。

当在典型的单线程运行在单CPU上的时候,无需使用同步语句
当多个线程进行通信时,若通信过程是同步的,也就完全不需要其它同步了
这种设计模型完美契合Unix管道

Go Routines

Go程是与其它Go程并发运行在同一地址空间的函数
Go程很廉价
Go程在多线程操作系统上可实现多路复用,因此若一个线程阻塞,比如等待I/O,那么其它线程就会运行
Go程的设计隐藏了线程创建和管理的诸多复杂性
在函数或方法前添加go关键字能够在新的Go程中调用它
当调用完成后,该Go程也会安静地退出(类似于Unix Shell中的&符号,能让命令在后台运行)

1
2
3
4
5
6
7
8
9
// 并发运行,无需等它结束
go list.Sort()
func Announce(message string, delay time.Duration) {
go func() {
time.Sleep(delay)
fmt.Println(message)
}() // 注意括号
}

Go中,函数是闭包
保证了函数内引用变量的生命周期与函数的活动时间相同

Channels

信道与映射一样,需要通过make来分配内存
其结果值是对底层数据结构的引用
若提供一个可选的整数形参,可为该信道设置缓冲区大小
默认值是零,表示不带缓冲的或者同步的信道
无缓冲信道在通信时会同步交换数据————同步通信

1
2
3
4
5
6
7
8
9
10
11
12
ci := make(chan int) // 整数类型的无缓冲信道
cj := make(chan int, 0) // 整数类型的无缓冲信道
cs := make(chan *os.File, 100) // 指向文件指针的带缓冲信道
c := make(chan int) // 分配一个信道
// 在 Go 程中启动排序 当排序完成后 在信道上发送信号
go func() {
list.Sort()
c <- 1 // 发送信号
}()
doSomethingForAWhile()
<-c // 等待排序结束,丢弃发来的值

接收者在收到数据之前会一直阻塞
若信道是不带缓冲的,那么在接收者收到值前,发送者也会一直阻塞
若信道是带缓冲的,则发送者仅会在值被复制到缓冲区前阻塞(若缓冲区满,发送者会一直等待直到某个接收者取出一个值为止)
数据同步发生在信道的接收端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
// 带缓冲的信道可被用作信号量
// 例如限制吞吐量
var sem = make(chan int, MaxOutstanding)
func handle(r *Request) {
sem <- 1 // 等待缓冲区空位
process(r) // 执行
<-sem // 执行完成后释放
}
// 如果大量调用Serve
// 会创建多个被阻塞的 go程
func Serve(queue chan *Request) {
for {
req := <-queue
go handle(req)
}
}
// 解决上例中的问题
func Serve(queue chan *Request) {
for req := range queue {
sem <- 1
go func() {
// req变量会在所有Go程间共享
process(req)
<-sem
}()
}
}
// 解决上例中的问题
// 为了确保req对于每个Go程都是唯一的
// 将req的值作为实参传入Go程的闭包中
func Serve(queue chan *Request) {
for req := range queue {
sem <- 1
go func(req *Request) {
process(req)
<-sem
}
}
}
// 另一种写法
// 以相同名字创建新的变量
func Serve(queue chan *Request) {
for req := range queue {
// 为Go程创建req的新实例
req := req
sem <- 1
go func() {
process(req)
<-sem
}()
}
}
// 用相同名字获得该变量的一个新的版本
// 局部地刻意屏蔽循环变量
// 使它对每个Go程保持唯一

管理资源的另一个好方法就是启动固定数量的handle``Go程,一起从请求信道中读取数据
Go程的数量限制了同时调用process的数量
Serve同样会接收一个通知退出的信道,在启动所有Go程后,它将阻塞并暂停从信道中接收消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func handle(queue chan *Request) {
for r := range queue {
process(r)
}
}
func Server(clientRequests chan *Request, quit chan bool) {
// start
for i := 0; i < MaxOutStanding; i++ {
go handle(clientRequests)
}
// Wait to be told to exit
<-quit
}

信道中的信道Channels of channels

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
type Request struct {
args []int
f func([]int) int
resultChan chan int
}
func sum(a []int) (s int) {
for _, v := range a {
s += v
}
return
}
request := &Request{[]int{3, 4, 5}, sum, make(chan int)}
// Send request
clientRequests <- request
// Wait for response
fmt.Printf("answer: %d\n", <-request.resultChan)
// server
func handle(queue chan *Request) {
for req := range queue {
req.resultChan <- req.f(req.args)
}
}

并行化 Parallelization

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 在多核CPU上实现并行计算
type Vector []float64
func (v Vector) DoSome(i, n int, u Vector, c chan int) {
for ; i < n; i++ {
v[i] += u.Op(v[i])
}
// 发送信号表示计算完成
c <- 1
}
// CPU核数
const NCPU = 4
func (v Vector) DoAll(u Vector) {
c := make(chan int, NCPU)
for i := 0; i < NCPU; i++ {
go v.DoSome(i*len(v)/NCPU, (i+1)*len(v)/NCPU, u, c)
}
for i := 0; i < NCPU; i++ {
<-c
}
}

Go运行时默认并不会并行执行代码,它只为用户层代码提供单一的处理核心。任意数量的Go程都可能在系统调用中被阻塞,而在任意时刻默认只有一个会执行的用户层代码。
若希望CPU并行执行,就必须告诉系统,你希望同时有多少Go程能执行代码:

  • 在运行时将GOMAXPROCS环境变量设为想要使用的核心数
  • 导入runtime包并调用runtime.GOMAXPROCS(NCPU),调用runtime.NumCPU()会返回当前机器的逻辑CPU核心数

并发是可独立执行的组件构造程序的方法
structuring a program as independently executing components

并行是为了效率在多CPU上平行进行计算
executing calculations in parallel for efficiency on multiple CPUs

尽管Go的并发特性能够让某些问题更容易构造成并行计算,但Go仍然是种并发而非并行的语言,且Go的模型并不适合所有并行问题

可能泄漏的缓冲区 A leaky buffer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// 并发编程工具很容易表达非并发的思想
// RPC包中的栗子
var freeList = make(chan *Buffer, 100)
var serverChan = make(chan *Buffer)
// On Client
func Client() {
for {
var b *Buffer
// 若缓冲区可用就用它
// 不可用就分配个新的
select {
case b = <-freeList:
// Got one
// Nothing more to do
default:
// None free
// So allocate a new one
b = new(Buffer)
}
// Read next message from the net
load(b)
// Send to server
serverChan <- b
}
}
// On Server
func server() {
for {
// 等待工作
b := <-serverChan
process(b)
select {
case freeList <- b:
// Buffer on free list
// Nothing more to do
default:
// Free list full
// Just carry on
}
}
}

客户端试图从freeList中获取缓冲区,若没有缓冲区可用,就分配个新的。服务端将b放回空闲列表freeList中直到列表已满,此时缓冲区将被丢弃,并被垃圾回收器回收。
select语句中的default子句在没有条件符合时执行,也就意味着selects永远不会被阻塞。
构建一个可能导致缓冲区槽位泄漏的空闲列表,只依靠带缓冲的信道和垃圾回收器的记录。

Errors

调用库通常会向调用者返回某种类型的错误提示
Go的多值返回特性,使得能在返回结果值的同时,还能返回详细的错误描述
按照约定,错误的类型通常为error,这是一个内建的接口

1
2
3
type error interface {
Error() string
}

库的编写者通过更丰富的底层模型可以实现这个接口,使得不仅能看见错误,还能提供上下文

1
2
3
4
5
6
7
8
9
type PathError struct {
Op string
Path string
Err error
}
func (e *PathError) Error() string {
return e.Op + " " + e.Path + " " + e.Err.Error()
}

PathErrorError会生成如下的错误信息
open /etc/passwx: no such file or directory
这种错误类型包含了出错的文件名、操作和触发的操作系统错误
错误字符串应尽可能地指明它们的来源,例如产生该错误的包名前缀
若调用者关心错误的完整细节,可使用类型选择或者类型断言来查看特定的错误,并抽取其中的细节
对于PathErrors,它应该还包含检查内部的Err字段以进行可能的错误恢复

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
for try := 0; try < 2; try++ {
file, err = os.Create(filename)
if err == nil {
return
}
// 类型断言
// 若失败 ok == false e == nil
// 若成功 ok == true
// 这意味着错误属于 *os.PathError 类型
// 而e能够检测关于该错误的更多信息
if e, ok := err.(*os.PathError); ok && e.Err == syscall.ENOSPC {
// 恢复操作
deleteTemlFiles()
continue
}
return
}

Panic

向调用者报告错误的一般方式就是将error作为额外的值返回
标准的Read方法会返回一个字节流和一个error
如果错误不可恢复,程序就不能继续运行
内建的panic函数,会产生一个运行时的错误并终止程序
panic函数接受一个任意类型的实参(一般为字符串),并在程序终止时打印

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 用牛顿法实现立方根计算
func CubeRoot(x float64) float64 {
z := x/3
for i := 0; i < 1e6; i++ {
prevz := z
z -= (z*z*z-x) / (3*z*z)
if veryClose(z, prez) {
return z
}
}
// 1e6次迭代并未收敛
// 抛出
panic(fmt.Spruntf("CubeRoot(%g) did not converge"), x)
}

在实际的库函数中应避免panic
若问题可以被屏蔽或解决,最好让程序继续运行而不是终止整个程序

一个反例就是初始化:若某个库真的不能让自己工作,且有足够的理由产生panic,就由它去吧

1
2
3
4
5
6
7
var user = os.Getenv("USER")
func init() {
if user == "" {
panic("no value for $USER")
}
}

Recover

panic被调用后,程序将立刻终止当前函数的执行,并开始回溯Go程的栈,运行任何被推迟的函数。若回溯到达Go程栈的顶端,程序就会终止。
可以使用内建的recover函数来重新获取Go程的控制权限并使其恢复正常执行。
调用recover将停止回溯过程,并返回传入panic的实参。
由于回溯时只有被推迟的函数中的代码在运行,因此recover只能在被推迟的函数中才有效。
recover的一个应用就是在服务器中终止失败的Go程而无需杀死其它正在执行的Go程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func server(workChan <-chan *Work) {
for work := range workChan {
go safelyDo(work)
}
}
func safelyDo(work *Work) {
defer func() {
if err := recover(); err != nil {
log.Println("work failed: ", err)
}
}()
do(work)
}

do(work)触发了panic,其结果就会被记录,而该Go程会被干净利落地结束,不会干涉到其它Go程
无需在推迟的闭包中做任何事情,recover函数会处理好一切。
由于直接从被推迟的函数中调用recover时不会返回nil,因此被推迟的代码能够调用本身使用了panicrecover的库函数而不会失败。
例如在safelyDo中,被推迟的函数可能在调用recover函数之前先调用了记录函数,而该记录函数应当不受panic状态的代码的影响。
通过恰当地使用恢复模式,do函数可通过调用panic来避免更坏的结果。
可以利用这种思想来简化复杂软件中的错误处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// regexp包中的理想化版本
// 以局部的错误类型调用panic来报告解析错误
// Error是解析错误的类型
// 它满足error接口
type Error string
func (e Error) Error() string {
return string(e)
}
// error是 *Regexp 的方法
// 通过用一个 Error 触发 Panic 来报告解析错误
func (regexp *Regexp) error(err string) {
panic(Error(err))
}
// Compile 返回该正则表达式解析后的结果
func Compile(str string) (regexp *Regexp, err error) {
regexp = new(Regexp)
defer func() {
if e := recover(); e != nil {
regexp = nil
err = e.(Error)
}
}()
return regexp.doParse(str), nil
}

doParse触发了Panic,恢复块会将返回值设为nil,被推迟的函数能够修改已命名的返回值。
err赋值过程中,可以通过断言err是否拥有局部类型Error来检查它。若没有,类型断言将会失败,此时会产生运行时错误,并继续栈的回溯,就像一切从未中断过一样。
该检查意味着若发生了一些像索引越界之类的意外,那么即便使用了panicrecover来处理解析错误,代码仍然会失败。
通过适当的错误处理,error方法能让报告解析错误变得更容易,从而无需手动处理回溯的解析栈
error方法是一个绑定到具体类型的方法,因此即便它与内建的error类型名字相同也没有关系

1
2
3
if pos == 0 {
re.error("'*' illegal at start of expression")
}

尽管这种模式很有用,但它应当仅在包内使用。
Parse会将其内部的panic调用转为error值,并不会向调用者暴露出panic
这种重新触发panic的惯用法会在产生实际错误时改变panic的值,然而,不管是原始的还是新的错误都会在崩溃报告中显示,因此问题的根源仍然是可见的。