介绍goroutine及其中共享资源的几种方式。
goroutine
并发(concurrency)不是并行(parallelism)。并行是让不同的代码片段同时在不同的物理处 理器上执行。并行的关键是同时做很多事情,而并发是指同时管理很多事情,这些事情可能只做 了一半就被暂停去做别的事情了。在很多情况下,并发的效果比并行好,因为操作系统和硬件的 总资源一般很少,但能支持系统同时做很多事情。这种“使用较少的资源做更多的事情”的哲学, 也是指导 Go 语言设计的哲学。
如果希望让 goroutine 并行,必须使用多于一个逻辑处理器。当有多个逻辑处理器时,调度器 会将 goroutine 平等分配到每个逻辑处理器上。这会让 goroutine 在不同的线程上运行。不过要想真 的实现并行的效果,用户需要让自己的程序运行在有多个物理处理器的机器上。否则,哪怕 Go 语言运行时使用多个线程,goroutine 依然会在同一个物理处理器上并发运行,达不到并行的效果。
如下图所示,展示了在一个逻辑处理器上并发运行goroutine和在两个逻辑处理器上并行运行两个并 发的 goroutine 之间的区别。调度器包含一些聪明的算法,这些算法会随着 Go 语言的发布被更新 和改进,所以不推荐盲目修改语言运行时对逻辑处理器的默认设置。
基于调度器的内部算法,一个正运行的 goroutine 在工作结束前,可以被停止并重新调度。 调度器这样做的目的是防止某个 goroutine 长时间占用逻辑处理器。当 goroutine 占用时间过长时, 调度器会停止当前正运行的 goroutine,并给其他可运行的 goroutine 运行的机会。
如下图所示,从逻辑处理器的角度展示了这一场景。在第 1 步,调度器开始运行 goroutine A,而 goroutine B 在运行队列里等待调度。之后,在第 2 步,调度器交换了 goroutine A 和 goroutine B。 由于 goroutine A 并没有完成工作,因此被放回到运行队列。之后,在第 3 步,goroutine B 完成 了它的工作并被系统销毁。这也让 goroutine A 继续之前的工作。
goroutine中资源竞争(共享)的三种方式,原子函数、互斥锁以及channel
原子函数
原子函数能够以底层的加锁机制来同步访问整型变量和指针。
atomic中StoreInt64,LoadInt64,AddInt64可以安全的读写整型变量。
通过atomic.AddInt64来同步整型值得加法,如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
// This sample program demonstrates how to use the atomic
// package to provide safe access to numeric types.
package main
import (
"fmt"
"runtime"
"sync"
"sync/atomic"
)
var (
// counter is a variable incremented by all goroutines.
counter int64
// wg is used to wait for the program to finish.
wg sync.WaitGroup
)
// main is the entry point for all Go programs.
func main() {
// Add a count of two, one for each goroutine.
wg.Add(2)
// Create two goroutines.
go incCounter(1)
go incCounter(2)
// Wait for the goroutines to finish.
wg.Wait()
// Display the final value.
fmt.Println("Final Counter:", counter)
}
// incCounter increments the package level counter variable.
func incCounter(id int) {
// Schedule the call to Done to tell main we are done.
defer wg.Done()
for count := 0; count < 2; count++ {
// Safely Add One To Counter.
atomic.AddInt64(&counter, 1)
// Yield the thread and be placed back in queue.
runtime.Gosched()
}
}
|
通过atomic.LoadInt64,atomic.StoreInt64安全的读写整型变量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
|
// This sample program demonstrates how to use the atomic
// package functions Store and Load to provide safe access
// to numeric types.
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
var (
// shutdown is a flag to alert running goroutines to shutdown.
shutdown int64
// wg is used to wait for the program to finish.
wg sync.WaitGroup
)
// main is the entry point for all Go programs.
func main() {
// Add a count of two, one for each goroutine.
wg.Add(2)
// Create two goroutines.
go doWork("A")
go doWork("B")
// Give the goroutines time to run.
time.Sleep(1 * time.Second)
// Safely flag it is time to shutdown.
fmt.Println("Shutdown Now")
atomic.StoreInt64(&shutdown, 1)
// Wait for the goroutines to finish.
wg.Wait()
}
// doWork simulates a goroutine performing work and
// checking the Shutdown flag to terminate early.
func doWork(name string) {
// Schedule the call to Done to tell main we are done.
defer wg.Done()
for {
fmt.Printf("Doing %s Work\n", name)
time.Sleep(250 * time.Millisecond)
// Do we need to shutdown.
if atomic.LoadInt64(&shutdown) == 1 {
fmt.Printf("Shutting %s Down\n", name)
break
}
}
}
|
互斥锁
互斥锁(mutex),用于在代码上创建一个临界区,保证同一时间只有一个goroutine可以执行临界区代码。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
|
// This sample program demonstrates how to use a mutex
// to define critical sections of code that need synchronous
// access.
package main
import (
"fmt"
"runtime"
"sync"
)
var (
// counter is a variable incremented by all goroutines.
counter int
// wg is used to wait for the program to finish.
wg sync.WaitGroup
// mutex is used to define a critical section of code.
mutex sync.Mutex
)
// main is the entry point for all Go programs.
func main() {
// Add a count of two, one for each goroutine.
wg.Add(2)
// Create two goroutines.
go incCounter(1)
go incCounter(2)
// Wait for the goroutines to finish.
wg.Wait()
fmt.Printf("Final Counter: %d\n", counter)
}
// incCounter increments the package level Counter variable
// using the Mutex to synchronize and provide safe access.
func incCounter(id int) {
// Schedule the call to Done to tell main we are done.
defer wg.Done()
for count := 0; count < 2; count++ {
// Only allow one goroutine through this
// critical section at a time.
mutex.Lock()
{
// Capture the value of counter.
value := counter
// Yield the thread and be placed back in queue.
runtime.Gosched()
// Increment our local value of counter.
value++
// Store the value back into counter.
counter = value
}
mutex.Unlock()
// Release the lock and allow any
// waiting goroutine through.
}
}
|
在Lock()和Unlock()之间的临界区被保护起来,保证同一时刻只有一个goroutine可以进入临界区。
channel
当一个资源需要在goroutine之间共享时,channel在goroutine之间搭起了一个管道,并提供了确保同步交换数据的机制。什么通道时,需要制定将要被共享的数据的类型,可以通过通道共享内置类型、命名类型、结构类型和引用类型的值或者指针。
1
2
3
4
5
6
7
8
9
10
11
|
// 无缓冲整型通道
unbuffered := make(chan int)
// 有缓存字符串整型
buffered := make(chan string, 10)
// 通过通道发送一个字符串
bufferer <- "Gopher"
// 从通道接受一个字符串
value := <- buffered
|
无缓存channel
无缓冲的通道(unbuffered channel)是指在接收前没有能力保存任何值的通道。这种类型的通 道要求发送 goroutine 和接收 goroutine 同时准备好,才能完成发送和接收操作。如果两个 goroutine 没有同时准备好,通道会导致先执行发送或接收操作的 goroutine 阻塞等待。这种对通道进行发送 和接收的交互行为本身就是同步的。其中任意一个操作都无法离开另一个操作单独存在。
模拟网球比赛
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
|
// This sample program demonstrates how to use an unbuffered
// channel to simulate a game of tennis between two goroutines.
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
// wg is used to wait for the program to finish.
var wg sync.WaitGroup
func init() {
rand.Seed(time.Now().UnixNano())
}
// main is the entry point for all Go programs.
func main() {
// Create an unbuffered channel.
court := make(chan int)
// Add a count of two, one for each goroutine.
wg.Add(2)
// Launch two players.
go player("Nadal", court)
go player("Djokovic", court)
// Start the set.
court <- 1
// Wait for the game to finish.
wg.Wait()
}
// player simulates a person playing the game of tennis.
func player(name string, court chan int) {
// Schedule the call to Done to tell main we are done.
defer wg.Done()
for {
// Wait for the ball to be hit back to us.
ball, ok := <-court
if !ok {
// If the channel was closed we won.
fmt.Printf("Player %s Won\n", name)
return
}
// Pick a random number and see if we miss the ball.
n := rand.Intn(100)
if n%13 == 0 {
fmt.Printf("Player %s Missed\n", name)
// Close the channel to signal we lost.
close(court)
return
}
// Display and then increment the hit count by one.
fmt.Printf("Player %s Hit %d\n", name, ball)
ball++
// Hit the ball back to the opposing player.
court <- ball
}
}
|
模拟接力赛
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
|
// This sample program demonstrates how to use an unbuffered
// channel to simulate a relay race between four goroutines.
package main
import (
"fmt"
"sync"
"time"
)
// wg is used to wait for the program to finish.
var wg sync.WaitGroup
// main is the entry point for all Go programs.
func main() {
// Create an unbuffered channel.
baton := make(chan int)
// Add a count of one for the last runner.
wg.Add(1)
// First runner to his mark.
go Runner(baton)
// Start the race.
baton <- 1
// Wait for the race to finish.
wg.Wait()
}
// Runner simulates a person running in the relay race.
func Runner(baton chan int) {
var newRunner int
// Wait to receive the baton.
runner := <-baton
// Start running around the track.
fmt.Printf("Runner %d Running With Baton\n", runner)
// New runner to the line.
if runner != 4 {
newRunner = runner + 1
fmt.Printf("Runner %d To The Line\n", newRunner)
go Runner(baton)
}
// Running around the track.
time.Sleep(1000 * time.Millisecond)
// Is the race over.
if runner == 4 {
fmt.Printf("Runner %d Finished, Race Over\n", runner)
wg.Done()
return
}
// Exchange the baton for the next runner.
fmt.Printf("Runner %d Exchange With Runner %d\n",
runner,
newRunner)
baton <- newRunner
}
|
有缓存channel
有缓冲的通道(buffered channel)是一种在被接收前能存储一个或者多个值的通道。这种类 型的通道并不强制要求 goroutine 之间必须同时完成发送和接收。通道会阻塞发送和接收动作的 条件也会不同。只有在通道中没有要接收的值时,接收动作才会阻塞。只有在通道没有可用缓冲 区容纳被发送的值时,发送动作才会阻塞。这导致有缓冲的通道和无缓冲的通道之间的一个很大 的不同:无缓冲的通道保证进行发送和接收的 goroutine 会在同一时间进行数据交换;有缓冲的 通道没有这种保证。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
|
// This sample program demonstrates how to use a buffered
// channel to work on multiple tasks with a predefined number
// of goroutines.
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
const (
numberGoroutines = 4 // Number of goroutines to use.
taskLoad = 10 // Amount of work to process.
)
// wg is used to wait for the program to finish.
var wg sync.WaitGroup
// init is called to initialize the package by the
// Go runtime prior to any other code being executed.
func init() {
// Seed the random number generator.
// 初始化随机数种子
rand.Seed(time.Now().Unix())
}
// main is the entry point for all Go programs.
func main() {
// Create a buffered channel to manage the task load.
tasks := make(chan string, taskLoad)
// Launch goroutines to handle the work.
wg.Add(numberGoroutines)
for gr := 1; gr <= numberGoroutines; gr++ {
go worker(tasks, gr)
}
// Add a bunch of work to get done.
for post := 1; post <= taskLoad; post++ {
tasks <- fmt.Sprintf("Task : %d", post)
}
// Close the channel so the goroutines will quit
// when all the work is done.
close(tasks)
// Wait for all the work to get done.
wg.Wait()
}
// worker is launched as a goroutine to process work from
// the buffered channel.
func worker(tasks chan string, worker int) {
// Report that we just returned.
defer wg.Done()
for {
// Wait for work to be assigned.
task, ok := <-tasks
if !ok {
// This means the channel is empty and closed.
fmt.Printf("Worker: %d : Shutting Down\n", worker)
return
}
// Display we are starting the work.
fmt.Printf("Worker: %d : Started %s\n", worker, task)
// Randomly wait to simulate work time.
sleep := rand.Int63n(100)
time.Sleep(time.Duration(sleep) * time.Millisecond)
// Display we finished the work.
fmt.Printf("Worker: %d : Completed %s\n", worker, task)
}
}
|
当通道关闭后,goroutine 依旧可以从通道接收数据, 但是不能再向通道里发送数据。能够从已经关闭的通道接收数据这一点非常重要,因为这允许通道关闭后依旧能取出其中缓冲的全部值,而不会有数据丢失。从一个已经关闭且没有数据的通道里获取数据,总会立刻返回,并返回一个通道类型的零值。如果在获取通道时还加入了可选的标志,就能得到通道的状态信息。
- 并发是指 goroutine 运行的时候是相互独立的。
- 使用关键字 go创建 goroutine 来运行函数。
- goroutine 在逻辑处理器上执行,而逻辑处理器具有独立的系统线程和运行队列。
- 竞争状态是指两个或者多个 goroutine 试图访问同一个资源。
- 原子函数和互斥锁提供了一种防止出现竞争状态的办法。
- 通道提供了一种在两个 goroutine 之间共享数据的简单方法。
- 无缓冲的通道保证同时交换数据,而有缓冲的通道不做这种保证。