Golang-5-并发

Parts of A Tour Of Go

Go程__goroutine

Go程是由Go运行时管理的轻量级线程。

1
2
3
4
5
// 会启动一个新的Go程并执行 f(x, y, z)
// f, x, y, z 的求值会发生当前的Go程中
// 而 f 的执行会发生在新的Go程中
// 这种处理方法和Erlang相同
go f(x, y, z)

Go程在相同的地址空间中运行,因此在访问共享内存时必须进行同步。
sync包提供了这种能力,不过并不常用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package main
import (
"fmt"
"time"
)
func say(s string) {
for i := 0; i < 5; i++ {
time.Sleep(100 * time.Millisecond)
fmt.Println(s)
}
}
func main() {
// 启动一个新的Go程
go say("World")
// 在当前Go程执行 func say
say("Hello")
}

信道channel

信道是带有类型的管道
信道使用信道操作符<-来发送值或接收值
<-箭头指向就是数据流的方向

1
2
3
4
5
6
// 信道在使用前必须创建
ch := make(chan int)
// 将 v 发送至信道 ch
ch <- v
// 从 ch 接收值并赋予 v
v := <- ch

默认情况下,发送和接收操作在另一端准备好之前都会阻塞(同步调用)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package main
import "fmt"
func sum(s []int, c chan int) {
sum := 0
for _, v := range s {
sum += v
}
// 将结果发送至信道c
c <- sum
}
func main() {
s := []int{7, 2, 8, 9, 3, 0}
c := make(chan int)
go sum(s[:len(s)/2], c)
go sum(s[len(s)/2:], c)
// 从信道c中接收
x, y := <-c, <-c
// 17 12 29
fmt.Println(x, y, x+y)
}

信道缓冲

信道是可以带缓冲的
将缓冲长度作为第二个参数提供给make来初始化一个带缓冲的信道

1
ch := make(chan int, 100)

仅当信道的缓冲区填满后,向其发送数据时才会阻塞(等待数据取出)
当缓冲区为空时,接收方会阻塞(等待数据到达)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package main
import "fmt"
func main() {
ch := make(chan int, 2)
ch <- 1
ch <- 2
// 1
fmt.Println(<-ch)
// 2
fmt.Println(<-ch)
ch <- 1
ch <- 2
// fatal error: all goroutines are asleep - deadlock!
ch <- 3
}

rangeclose

发送者可通过close关闭一个信道来表示没有需要发送的值了

1
2
3
// 接收者可以通过为接收表达式分配第二个参数来测试信道是否被关闭
// 若没有值可以接收且信道已被关闭,ok == false
v, ok <- ch

循环for i := range ch会不断从信道接收值,直到ch被关闭
注意:

  • 只有发送者才能关闭信道,而接收者不能
  • 向一个已经关闭的信道发送数据会报错panic
  • 信道与文件不同,通常情况下无需关闭它们
  • 只有在必须告诉接收者不再有值需要发送的时候才有必要关闭,例如终止range循环
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package main
import "fmt"
func fibonacci(n int, c chan int) {
x, y := 0, 1
for i := 0; i < n; i++ {
c <- x
x, y = y, x+y
}
close(c)
}
func main() {
c := make(chan int, 9)
go fibonacci(cap(c), c)
for i := range c {
fmt.Println(i)
}
}
// 0
// 1
// 1
// 2
// 3
// 5
// 8
// 13
// 21

select语句

select语句使一个Go程可以等待多个通信操作
select会阻塞到某个分支可以继续执行为止,并执行该分支
当多个分支都准备好时会随机选择一个执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package main
import "fmt"
func fibonacci(c, quit chan int) {
x, y := 0, 1
for {
select {
case c <- x:
x, y = y, x+y
case <-quit:
fmt.Println("quit")
return
}
}
}
func main() {
c := make(chan int)
quit := make(chan int)
f := func() {
for i := 0; i < 9; i++ {
fmt.Println(<-c)
}
quit <- 0
}
go f()
fibonacci(c, quit)
}
// 0
// 1
// 1
// 2
// 3
// 5
// 8
// 13
// 21
// quit

select默认选择

select中的其它分支都没有准备好时,default分支就会执行
为了在尝试发送或接受时不发生阻塞,可使用default分支

