Golang进阶——并发编程

并发编程

如果想对Go并发编程进一步了解的话,推荐去写一个Go的秒杀系统进行实战演练。

并发介绍

进程和线程

  • 进程是程序在操作系统中的一次执行过程,系统进行资源分配和调度的一个独立单位
  • 线程是进程的一个执行实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位
  • 一个进程可以创建和撤销多个线程;同一个进程中的多个线程之间可以并发执行

并发和并行

  • 并发:多线程程序在一个核的cpu上运行
  • 并行:多线程程序在多个核的cpu上运行

并发不是并行:

  • 并发:主要由切换时间片来实现同时运行
  • 并行:直接利用多核实现多线程的运行

go可以设置使用核数,以发挥多核计算机的能力。

线程和协程

  • 线程:一个线程上可以跑多个协程协程是轻量级的线程
  • 协程:独立的栈空间,共享堆空间,调度由用户自己控制,本质上有点类似于用户级线程,这些用户级线程的调度也是自己实现的

Goroutine

goroutine是由官方实现的超级线程池。每个实例4~5KB的栈内存占用由于实现机制而大幅减少的创建和销毁开销go高并发的根本原因

goroutine 奉行通过通信来共享内存,而不是共享内存来通信。

在java/c++中我们要实现并发编程的时候,我们通常需要自己维护一个线程池,并且需要自己去包装一个又一个的任务,同时需要自己去调度线程执行任务并维护上下文切换,这一切通常会耗费程序员大量的心智。那么能不能有一种机制,程序员只需要定义很多个任务,让系统去帮助我们把这些任务分配到CPU上实现并发执行呢?

Go语言中的goroutine就是这样一种机制,goroutine的概念类似于线程,但 goroutine是由Go的运行时(runtime)调度和管理的。Go程序会智能地将 goroutine 中的任务合理地分配给每个CPU。Go语言之所以被称为现代化的编程语言,就是因为它在语言层面已经内置了调度和上下文切换的机制

在Go语言编程中你不需要去自己写进程、线程、协程,你的技能包里只有一个技能:goroutine,当你需要让某个任务并发执行的时候,你只需要把这个任务包装成一个函数,开启一个goroutine去执行这个函数就可以了,就是这么简单粗暴。

使用goroutine

Go语言中使用goroutine非常简单,只需要在调用函数的时候在前面加上go关键字,就可以为一个函数创建一个goroutine。

一个goroutine必定对应一个函数,可以创建多个goroutine去执行相同的函数。

启动单个goroutine

启动goroutine的方式非常简单,只需要在调用的函数(普通函数和匿名函数)前面加上一个go关键字。

示例:先看以下代码:

1
2
3
4
5
6
7
8
9
10
11
12
package main

import "fmt"

func main() {
hello()
fmt.Println("main goroutine done!")
}

func hello() {
fmt.Println("Hello Goroutine!")
}

运行结果为:

可以看到在这个示例中hello函数和下面的语句是串行的,执行的结果是打印完Hello Goroutine!后打印main goroutine done!

接下来我们在调用hello函数前面加上关键字go,也就是启动一个goroutine去执行hello这个函数

1
2
3
4
5
6
7
8
9
10
11
12
package main

import "fmt"

func main() {
go hello() // 启动另外一个goroutine去执行hello函数
fmt.Println("main goroutine done!")
}

func hello() {
fmt.Println("Hello Goroutine!")
}

运行结果如下:

这一次的执行结果只打印了main goroutine done!,并没有打印Hello Goroutine!。这是因为:

在程序启动时,Go程序就会为main()函数创建一个默认的goroutine。当main()函数返回的时候该goroutine就结束了,所有在main()函数中启动的goroutine会一同结束

所以我们要想办法让main函数等一等hello函数,最简单粗暴的方式就是time.Sleep了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package main

import (
"fmt"
"time"
)

func main() {
go hello() // 启动另外一个goroutine去执行hello函数
fmt.Println("main goroutine done!")
time.Sleep(time.Second * 2)
}

func hello() {
fmt.Println("Hello Goroutine!")
}

结果如下:

可以发现是先打印main goroutine done!,这是因为我们在创建新的goroutine的时候需要花费一些时间,而此时main函数所在的goroutine是继续执行的。

启动多个goroutine

