goroutine 并发中竞争条件的解决

2019-11-20 09:27:19   最后更新: 2019-11-20 09:27:19   访问数量:35




上一篇文章,我们详细介绍了通过 goroutine 和通道来实现并发编程:

GoLang 的并发编程与通信 -- goroutine 与通道

 

但是,在并发环境中,有另外一个不可回避的问题,那就是如何处理竞争条件,由于并发的多个 goroutine 的执行顺序通常是无法确定的,因此他们能够访问的同一个资源就会在多个 goroutine 之间产生竞争,如何避免竞争条件,如何处理竞争,都是必须要考虑的问题,本文我们就来详细介绍一下

 

 

由于 GoLang 中 goroutine 的存在,只要让变量不在多个 goroutine 内共享,他就一定是并发安全的

如果一个变量不只限制在一个 goroutine 内,那就必须维护一个更高层的互斥不变量来保证其访问的安全性了

特殊的,包级别的变量在使用时因为无法限制在一个 goroutine 内,所以这些变量是非并发安全的,在使用中必须采取互斥机制

 

示例

下面是一个典型的竞争条件发生的例子:

package main import "fmt" var balance = 0 var sigovers = make(chan struct{}, 2) func Deposit(amount int) { balance += amount sigovers <- struct{}{} } func main() { for i := 0; i < 2; i++ { go Deposit(100) } for i := 0; i < 2; i++ { <-sigovers } fmt.Printf("balance = %d\n", balance) }

 

 

这是一个简单的银行存款余额增加的示例,balance 表示银行存款余额,函数 Deposit 负责在原有基础上增加 amount

我们循环 100 次 Deposit(100),最终理论上,balance 应该会增加到 10000

但实际上打印出的结果是:

balance = 9900

 

这是因为看上去只是一步操作的 +=,实际上进行的是读取原值、求和、赋值给原变量三步操作,他们是非并发安全的

多个 goroutine 共同通过 Deposit 函数使用了包级别的变量 balance,从而产生了竞争条件

可见,在并发环境中,竞争条件是非常严重的一个问题

 

竞争条件的避免

那么,如何在程序中避免竞争条件呢?

有三种方法可以避免:

  1. 不修改变量,每个 goroutine 都只读变量,自然不会有竞争和冲突的存在
  2. 避免从多个 goroutine 访问同一个变量,例如创建一个唯一能够访问该变量的 goroutine,从而将这个变量限制在单个 goroutine 内部,其他 goroutine 通过通道来受限的发送查询或变更变量的请求
  3. 引入互斥机制

 

第二种方式是最为推荐的,这正是 GoLang 文档中提到的:

不要通过共享内存来通信,而应该通过通信来共享内存

 

在这种方式中,负责代理受限变量的 goroutine 被称为监控 goroutine

下面是上面的银行存款管理案例的重写:

package main import "fmt" var deposits = make(chan int) var balances = make(chan int) var sigovers = make(chan struct{}, 100) func Deposit(amount int) { deposits <- amount sigovers <- struct{}{} } func Balance() int { return <-balances } func teller() { var balance int for { select { case amount := <-deposits: balance += amount case balances <- balance: } } } func main() { go teller() for i:=0; i < 100; i++ { go Deposit(100) } for i := 0; i < 100; i++ { <-sigovers } fmt.Printf("balance = %d\n", Balance()) }

 

 

绝大部分语言中,在处理并发环境可能造成的竞争条件时,都会引入互斥锁的概念,例如 linux 原生支持的互斥量、信号量等

顾名思义,所谓的互斥锁,就是保证同一时间多个并发单位中只有一个可以获取到锁,其他的并发单位只有等到持有锁的单位释放锁后方能重新获取到锁

 

由于 GoLang 中的通道阻塞机制,我们可以自己通过一个容量为 1 的通道来实现互斥锁

同一时间,只能有 N 个 goroutine 可以向容量为 N 的通道放入数据,除非通道中的数据被取出,否则其他 goroutine 都将陷入阻塞,于是,这样我们就可以实现一个计数上限为 N 的信号量:

  • 加锁操作为放入数据操作
  • 解锁操作为取出数据操作

 

示例