1
2
3
4
5
6
select {
case i := <- c:
// 使用 i
default:
// 从 c 中接收会阻塞时执行
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package main
import (
"fmt"
"time"
)
func main() {
tick := time.Tick(100 * time.Millisecond)
boom := time.After(600 * time.Millisecond)
for {
select {
case <-tick:
fmt.Println("tick")
case <-boom:
fmt.Println("BOOM!")
return
default:
fmt.Println(" .")
time.Sleep(50 * time.Millisecond)
}
}
}
// .
// .
// tick
// .
// .
// tick
// .
// .
// tick
// .
// .
// tick
// .
// .
// tick
// .
// .
// BOOM!

练习: 等价二叉树

1
2
3
4
5
6
// tree 包
type Tree struct {
Left *Tree
Value int
Right *Tree
}

函数tree.New(k)用于构造一个随机结构的二叉树,保存了值(k,2k,3k…10k)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package main
import (
"fmt"
"golang.org/x/tour/tree"
)
// Walk 步进 tree t 将所有的值从 tree 发送到 channel ch。
func Walk(t *tree.Tree, ch chan int) {
_walk(t, ch)
close(ch)
}
func _walk(t *tree.Tree, ch chan int) {
if t != nil {
_walk(t.Left, ch)
ch <- t.Value
_walk(t.Right, ch)
}
}
// Same 检测树 t1 和 t2 是否含有相同的值。
func Same(t1, t2 *tree.Tree) bool {
ch1 := make(chan int)
ch2 := make(chan int)
go Walk(t1, ch1)
go Walk(t2, ch2)
for i := range ch1 {
if i != <-ch2 {
return false
}
}
return true
}
func main() {
ch := make(chan int, 10)
go Walk(tree.New(1), ch)
for v := range ch {
fmt.Println(v)
}
fmt.Println(Same(tree.New(1), tree.New(1)))
fmt.Println(Same(tree.New(1), tree.New(2)))
}

sync.Mutex互斥锁

信道非常适合在各个Go程间进行通信
要实现每次只有一个Go程能访问一个共享的变量,从而避免冲突,可以使用互斥锁
Go标准库提供了sync.Mutex互斥锁类型 和 Lock Unlock两个方法
可以在代码前调用Lock方法,在代码后调用Unlock方法来保证一段代码的互斥执行
也可以用defer语句来保证互斥锁一定会被解锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package main
import (
"fmt"
"sync"
"time"
)
// SafeCounter 的并发使用是安全的。
type SafeCounter struct {
v map[string]int
mux sync.Mutex
}
// Inc 增加给定 key 的计数器的值。
func (c *SafeCounter) Inc(key string) {
c.mux.Lock()
// Lock 之后同一时刻只有一个 goroutine 能访问 c.v
c.v[key]++
c.mux.Unlock()
}
// Value 返回给定 key 的计数器的当前值。
func (c *SafeCounter) Value(key string) int {
c.mux.Lock()
// Lock 之后同一时刻只有一个 goroutine 能访问 c.v
defer c.mux.Unlock()
return c.v[key]
}
func main() {
c := SafeCounter{v: make(map[string]int)}
for i := 0; i < 1000; i++ {
go c.Inc("somekey")
}
time.Sleep(time.Second)
fmt.Println(c.Value("somekey"))
}

练习: Web爬虫

使用Go的并发特性来并行化一个Web爬虫
修改Crawl函数来并行抓取URL,并保证不重复
可以用一个map来缓存已经获取的URL,但是map本身并不是并发安全的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package main
import (
"fmt"
"sync"
)
type Fetcher interface {
// Fetch 返回 URL 的 body 内容,并且将在这个页面上找到的 URL 放到一个 slice 中。
Fetch(url string) (body string, urls []string, err error)
}
// Crawl 使用 fetcher 从某个 URL 开始递归的爬取页面,直到达到最大深度。
func Crawl(url string, depth int, fetcher Fetcher, out chan string, end chan bool) {
// // TODO: 并行的抓取 URL。
// // TODO: 不重复抓取页面。
// // 下面并没有实现上面两种情况:
// if depth <= 0 {
// return
// }
// body, urls, err := fetcher.Fetch(url)
// if err != nil {
// fmt.Println(err)
// return
// }
// fmt.Printf("found: %s %q\n", url, body)
// for _, u := range urls {
// Crawl(u, depth-1, fetcher)
// }
// return
if depth <= 0 {
end <- true
return
}
if _, ok := crawled[url]; ok {
end <- true
return
}
crawledMutex.Lock()
crawled[url] = true
crawledMutex.Unlock()
body, urls, err := fetcher.Fetch(url)
if err != nil {
out <- fmt.Sprintln(err)
end <- true
return
}
out <- fmt.Sprintf("found: %s %q\n", url, body)
subEnd := make(chan bool)
for _, u := range urls {
go Crawl(u, depth-1, fetcher, out, subEnd)
}
for i := 0; i < len(urls); i++ {
<-subEnd
}
end <- true
}
var crawled = make(map[string]bool)
var crawledMutex sync.Mutex
func main() {
out := make(chan string)
end := make(chan bool)
go Crawl("http://golang.org/", 4, fetcher, out, end)
for {
select {
case t := <-out:
fmt.Print(t)
case <-end:
return
}
}
}
// fakeFetcher 是返回若干结果的 Fetcher。
type fakeFetcher map[string]*fakeResult
type fakeResult struct {
body string
urls []string
}
func (f fakeFetcher) Fetch(url string) (string, []string, error) {
if res, ok := f[url]; ok {
return res.body, res.urls, nil
}
return "", nil, fmt.Errorf("not found: %s", url)
}
// fetcher 是填充后的 fakeFetcher。
var fetcher = fakeFetcher{
"http://golang.org/": &fakeResult{
"The Go Programming Language",
[]string{
"http://golang.org/pkg/",
"http://golang.org/cmd/",
},
},
"http://golang.org/pkg/": &fakeResult{
"Packages",
[]string{
"http://golang.org/",
"http://golang.org/cmd/",
"http://golang.org/pkg/fmt/",
"http://golang.org/pkg/os/",
},
},
"http://golang.org/pkg/fmt/": &fakeResult{
"Package fmt",
[]string{
"http://golang.org/",
"http://golang.org/pkg/",
},
},
"http://golang.org/pkg/os/": &fakeResult{
"Package os",
[]string{
"http://golang.org/",
"http://golang.org/pkg/",
},
},
}