在Go语言中实现并发就是这样简单,我们还可以启动多个goroutine。让我们再来一个例子: (这里使用了sync.WaitGroup实现goroutine的同步

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package main

import (
"fmt"
"sync"
)

var wg sync.WaitGroup

func main() {
for i := 0; i < 5; i++ {
wg.Add(1) // 启动一个goroutine就登记+1
go hello(i)
}
wg.Wait() // 等待所有登记的goroutine都结束
}

func hello(i int) {
defer wg.Done() // goroutine结束就登记-1
fmt.Println("Hello Goroutine!", i)
}

结果如下:

多次执行上面的代码,会发现每次打印的数字的顺序都不一致。这是因为5个goroutine是并发执行的,而goroutine的调度是随机的。

注意:如果主协程退出了,其他任务还执行吗?看一下如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package main

import (
"fmt"
"time"
)

func main() {
// 使用匿名函数
go func() {
i := 0
for true {
i++
fmt.Printf("new goroutine: i = %v\n", i)
time.Sleep(time.Second)
}
}()
i := 0
for true {
i++
fmt.Printf("new goroutine: i = %v\n", i)
time.Sleep(time.Second)
if i == 2 {
break
}
}
}

通过运行结果可以看到:如果主协程退出了,其他任务就不会执行了。(因为我们上面已经提到过了,当main()函数返回的时候该goroutine就结束了,所有在main()函数中启动的goroutine会一同结束。)

goroutine与线程

Go语言中的操作系统线程和goroutine的关系:

  • 一个操作系统线程对应用户态多个goroutine。
  • go程序可以同时使用多个操作系统线程。
  • goroutine和OS线程是多对多的关系,即m:n。

可增长的栈

OS线程(操作系统线程)一般都有固定的栈内存(通常为2MB),一个goroutine的栈在其生命周期开始时只有很小的栈(典型情况下2KB)goroutine的栈不是固定的,它可以按需增大和缩小,goroutine的栈大小限制可以达到1GB,虽然极少会用到这个大。所以在Go语言中一次创建十万左右的goroutine也是可以的。

goroutine调度(GPM)

GPM是Go语言运行时(runtime)层面的实现,是go语言自己实现的一套调度系统。区别于操作系统调度OS线程。

  • G很好理解,就是个goroutine的,里面除了存放本goroutine信息外还有与所在P的绑定等信息
  • P管理着一组goroutine队列,P里面会存储当前goroutine运行的上下文环境(函数指针,堆栈地址及地址边界),P会对自己管理的goroutine队列做一些调度(比如把占用CPU时间较长的goroutine暂停、运行后续的goroutine等等),当自己的队列消费完了就去全局队列里取,如果全局队列里也消费完了会去其他P的队列里抢任务
  • M(machine)是Go运行时(runtime)对操作系统内核线程的虚拟, M与内核线程一般是一一映射的关系, 一个groutine最终是要放到M上执行的

P与M一般也是一一对应的。他们关系是: P管理着一组G挂载在M上运行当一个G长久阻塞在一个M上时,runtime会新建一个M,阻塞G所在的P会把其他的G挂载在新建的M上。当旧的G阻塞完成或者认为其已经死掉时 回收旧的M。

P的个数是通过runtime.GOMAXPROCS设定(最大256),Go1.5版本之后默认为物理线程数。 在并发量大的时候会增加一些P和M,但不会太多,切换太频繁的话得不偿失。

单从线程调度讲,Go语言相比起其他语言的优势在于OS线程是由OS内核来调度的,goroutine则是由Go运行时(runtime)自己的调度器调度的,这个调度器使用一个称为m:n调度的技术(复用/调度m个goroutine到n个OS线程)。 其一大特点是goroutine的调度是在用户态下完成的, 不涉及内核态与用户态之间的频繁切换,包括内存的分配与释放,都是在用户态维护着一块大的内存池, 不直接调用系统的malloc函数(除非内存池需要改变),成本比调度OS线程低很多。 另一方面充分利用了多核的硬件资源,近似的把若干goroutine均分在物理线程上, 再加上本身goroutine的超轻量,以上种种保证了go调度方面的性能。

runtime包

知乎:GO语言基础进阶教程:runtime包

runtime包

官网文档对runtime包的介绍:

Package runtime contains operations that interact with Go’s runtime system, such as functions to control goroutines. It also includes the low-level type information used by the reflect package; see reflect’s documentation for the programmable interface to the run-time type system.

尽管 Go 编译器产生的是本地可执行代码,这些代码仍旧运行在 Go 的 runtime(这部分的代码可以在 runtime 包中找到)当中。这个 runtime 类似 Java 和 .NET 语言所用到的虚拟机,它负责管理包括内存分配、垃圾回收、栈处理、goroutine、channel、切片(slice)、map 和反射(reflection)等等。

runtime包的常用函数:

  • NumCPU():返回当前系统的 CPU 核数量
  • GOMAXPROCS():设置最大的可同时使用的 CPU 核数
    通过runtime.GOMAXPROCS函数,应用程序何以在运行期间设置运行时系统中得P最大数量。但这会引起“Stop the World”。所以,应在应用程序最早的调用。并且最好是在运行Go程序之前设置好操作程序的环境变量GOMAXPROCS,而不是在程序中调用runtime.GOMAXPROCS函数。
    无论我们传递给函数的整数值是什么值,运行时系统的P最大值总会在1~256之间。

go1.8后,默认让程序运行在多个核上,可以不用设置了
go1.8前,还是要设置一下,可以更高效的利益cpu

  • Gosched():让当前线程让出 cpu 以让其它线程运行,它不会挂起当前线程,因此当前线程未来会继续执行。
    这个函数的作用是让当前 goroutine 让出 CPU,当一个 goroutine 发生阻塞,Go 会自动地把与该 goroutine 处于同一系统线程的其他 goroutine 转移到另一个系统线程上去,以使这些 goroutine 不阻塞。
  • Goexit():退出当前 goroutine(但是defer语句会照常执行)
  • NumGoroutine():返回正在执行和排队的任务总数
    runtime.NumGoroutine函数在被调用后,会返回系统中的处于特定状态的Goroutine的数量。这里的特指是指Grunnable\Gruning\Gsyscall\Gwaition。处于这些状态的Groutine即被看做是活跃的或者说正在被调度。
    注意:垃圾回收所在Groutine的状态也处于这个范围内的话,也会被纳入该计数器。
  • GOOS():目标操作系统
  • GC():runtime.GC()会让运行时系统进行一次强制性的垃圾收集
    1.强制的垃圾回收:不管怎样,都要进行的垃圾回收。2.非强制的垃圾回收:只会在一定条件下进行的垃圾回收(即运行时,系统自上次垃圾回收之后新申请的堆内存的单元(也成为单元增量)达到指定的数值)。
  • GOROOT():获取goroot目录
  • GOOS() : 查看目标操作系统很多时候,我们会根据平台的不同实现不同的操作,就而已用GOOS了:

runtime.Gosched()

runtime.Gosched():让出CPU时间片,重新等待安排任务。

(大概意思就是本来计划的好好的周末出去烧烤,但是你妈让你去相亲,两种情况:第一就是你相亲速度非常快,见面就黄不耽误你继续烧烤;第二种情况就是你相亲速度特别慢,见面就是你侬我侬的,耽误了烧烤,但是相亲结束后还得去烧烤)

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package main

import (
"fmt"
"runtime"
)

func main() {
go func(s string) {
for i := 0; i < 2; i++ {
fmt.Println(s)
}
}("新开的协程")
// 主协程
for i := 0; i < 2; i++ {
runtime.Gosched() // 切一下,再次分配任务
fmt.Println("主协程")
}
}

结果为:

如果没有runtime.Gosched(),主协程跑完了就不会再执行我们新开的协程了。输出结果可能会没有“新开的协程”。

runtime.Goexit()

runtime.Goexit():退出当前协程。

(一边烧烤一边相亲,突然发现相亲对象太丑影响烧烤,果断让她滚蛋,然后也就没有然后了)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package main

import (
"fmt"
"runtime"
"sync"
)

var wg sync.WaitGroup

func main() {
wg.Add(1)
go func() {
defer wg.Done()
defer fmt.Println("A.defer")
func() {
defer fmt.Println("B.defer")
runtime.Goexit() // 结束协程
defer fmt.Println("C.defer")
fmt.Println("B")
}()
fmt.Println("A")
}()
wg.Wait()
}

运行结果为:

可以看到结束协程之后,我们新开启的协程就结束了,后续的defer和Println都不起作用了。

runtime.GOMAXPROCS()

runtime.GOMAXPROCS():Go运行时的调度器使用GOMAXPROCS参数来确定需要使用多少个OS线程来同时执行Go代码。默认值是机器上的CPU核心数。例如在一个8核心的机器上,调度器会把Go代码同时调度到8个OS线程上(GOMAXPROCS是m:n调度中的n)。

Go语言中可以通过runtime.GOMAXPROCS()函数设置当前程序并发时占用的CPU逻辑核心数

Go1.5版本之前,默认使用的是单核心执行。Go1.5版本之后,默认使用全部的CPU逻辑核心数。

我们可以通过将任务分配到不同的CPU逻辑核心上实现并行的效果,这里举个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package main

import (
"fmt"
"runtime"
"sync"
)

var wg sync.WaitGroup

func main() {
runtime.GOMAXPROCS(1)
wg.Add(2)
go a()
go b()
wg.Wait()

}

func a() {
defer wg.Done()
for i := 1; i < 11; i++ {
fmt.Println("a", 1)
}
}

func b() {
defer wg.Done()
for i := 1; i < 11; i++ {
fmt.Println("b", 1)
}
}

设置runtime.GOMAXPROCS(1),此时两个任务只有一个逻辑核心,结果是做完一个任务再做另一个任务

如果 将逻辑核心数设为2,runtime.GOMAXPROCS(2)。此时两个任务并行执行。

Channel

Channel

channel类型

单纯地将函数并发执行是没有意义的。函数与函数间需要交换数据才能体现并发执行函数的意义。

虽然可以使用共享内存进行数据交换,但是共享内存在不同的goroutine中容易发生竞态问题。为了保证数据交换的正确性,必须使用互斥量对内存进行加锁,这种做法势必造成性能问题。

Go语言的并发模型是CSP(Communicating Sequential Processes),提倡通过通信共享内存而不是通过共享内存而实现通信。

如果说goroutine是Go程序并发的执行体,channel就是它们之间的连接。channel是可以让一个goroutine发送特定值到另一个goroutine的通信机制。

Go 语言中的通道(channel)是一种特殊的类型。通道像一个传送带或者队列,总是遵循**先入先出(First In First Out)**的规则,保证收发数据的顺序。每一个通道都是一个具体类型的导管,也就是声明channel的时候需要为其指定元素类型。

声明channel

channel是一种类型,一种引用类型。声明通道类型的格式如下:

1
var 变量 chan 元素类型

举几个例子:

1
2
3
var ch1 chan int   // 声明一个传递整型的通道
var ch2 chan bool // 声明一个传递布尔型的通道
var ch3 chan []int // 声明一个传递int切片的通道

创建channel

channel是引用类型,通道类型的空值是nil

1
2
var ch chan int
fmt.Println(ch) // <nil>

声明的通道后需要使用make函数初始化之后才能使用。

创建channel的格式如下:

1
make(chan 元素类型, [缓冲大小])
  • channel的缓冲大小是可选的

举几个例子:

1
2
3
ch4 := make(chan int)
ch5 := make(chan bool)
ch6 := make(chan []int)

操作channel

通道channel有发送(send)、接收(receive)和关闭(close)三种操作。发送和接收都使用<-符号。

现在我们先使用以下语句定义一个通道:

1
ch := make(chan int)

发送

将一个值发送到通道中。

1
ch <- 10 // 把10发送到ch中

接收

从一个通道中接收值。

1
2
x := <- ch // 从ch中接收值并赋值给变量x
<-ch // 从ch中接收值,忽略结果

关闭

我们通过调用内置的close函数来关闭通道。

1
close(ch)

关于关闭通道需要注意的事情是,只有在通知接收方goroutine所有的数据都发送完毕的时候才需要关闭通道。通道是可以被垃圾回收机制回收的,它和关闭文件是不一样的,在结束操作之后关闭文件是必须要做的,但关闭通道不是必须的

关闭后的通道有以下特点:

  • 对一个关闭的通道再发送值就会导致panic
  • 对一个关闭的通道进行接收会一直获取值直到通道为空
  • 对一个关闭的并且没有值的通道执行接收操作会得到对应类型的零值
  • 关闭一个已经关闭的通道会导致panic

无缓冲的channel

无缓冲的通道又称为阻塞的通道。

我们来看一下下面的代码:

1
2
3
4
5
6
7
8
9
10
11
package main

import (
"fmt"
)

func main() {
c := make(chan int)
c <- 10
fmt.Println("hello")
}

上面这段代码能够通过编译,但是执行的时候会出现以下错误:

为什么会出现deadlock错误呢?因为我们使用c := make(chan int)创建的是无缓冲的通道,无缓冲的通道只有在有人接收值的时候才能发送值。简单来说就是无缓冲的通道必须有接收才能发送。

上面的代码会阻塞在c <- 10这一行代码形成死锁,那如何解决这个问题呢?

一种方法是启用一个goroutine去接收值,例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main

import (
"fmt"
"sync"
)

var wg sync.WaitGroup

func main() {
c := make(chan int)
wg.Add(1)
go recv(c) // 启用goroutine从通道接收值
c <- 10
fmt.Println("hello")
wg.Wait()
}

func recv(c chan int) {
defer wg.Done()
i := <-c
fmt.Println("recv:", i)
}

注意go recv(c)c <- 10的顺序。如果c <- 10在前面,同样会报deadlock的错误。

1
2
c <- 10
go recv(c) // 启用goroutine从通道接收值

无缓冲通道上的发送操作会阻塞,直到另一个goroutine在该通道上执行接收操作,这时值才能发送成功,两个goroutine将继续执行。相反,如果接收操作先执行,接收方的goroutine将阻塞,直到另一个goroutine在该通道上发送一个值

有缓冲的channel

解决上面deadlock问题的方法还有一种就是使用有缓冲区的通道。

我们可以在使用make函数初始化通道的时候为其指定通道的容量,例如:

1
2
3
4
5
6
7
8
9
10
11
package main

import (
"fmt"
)

func main() {
c := make(chan int,1)
c <- 10
fmt.Println("hello")
}

只要通道的容量大于零,那么该通道就是有缓冲的通道,通道的容量表示通道中能存放元素的数量。就像你小区的快递柜只有那么个多格子,格子满了就装不下了,就阻塞了,等到别人取走一个快递员就能往里面放一个。

我们可以使用内置的len函数获取通道内元素的数量,使用cap函数获取通道的容量,虽然我们很少会这么做。

close()

可以通过内置的close()函数关闭channel(如果你的管道不往里存值或者取值的时候一定记得关闭管道)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package main

import "fmt"

func main() {
c := make(chan int)
go func() {
defer close(c)
for i := 0; i < 5; i++ {
c <- i
}
}()
for {
data, ok := <-c // 通道关闭后再取值ok=false
if ok {
fmt.Println(data)
} else {
break
}
}
fmt.Println("main结束")
}

如何优雅的从channel循环取值(如何判断channel是否关闭)

先说结论。有两种方式在接收值的时候判断通道是否被关闭,一种是data, ok := <-c来判断ok;一种是直接for range。我们通常使用的是for range的方式。

看一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package main

import "fmt"

func main() {
c1 := make(chan int)
c2 := make(chan int)
go func() { // 开启goroutine将0~9的数发送到c1中
defer close(c1)
for i := 0; i < 10; i++ {
c1 <- i
}
}()
go func() { // 开启goroutine从c1中接收值,并将该值的平方发送到c2中
defer close(c2)
for true {
data, ok := <-c1 // 通道关闭后再取值ok=false
if !ok {
break
}
c2 <- data * data
}
}()
// 在主goroutine中从ch2中接收值打印
for i := range c2 { // 通道关闭后会退出for range循环
fmt.Println(i)
}
}

单向通道

有的时候我们会将通道作为参数在多个任务函数间传递,很多时候我们在不同的任务函数中使用通道都会对其进行限制,比如限制通道在函数中只能发送或只能接收

看个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package main

import "fmt"

func main() {
c1 := make(chan int)
c2 := make(chan int)
go counter(c1)
go squarer(c2, c1)
printer(c2)
}

func counter(out chan<- int) {
defer close(out)
for i := 0; i < 10; i++ {
out <- i
}
}

func squarer(out chan<- int, in <-chan int) {
defer close(out)
for i := range in {
out <- i * i
}
}

func printer(in <-chan int) {
for i := range in {
fmt.Println(i)
}
}

其中:

  • chan<- int是一个只能发送的通道,可以发送但是不能接收;
  • <-chan int是一个只能接收的通道,可以接收但是不能发送。

在函数传参及任何赋值操作中将双向通道转换为单向通道是可以的,但反过来是不可以的。

通道异常

channel常见的异常总结,如下图:

注意:关闭已经关闭的channel也会引发panic

Goroutine池

强烈推荐这篇文章,讲了许多操作系统知识:GMP 并发调度器深度解析之手撸一个高性能 goroutine pool

自己写一个goroutine池玩玩

Goroutine池

  • 本质上是生产者消费者模型
  • 可以有效控制goroutine数量,防止暴涨
  • 需求:
    • 计算一个数字的各个位数之和,例如数字123,结果为1+2+3=6
    • 随机生成数字进行计算

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package main

import (
"fmt"
"math/rand"
)

type Job struct {
Id int // id
RandNum int // 需要计算的随机数
}

type Result struct {
job *Job // 这里必须传对象实例
sum int // 求出的和
}

func main() {
// 需要2个管道
jobChan := make(chan *Job, 128) // 1.job管道
resultChan := make(chan *Result, 128) // 2.结果管道
createPool(64, jobChan, resultChan) // 3.创建工作池
// 4.开个打印的协程
go func(resultChan chan *Result) {
for result := range resultChan {
fmt.Printf("job id:%v randnum:%v result:%d\n", result.job.Id,
result.job.RandNum, result.sum)
}
}(resultChan)
var id int
// 循环创建job,输入到jobChan
for true {
id++
randNum := rand.Int()
job := &Job{
Id: id,
RandNum: randNum,
}
jobChan <- job
}
}

// 创建工作池(num代表开几个协程)
func createPool(num int, jobChan chan *Job, resultChan chan *Result) {
// 根据开的协程个数运行
for i := 0; i < num; i++ {
go func(jobChan chan *Job, resultChan chan *Result) { // 执行运算
// 遍历job管道所有数据,进行相加
for job := range jobChan {
randNum := job.RandNum // 随机数接过来
// 随机数每一位相加
var sum int
for randNum != 0 {
temp := randNum % 10
sum = sum + temp
randNum = randNum / 10
}
// 结果封装到Result中
r := &Result{
job: job,
sum: sum,
}
resultChan <- r // 运算结果扔到管道
}

}(jobChan, resultChan)
}
}

定时器

定时器

  • Timer:时间到了,执行只执行1次
  • Ticker:时间到了,多次执行

Timer

Timer:时间到了,执行且只执行1次。

  1. timer基本使用:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    package main

    import (
    "fmt"
    "time"
    )

    func main() {
    timer := time.NewTimer(time.Second * 2)
    t1 := time.Now()
    fmt.Printf("t1:%v\n", t1) // t1:2022-06-16 11:53:28.4475827 +0800 CST m=+0.004452101
    t2 := <-timer.C
    fmt.Printf("t2:%v\n", t2) // t2:2022-06-16 11:53:30.4493612 +0800 CST m=+2.006230601
    }
  2. 验证timer只能响应一次

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    package main

    import (
    "fmt"
    "time"
    )

    func main() {
    timer := time.NewTimer(time.Second * 1)
    for true {
    <-timer.C
    fmt.Println("时间到")
    }
    }

    运行结果为:

  3. timer实现延时功能

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    package main

    import (
    "fmt"
    "time"
    )

    func main() {
    // 方法1
    time.Sleep(time.Second * 1)
    fmt.Println("1秒过去了")
    // 方法2
    timer := time.NewTimer(time.Second * 2)
    <-timer.C
    fmt.Println("2秒过去了")
    // 方法3
    <-time.After(time.Second * 2)
    fmt.Println("2秒过去了")
    }
  4. 停止定时器

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    package main

    import (
    "fmt"
    "time"
    )

    func main() {
    timer := time.NewTimer(time.Second * 2)
    go func() {
    <-timer.C
    fmt.Println("定时器执行了")
    }()
    stop := timer.Stop()
    if stop {
    fmt.Println("timer已经关闭了")
    }
    }
  5. 重置定时器

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    package main

    import (
    "fmt"
    "time"
    )

    func main() {
    timer := time.NewTimer(time.Second * 3)
    timer.Reset(time.Second * 1)
    fmt.Println(time.Now()) // 2022-06-16 12:18:38.9311858 +0800 CST m=+0.004419601
    fmt.Println(<-timer.C) // 2022-06-16 12:18:39.942039 +0800 CST m=+1.015272801
    }

Ticker

Ticker:时间到了,多次执行。

看个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main

import (
"fmt"
"time"
)

func main() {
ticker := time.NewTicker(time.Second * 1)
i := 0
go func() {
for true {
i++
fmt.Println(<-ticker.C)
if i == 5 {
ticker.Stop() // 停止
}
}
}()
for true { // 这里写个死循环是为了等待子协程能够执行完毕

}
}

运行结果如下:

可以发现输出5次之后就停止输出了。

select

select

彻底理解 IO 多路复用实现机制

在某些场景下我们需要同时从多个通道接收数据。通道在接收数据时,如果没有数据可以接收将会发生阻塞。

你也许会写出如下代码使用遍历的方式来实现:

1
2
3
4
5
6
7
for{
// 尝试从ch1接收值
data, ok := <-ch1
// 尝试从ch2接收值
data, ok := <-ch2

}

这种方式虽然可以实现从多个通道接收值的需求,但是运行性能会差很多。为了应对这种场景,Go内置了select关键字,可以同时响应多个通道的操作。(条件语句select

1
2
3
4
5
6
7
8
select {
case <-chan1:
// 如果chan1成功读到数据,则进行该case处理语句
case chan2 <- 1:
// 如果成功向chan2写入数据,则进行该case处理语句
default:
// 如果上面都没有成功,则进入default处理流程
}
  • select可以同时监听一个或多个channel,直到其中一个channel ready

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    package main

    import (
    "fmt"
    "time"
    )

    func main() {
    // 2个管道
    ch1 := make(chan string)
    ch2 := make(chan string)
    // 跑2个子协程,写数据
    go test1(ch1)
    go test2(ch2)
    // 用select监控
    select {
    case str1 := <-ch1:
    fmt.Println(str1)
    case str2 := <-ch2:
    fmt.Println(str2)
    }
    }

    func test1(ch chan string) {
    time.Sleep(time.Second * 2)
    ch <- "test1"
    }

    func test2(ch chan string) {
    time.Sleep(time.Second * 2)
    ch <- "test2"
    }
  • 如果多个channel同时ready,则随机选择一个执行

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    package main

    import (
    "fmt"
    )

    func main() {
    // 创建2个管道
    int_chan := make(chan int, 1)
    string_chan := make(chan string, 1)
    go func() {
    //time.Sleep(2 * time.Second)
    int_chan <- 1
    }()
    go func() {
    string_chan <- "hello"
    }()
    select {
    case value := <-int_chan:
    fmt.Println("int:", value)
    case value := <-string_chan:
    fmt.Println("string:", value)
    }
    fmt.Println("main结束")
    }
  • 可以用于判断管道是否存满

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    package main

    import (
    "fmt"
    "time"
    )

    // 判断管道有没有存满
    func main() {
    // 创建管道
    output1 := make(chan string, 10)
    // 子协程写数据
    go write(output1)
    // 取数据
    for s := range output1 {
    fmt.Println("res:", s)
    time.Sleep(time.Second)
    }
    }

    func write(ch chan string) {
    for {
    select {
    // 写数据
    case ch <- "hello":
    fmt.Println("write hello")
    default:
    fmt.Println("channel full")
    }
    time.Sleep(time.Millisecond * 500)
    }
    }

并发安全和锁

并发安全和锁

有时候在Go代码中可能会存在多个goroutine同时操作一个资源(临界区),这种情况会发生竞态问题(数据竞态)。

举个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package main

import (
"fmt"
"sync"
)

var x int
var wg sync.WaitGroup

func main() {
wg.Add(2)
go add()
go add()
wg.Wait()
fmt.Println(x)
}

func add() {
defer wg.Done()
for i := 0; i < 5000; i++ {
x++
}
}

多次运行可以发现输出的结果是不一样的!因为上面的代码中我们开启了两个goroutine去累加变量x的值,这两个goroutine在访问和修改x变量的时候就会存在数据竞争,导致最后的结果与期待的不符。

互斥锁

互斥锁

互斥锁是一种常用的控制共享资源访问的方法,它能够保证同时只有一个goroutine可以访问共享资源。Go语言中使用sync包的Mutex类型来实现互斥锁。

使用互斥锁来修复上面代码的问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package main

import (
"fmt"
"sync"
)

var x int
var wg sync.WaitGroup
var lock sync.Mutex

func main() {
wg.Add(2)
go add()
go add()
wg.Wait()
fmt.Println(x) // 10000
}

func add() {
defer wg.Done()
for i := 0; i < 5000; i++ {
lock.Lock() // 加锁
x++
lock.Unlock() // 解锁
}
}

使用互斥锁能够保证同一时间有且只有一个goroutine进入临界区,其他的goroutine则在等待锁;当互斥锁释放后,等待的goroutine才可以获取锁进入临界区,多个goroutine同时等待一个锁时,唤醒的策略是随机的。

读写互斥锁

读写互斥锁

互斥锁是完全互斥的,但是有很多实际的场景下是读多写少的,当我们并发的去读取一个资源不涉及资源修改的时候是没有必要加锁的,这种场景下使用读写锁是更好的一种选择。读写锁在Go语言中使用sync包中的RWMutex类型。

读写锁分为两种:读锁写锁

  • 当一个goroutine获取读锁之后,其他的goroutine如果是获取读锁会继续获得锁,如果是获取写锁就会等待;
  • 当一个goroutine获取写锁之后,其他的goroutine无论是获取读锁还是写锁都会等待。

读写锁示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package main

import (
"fmt"
"sync"
"time"
)

var x int
var wg sync.WaitGroup
var lock sync.Mutex
var rwLock sync.RWMutex

func main() {
start := time.Now()
for i := 0; i < 10; i++ {
wg.Add(1)
go write()
}
for i := 0; i < 10; i++ {
wg.Add(1)
go read()
}
wg.Wait()
end := time.Now()
fmt.Println(end.Sub(start))
}

func write() {
defer wg.Done()
rwLock.Lock() // 加写锁
x++
time.Sleep(time.Millisecond * 10) // 假设写操作消耗10ms
rwLock.Unlock() // 解写锁
}

func read() {
defer wg.Done()
rwLock.RLock() // 加读锁
time.Sleep(time.Millisecond * 1) // 假设读操作消耗1ms
rwLock.RUnlock() // 解读锁
}

需要注意的是读写锁非常适合读多写少的场景,如果读和写的操作差别不大,读写锁的优势就发挥不出来。

Sync包

Sync

sync.WaitGroup

sync.WaitGroup

这个在之前就已经用过很多次了,其实用起来很简单。

在代码中生硬的使用time.Sleep肯定是不合适的,Go语言中可以使用sync.WaitGroup来实现并发任务的同步。 sync.WaitGroup有以下几个方法:

方法名 功能
(wg * WaitGroup) Add(delta int) 计数器+delta
(wg *WaitGroup) Done() 计数器-1
(wg *WaitGroup) Wait() 阻塞直到计数器变为0

sync.WaitGroup内部维护着一个计数器,计数器的值可以增加和减少。例如当我们启动了N 个并发任务时,就调用wg.Add(N),将计数器值增加N。每个任务完成时通过调用wg.Done()方法将计数器减1。通过调用wg.Wait()来等待并发任务执行完,当计数器值为0时,表示所有并发任务已经完成。

我们利用sync.WaitGroup将上面的代码优化一下:

1
2
3
4
5
6
7
8
9
10
11
12
var wg sync.WaitGroup

func hello() {
defer wg.Done()
fmt.Println("Hello Goroutine!")
}
func main() {
wg.Add(1)
go hello() // 启动另外一个goroutine去执行hello函数
fmt.Println("main goroutine done!")
wg.Wait()
}

需要注意sync.WaitGroup是一个结构体,传递的时候要传递指针。

sync.Once

sync.Once

在编程的很多场景下我们需要确保某些操作在高并发的场景下只执行一次,例如只加载一次配置文件、只关闭一次通道等。

Go语言中的sync包中提供了一个针对只执行一次场景的解决方案:sync.Once

sync.Once只有一个Do方法,其签名如下:

1
func (o *Once) Do(f func()) {}

注意:如果要执行的函数f需要传递参数就需要搭配闭包来使用。

加载配置文件示例

延迟一个开销很大的初始化操作到真正用到它的时候再执行是一个很好的实践。因为预先初始化一个变量(比如在init函数中完成初始化)会增加程序的启动耗时,而且有可能实际执行过程中这个变量没有用上,那么这个初始化操作就不是必须要做的。我们来看一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
var icons map[string]image.Image

func loadIcons() {
icons = map[string]image.Image{
"left": loadIcon("left.png"),
"up": loadIcon("up.png"),
"right": loadIcon("right.png"),
"down": loadIcon("down.png"),
}
}

// Icon 被多个goroutine调用时不是并发安全的
func Icon(name string) image.Image {
if icons == nil {
loadIcons()
}
return icons[name]
}

多个goroutine并发调用Icon函数时不是并发安全的,现代的编译器和CPU可能会在保证每个goroutine都满足串行一致的基础上自由地重排访问内存的顺序。loadIcons函数可能会被重排为以下结果:

1
2
3
4
5
6
7
func loadIcons() {
icons = make(map[string]image.Image)
icons["left"] = loadIcon("left.png")
icons["up"] = loadIcon("up.png")
icons["right"] = loadIcon("right.png")
icons["down"] = loadIcon("down.png")
}

在这种情况下就会出现即使判断了icons不是nil也不意味着变量初始化完成了。考虑到这种情况,我们能想到的办法就是添加互斥锁,保证初始化icons的时候不会被其他的goroutine操作,但是这样做又会引发性能问题。

使用sync.Once改造的示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
var icons map[string]image.Image

var loadIconsOnce sync.Once

func loadIcons() {
icons = map[string]image.Image{
"left": loadIcon("left.png"),
"up": loadIcon("up.png"),
"right": loadIcon("right.png"),
"down": loadIcon("down.png"),
}
}

// Icon 是并发安全的
func Icon(name string) image.Image {
loadIconsOnce.Do(loadIcons)
return icons[name]
}

sync.Once其实内部包含一个互斥锁和一个布尔值,互斥锁保证布尔值和数据的安全,而布尔值用来记录初始化是否完成。这样设计就能保证初始化操作的时候是并发安全的并且初始化操作也不会被执行多次。

sync.Map

sync.Map

Go语言中内置的map不是并发安全的。请看下面的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package main

import (
"fmt"
"strconv"
"sync"
)

var m = make(map[string]int, 10)

func main() {
wg := sync.WaitGroup{}
for i := 0; i < 20; i++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
key := strconv.Itoa(n) // 将整型转换为字符串
set(key, n)
fmt.Printf("k=:%v,v:=%v\n", key, get(key))
}(i)
}
wg.Wait()
}