package main import "fmt" var sema = make(chan struct{}, 1) var sigovers = make(chan struct{}, 100) var balance = 0 func Deposit(amount int) { sema <- struct{}{} // 加锁 balance += amount <-sema // 解锁 sigovers <- struct{}{} } func Balance() int { sema <- struct{}{} b := balance <-sema return b } func main() { for i := 0; i < 100; i++ { go Deposit(100) } for i := 0; i < 100; i++ { <-sigovers } fmt.Printf("balance = %d\n", Balance()) }

 

 

上述通过通道模拟互斥量的方法虽然完美实现了加锁与解锁,但看起来很不直观

sync 包提供了 Mutex 类型来支持这种模式,通过 Lock 方法加锁,通过 Unlock 方法解锁,这样就显得十分直观了

package main import ( "fmt" "sync" ) var mu sync.Mutex var sigovers = make(chan struct{}, 100) var balance = 0 func Deposit(amount int) { mu.Lock() // 加锁 balance += amount mu.Unlock() // 解锁 sigovers <- struct{}{} } func Balance() int { mu.Lock() b := balance mu.Unlock() return b } func main() { for i := 0; i < 100; i++ { go Deposit(100) } for i := 0; i < 100; i++ { <-sigovers } fmt.Printf("balance = %d\n", Balance()) }

 

 

一个 goroutine 加锁后没有解锁而终止执行是灾难性的,我们之前介绍过 GoLang 的 defer 关键字,他可以避免这样问题的发生:

GoLang 中的异常与处理 -- 错误和宕机

 

下面的例子中,我们实现一个减少余额的方法,并返回余额是否充足:

func WithDraw(amount int) bool { mu.Lock() defer mu.Unlock() Deposit(-amount) if Balance() < 0 { Deposit(amount) return false } return true }

 

 

函数嵌套调用造成的死锁

但上面的例子中有一个隐藏的问题,那就是在 Deposit 函数中,也进行了加锁,由于锁是不可以重入的,因此 WithDraw 函数的执行会一直阻塞等待 Deposit 函数尝试获取锁操作,也就是产生了死锁

因此,推荐将加锁逻辑与具体的函数内部逻辑分开:

func Deposit(amount int) { mu.Lock() defer mu.Unlock() deposit(amount) } func deposit(amount int) { balance += amount } func WithDraw(amount int) bool { mu.Lock() defer mu.Unlock() return withDraw(amount) } func withDraw(amount int) bool { deposit(-amount) if Balance() < 0 { deposit(amount) return false } return true }

 

 

每个函数都提供导出版本与非导出版本,所有方法内均调用非导出版本,而该代码既然不需要导出,也就不用担心其作为 goroutine 在包外使用时的并发安全问题了

 

在实际的场景中,通常数据的读取十分频繁,而数据写入和更新的频率则相对较低,如果对每一次读取、写入操作都进行加锁,那么将严重影响程序的吞吐量,在这种场景下,我们需要一种特殊类型的锁,让只读操作可以并发执行,而写操作则完全独享访问权限

这样的场景下,unix 设计有读写锁 pthread_rwlock,GoLang 也同样拥有读写锁 sync.RWMutex

sync.RWMutex 类型就是读写锁,Lock 与 Unlock 两个函数分别用来加写锁、释放写锁,而 RLock 与 RUnlock 则分别用来加读锁、释放读锁(读锁也称共享锁)

但需要注意的是,RWMutex 的加解锁性能相对于 Mutex 要低一些,所以如非必要,尽量仍然使用 sync.Mutex 来实现加解锁操作,只有读操作远多于写操作,且锁竞争非常激烈时,RWMutex 才能显示出他的优势

 

示例

于是 Balance 函数可以改为:

var mu sync.RWMutex var balance int func Balance() int { mu.RLock() defer mu.RUnlock return balance }

 

 

也许你会疑惑,为什么 Balance 函数只是读取变量的值也需要加锁呢?

现代计算机一般都有处理器的多级缓存或寄存器,只有必要时才会刷回缓存,因此直接读取内存中的值可能并不是当前计算出的最新值

在 GoLang 中,通道通信、互斥锁等操作都会强制内存刷新,从而保证结果的可见性

 

很多时候,并发安全问题很难定位,幸运的是,GoLang 提供了一个十分易用的工具:竞态检测器(race detector)

只需要在 go build、go run、go test 命令中添加 -race 参数,就会在执行结束后输出一份报告,包含变量的标识以及读写 goroutine 当时的调用栈

但是,需要注意的是,竞态检测器只能随着运行过程跟随调用栈来定位是否存在竞态,对于没有执行到或尚未构成并发安全问题的代码他无法排查出来,所以最佳实践是保证 go test 执行的测试用例能够覆盖各种场景,然后通过添加 -race 参数来进行竞态的定位

 

同样作为并发单位,也许你会认为 goroutine 就是操作系统中的线程,这种认识是不对的,接下来我们就来看看他们有哪些区别

 

灵活的栈空间容量

操作系统中,每个线程在创建时,操作系统都会给他分配一个固定的栈空间,通常容量为 2MB

而 GoLang 中,goroutine 十分灵活,用户可能会一个 goroutine 中做繁重的工作,也可能同时创建十万个 goroutine,此时,固定的栈空间就显得有些呆板,GoLang 中,每个 goroutine 占用的栈空间大小都是动态变化的,他可以按需增加或缩小,最大限制达 1GB

 

goroutine 的调度

OS 线程由操作系统内核调度,随着硬件时钟中断触发内核调度器,内核调度器暂停当前线程的执行,保存寄存器等信息到内存,从内存中调度下一个要执行的线程来继续,整个过程就是一个完整的上下文切换,这是一个性能极低的操作

与操作系统类似,GoLang 也拥有一个用于调度 goroutine 的调度器,但 GoLang 调度器不是由硬件时钟定期触发的,而是由特定的 GoLang 语言结构触发的,整个调度过程不涉及用户态与内核态的切换,所以性能消耗要比操作系统线程的切换低很多

这个 GoLang 调度器也被称为 m:n 调度器,m 指的是被调度的 goroutine 数量,n 则指的是实际使用的线程数

环境变量 GOMAXPROCS 就是用来控制上面的 n 参数的大小的,默认为机器上 CPU 的个数

 

线程标识

每个操作系统的线程都拥有一个唯一的标识,但在使用中,很多程序员将线程标识与业务耦合在一起,从而造成了很多十分诡异的现象和问题,这与鼓励简单编程的 GoLang 风格相左,所以 GoLang 拒绝为每个 goroutine 提供他们独有的标识

 

欢迎关注微信公众号,以技术为主,涉及历史、人文等多领域的学习与感悟,每周三到七篇推文,只有全部原创,只有干货没有鸡汤

 

 






线程      竞争条件            mutex      并发编程      golang      goroutine     


京ICP备15018585号