func get(key string) int {
return m[key]
}

func set(key string, value int) {
m[key] = value
}

上面的代码开启少量几个goroutine的时候可能没什么问题,当并发多了之后执行上面的代码就会报fatal error: concurrent map writes错误。

像这种场景下就需要为map加锁来保证并发的安全性了,Go语言的sync包中提供了一个开箱即用的并发安全版mapsync.Map开箱即用表示不用像内置的map一样使用make函数初始化就能直接使用。同时sync.Map内置了诸如StoreLoadLoadOrStoreDeleteRange等操作方法。

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package main

import (
"fmt"
"strconv"
"sync"
)

var m = sync.Map{}

func main() {
wg := sync.WaitGroup{}
for i := 0; i < 20; i++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
key := strconv.Itoa(n)
m.Store(key, n)
value, ok := m.Load(key)
if ok {
fmt.Printf("k=:%v,v:=%v\n", key, value)
}
}(i)
}
wg.Wait()
}

atomic包(原子操作)

原子操作(atomic包)

原子操作

代码中的加锁操作因为涉及内核态的上下文切换会比较耗时、代价比较高。针对基本数据类型我们还可以使用原子操作来保证并发安全,因为原子操作是Go语言提供的方法它在用户态就可以完成,因此性能比加锁操作更好。Go语言中原子操作由内置的标准库sync/atomic提供。

atomic包

方法 解释
func LoadInt32(addr *int32) (val int32) func LoadInt64(addr *int64) (val int64)
func LoadUint32(addr*uint32) (val uint32)
func LoadUint64(addr*uint64) (val uint64)
func LoadUintptr(addr*uintptr) (val uintptr)
func LoadPointer(addr*unsafe.Pointer) (val unsafe.Pointer)
读取操作
func StoreInt32(addr *int32, val int32) func StoreInt64(addr *int64, val int64) func StoreUint32(addr *uint32, val uint32) func StoreUint64(addr *uint64, val uint64) func StoreUintptr(addr *uintptr, val uintptr) func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer) 写入操作
func AddInt32(addr *int32, delta int32) (new int32) func AddInt64(addr *int64, delta int64) (new int64) func AddUint32(addr *uint32, delta uint32) (new uint32) func AddUint64(addr *uint64, delta uint64) (new uint64) func AddUintptr(addr *uintptr, delta uintptr) (new uintptr) 修改操作
func SwapInt32(addr *int32, new int32) (old int32) func SwapInt64(addr *int64, new int64) (old int64) func SwapUint32(addr *uint32, new uint32) (old uint32) func SwapUint64(addr *uint64, new uint64) (old uint64) func SwapUintptr(addr *uintptr, new uintptr) (old uintptr) func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer) 交换操作
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool) func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool) func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool) func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool) func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool) func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool) 比较并交换操作
方法 解释
func LoadInt32(addr *int32) (val int32)
func LoadInt64(addr *int64) (val int64)
func LoadUint32(addr*uint32) (val uint32)
func LoadUint64(addr*uint64) (val uint64)
func LoadUintptr(addr*uintptr) (val uintptr)
func LoadPointer(addr*unsafe.Pointer) (val unsafe.Pointer)
读取操作
func StoreInt32(addr *int32, val int32)
func StoreInt64(addr *int64, val int64)
func StoreUint32(addr *uint32, val uint32)
func StoreUint64(addr *uint64, val uint64)
func StoreUintptr(addr *uintptr, val uintptr)
func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)
写入操作
func AddInt32(addr *int32, delta int32) (new int32)
func AddInt64(addr *int64, delta int64) (new int64)
func AddUint32(addr *uint32, delta uint32) (new uint32)
func AddUint64(addr *uint64, delta uint64) (new uint64)
func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)
修改操作
func SwapInt32(addr *int32, new int32) (old int32)
func SwapInt64(addr *int64, new int64) (old int64)
func SwapUint32(addr *uint32, new uint32) (old uint32)
func SwapUint64(addr *uint64, new uint64) (old uint64)
func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)
func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)
交换操作
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)<br/>func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)<br/>func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)<br/>func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)<br/>func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)
func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)
比较并交换操作

示例

看个例子,来比较一下互斥锁和原子操作的性能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package main

import (
"fmt"
"sync"
"sync/atomic"
"time"
)

var x int64
var lock sync.Mutex
var wg sync.WaitGroup

// 普通版加函数
func add() {
// x = x + 1
x++ // 等价于上面的操作
wg.Done()
}

// 互斥锁版加函数
func mutexAdd() {
lock.Lock()
x++
lock.Unlock()
wg.Done()
}

// 原子操作版加函数
func atomicAdd() {
atomic.AddInt64(&x, 1)
wg.Done()
}

func main() {
start := time.Now()
for i := 0; i < 10000; i++ {
wg.Add(1)
//go add() // 普通版add函数 不是并发安全的(9552 4.1647ms)
//go mutexAdd() // 加锁版add函数 是并发安全的,但是加锁性能开销大(10000 3.6763ms)
go atomicAdd() // 原子操作版add函数 是并发安全,性能优于加锁版(10000 3.0914ms)
}
wg.Wait()
end := time.Now()
fmt.Println(x)
fmt.Println(end.Sub(start))
}

atomic包提供了底层的原子级内存操作,对于同步算法的实现很有用。这些函数必须谨慎地保证正确使用。除了某些特殊的底层应用,使用通道或者sync包的函数/类型实现同步更好

GMP 原理与调度

GMP 原理与调度

(面试常问)